This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 619c0d9ca2c [YAML] Add ability to specify JDBC type to read/write
(#30024)
619c0d9ca2c is described below
commit 619c0d9ca2c5c78e033b1cd5c7dacfc88f462836
Author: Jeff Kinard <[email protected]>
AuthorDate: Thu Feb 8 14:13:18 2024 -0500
[YAML] Add ability to specify JDBC type to read/write (#30024)
This PR adds the ability to specify a JDBC type/flavor for the ReadFromJdbc and
WriteToJdbc Beam YAML transforms, rather than having to manually specify a
driver class.
---
.../io/jdbc/JdbcReadSchemaTransformProvider.java | 36 +++++++-
.../java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 19 +++++
.../io/jdbc/JdbcWriteSchemaTransformProvider.java | 36 +++++++-
.../jdbc/JdbcReadSchemaTransformProviderTest.java | 76 +++++++++++++++++
.../jdbc/JdbcWriteSchemaTransformProviderTest.java | 98 ++++++++++++++++++++--
sdks/python/apache_beam/yaml/standard_io.yaml | 18 ++--
6 files changed, 262 insertions(+), 21 deletions(-)
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 3b504b1a90d..0139207235a 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -68,8 +71,15 @@ public class JdbcReadSchemaTransformProvider
}
protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+ String driverClassName = config.getDriverClassName();
+
+ if (Strings.isNullOrEmpty(driverClassName)) {
+ driverClassName =
+
JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+ }
+
JdbcIO.DataSourceConfiguration dsConfig =
- JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(),
config.getJdbcUrl())
+ JdbcIO.DataSourceConfiguration.create(driverClassName,
config.getJdbcUrl())
.withUsername("".equals(config.getUsername()) ? null :
config.getUsername())
.withPassword("".equals(config.getPassword()) ? null :
config.getPassword());
String connectionProperties = config.getConnectionProperties();
@@ -131,8 +141,12 @@ public class JdbcReadSchemaTransformProvider
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class JdbcReadSchemaTransformConfiguration implements
Serializable {
+ @Nullable
public abstract String getDriverClassName();
+ @Nullable
+ public abstract String getJdbcType();
+
public abstract String getJdbcUrl();
@Nullable
@@ -164,13 +178,25 @@ public class JdbcReadSchemaTransformProvider
public abstract String getDriverJars();
public void validate() throws IllegalArgumentException {
- if (Strings.isNullOrEmpty(getDriverClassName())) {
- throw new IllegalArgumentException("JDBC Driver class name cannot be
blank.");
- }
if (Strings.isNullOrEmpty(getJdbcUrl())) {
throw new IllegalArgumentException("JDBC URL cannot be blank");
}
+ boolean driverClassNamePresent =
!Strings.isNullOrEmpty(getDriverClassName());
+ boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
+ if (driverClassNamePresent && jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "JDBC Driver class name and JDBC type are mutually exclusive
configurations.");
+ }
+ if (!driverClassNamePresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "One of JDBC Driver class name or JDBC type must be specified.");
+ }
+ if (jdbcTypePresent
+ &&
!JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase()))
{
+ throw new IllegalArgumentException("JDBC type must be one of " +
JDBC_DRIVER_MAP.keySet());
+ }
+
boolean readQueryPresent = (getReadQuery() != null &&
!"".equals(getReadQuery()));
boolean locationPresent = (getLocation() != null &&
!"".equals(getLocation()));
@@ -192,6 +218,8 @@ public class JdbcReadSchemaTransformProvider
public abstract static class Builder {
public abstract Builder setDriverClassName(String value);
+ public abstract Builder setJdbcType(String value);
+
public abstract Builder setJdbcUrl(String value);
public abstract Builder setUsername(String value);
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
index 8c7ee17d5fc..8d82938e596 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -58,6 +59,7 @@ import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -75,6 +77,23 @@ class JdbcUtil {
private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
+ static final Map<String, String> JDBC_DRIVER_MAP =
+ new HashMap<>(
+ ImmutableMap.of(
+ "mysql",
+ "com.mysql.cj.jdbc.Driver",
+ "postgres",
+ "org.postgresql.Driver",
+ "oracle",
+ "oracle.jdbc.driver.OracleDriver",
+ "mssql",
+ "com.microsoft.sqlserver.jdbc.SQLServerDriver"));
+
+ @VisibleForTesting
+ static void registerJdbcDriver(Map<String, String> jdbcType) {
+ JDBC_DRIVER_MAP.putAll(jdbcType);
+ }
+
/** Utility method to save jar files locally in the worker. */
static URL[] saveFilesLocally(String driverJars) {
List<String> listOfJarPaths =
Splitter.on(',').trimResults().splitToList(driverJars);
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index e9f67969626..a409b604b11 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
@@ -73,8 +76,15 @@ public class JdbcWriteSchemaTransformProvider
}
protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+ String driverClassName = config.getDriverClassName();
+
+ if (Strings.isNullOrEmpty(driverClassName)) {
+ driverClassName =
+
JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+ }
+
JdbcIO.DataSourceConfiguration dsConfig =
- JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(),
config.getJdbcUrl())
+ JdbcIO.DataSourceConfiguration.create(driverClassName,
config.getJdbcUrl())
.withUsername("".equals(config.getUsername()) ? null :
config.getUsername())
.withPassword("".equals(config.getPassword()) ? null :
config.getPassword());
String connectionProperties = config.getConnectionProperties();
@@ -162,8 +172,12 @@ public class JdbcWriteSchemaTransformProvider
@DefaultSchema(AutoValueSchema.class)
public abstract static class JdbcWriteSchemaTransformConfiguration
implements Serializable {
+ @Nullable
public abstract String getDriverClassName();
+ @Nullable
+ public abstract String getJdbcType();
+
public abstract String getJdbcUrl();
@Nullable
@@ -192,13 +206,25 @@ public class JdbcWriteSchemaTransformProvider
public abstract String getDriverJars();
public void validate() throws IllegalArgumentException {
- if (Strings.isNullOrEmpty(getDriverClassName())) {
- throw new IllegalArgumentException("JDBC Driver class name cannot be
blank.");
- }
if (Strings.isNullOrEmpty(getJdbcUrl())) {
throw new IllegalArgumentException("JDBC URL cannot be blank");
}
+ boolean driverClassNamePresent =
!Strings.isNullOrEmpty(getDriverClassName());
+ boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
+ if (driverClassNamePresent && jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "JDBC Driver class name and JDBC type are mutually exclusive
configurations.");
+ }
+ if (!driverClassNamePresent && !jdbcTypePresent) {
+ throw new IllegalArgumentException(
+ "One of JDBC Driver class name or JDBC type must be specified.");
+ }
+ if (jdbcTypePresent
+ &&
!JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase()))
{
+ throw new IllegalArgumentException("JDBC type must be one of " +
JDBC_DRIVER_MAP.keySet());
+ }
+
boolean writeStatementPresent =
(getWriteStatement() != null && !"".equals(getWriteStatement()));
boolean locationPresent = (getLocation() != null &&
!"".equals(getLocation()));
@@ -221,6 +247,8 @@ public class JdbcWriteSchemaTransformProvider
public abstract static class Builder {
public abstract Builder setDriverClassName(String value);
+ public abstract Builder setJdbcType(String value);
+
public abstract Builder setJdbcUrl(String value);
public abstract Builder setUsername(String value);
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index 251f995dea4..7cbdd48d158 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.registerJdbcDriver;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Objects;
import java.util.ServiceLoader;
import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
@@ -34,6 +37,7 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -60,6 +64,10 @@ public class JdbcReadSchemaTransformProviderTest {
System.setProperty("derby.locks.waitTimeout", "2");
System.setProperty("derby.stream.error.file", "build/derby.log");
+ registerJdbcDriver(
+ ImmutableMap.of(
+ "derby",
Objects.requireNonNull(DATA_SOURCE_CONFIGURATION.getDriverClassName()).get()));
+
DatabaseTestHelper.createTable(DATA_SOURCE, READ_TABLE_NAME);
addInitialData(DATA_SOURCE, READ_TABLE_NAME);
}
@@ -95,6 +103,48 @@ public class JdbcReadSchemaTransformProviderTest {
.build()
.validate();
});
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType("invalidType")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setDriverClassName("ClassName")
+ .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
+ .build()
+ .validate();
+ });
+ }
+
+ @Test
+ public void testValidReadSchemaOptions() {
+ for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType(jdbcType)
+ .build()
+ .validate();
+ }
}
@Test
@@ -123,6 +173,32 @@ public class JdbcReadSchemaTransformProviderTest {
pipeline.run();
}
+ @Test
+ public void testReadWithJdbcTypeSpecified() {
+ JdbcReadSchemaTransformProvider provider = null;
+ for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
+ if (p instanceof JdbcReadSchemaTransformProvider) {
+ provider = (JdbcReadSchemaTransformProvider) p;
+ break;
+ }
+ }
+ assertNotNull(provider);
+
+ PCollection<Row> output =
+ PCollectionRowTuple.empty(pipeline)
+ .apply(
+ provider.from(
+
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+ .setJdbcType("derby")
+ .setLocation(READ_TABLE_NAME)
+ .build()))
+ .get("output");
+ Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+ PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
/** Create test data that is consistent with that generated by TestRow. */
private static void addInitialData(DataSource dataSource, String tableName)
throws SQLException {
try (Connection connection = dataSource.getConnection()) {
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
index 64de7a1b56c..f66a143323e 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.registerJdbcDriver;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import java.sql.SQLException;
import java.util.List;
+import java.util.Objects;
import java.util.ServiceLoader;
import javax.sql.DataSource;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
@@ -32,6 +35,8 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -45,7 +50,7 @@ public class JdbcWriteSchemaTransformProviderTest {
JdbcIO.DataSourceConfiguration.create(
"org.apache.derby.jdbc.EmbeddedDriver",
"jdbc:derby:memory:testDB;create=true");
private static final DataSource DATA_SOURCE =
DATA_SOURCE_CONFIGURATION.buildDatasource();
- private static final String WRITE_TABLE_NAME =
DatabaseTestHelper.getTestTableName("UT_WRITE");
+ private String writeTableName;
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@@ -56,7 +61,15 @@ public class JdbcWriteSchemaTransformProviderTest {
System.setProperty("derby.locks.waitTimeout", "2");
System.setProperty("derby.stream.error.file", "build/derby.log");
- DatabaseTestHelper.createTable(DATA_SOURCE, WRITE_TABLE_NAME);
+ registerJdbcDriver(
+ ImmutableMap.of(
+ "derby",
Objects.requireNonNull(DATA_SOURCE_CONFIGURATION.getDriverClassName()).get()));
+ }
+
+ @Before
+ public void before() throws SQLException {
+ writeTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ DatabaseTestHelper.createTable(DATA_SOURCE, writeTableName);
}
@Test
@@ -90,10 +103,52 @@ public class JdbcWriteSchemaTransformProviderTest {
.build()
.validate();
});
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType("invalidType")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setDriverClassName("ClassName")
+ .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
+ .build()
+ .validate();
+ });
}
@Test
- public void testReadWriteToTable() throws SQLException {
+ public void testValidWriteSchemaOptions() {
+ for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setJdbcType(jdbcType)
+ .build()
+ .validate();
+ }
+ }
+
+ @Test
+ public void testWriteToTable() throws SQLException {
JdbcWriteSchemaTransformProvider provider = null;
for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
if (p instanceof JdbcWriteSchemaTransformProvider) {
@@ -119,9 +174,42 @@ public class JdbcWriteSchemaTransformProviderTest {
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
- .setLocation(WRITE_TABLE_NAME)
+ .setLocation(writeTableName)
+ .build()));
+ pipeline.run();
+ DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
+ }
+
+ @Test
+ public void testWriteToTableWithJdbcTypeSpecified() throws SQLException {
+ JdbcWriteSchemaTransformProvider provider = null;
+ for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
+ if (p instanceof JdbcWriteSchemaTransformProvider) {
+ provider = (JdbcWriteSchemaTransformProvider) p;
+ break;
+ }
+ }
+ assertNotNull(provider);
+
+ Schema schema =
+ Schema.of(
+ Schema.Field.of("id", Schema.FieldType.INT64),
+ Schema.Field.of("name", Schema.FieldType.STRING));
+
+ List<Row> rows =
+ ImmutableList.of(
+ Row.withSchema(schema).attachValues(1L, "name1"),
+ Row.withSchema(schema).attachValues(2L, "name2"));
+
+ PCollectionRowTuple.of("input",
pipeline.apply(Create.of(rows).withRowSchema(schema)))
+ .apply(
+ provider.from(
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+ .setJdbcType("derby")
+ .setLocation(writeTableName)
.build()));
pipeline.run();
- DatabaseTestHelper.assertRowCount(DATA_SOURCE, WRITE_TABLE_NAME, 2);
+ DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
}
}
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml
b/sdks/python/apache_beam/yaml/standard_io.yaml
index d63729f1676..8a5ffd9f6a9 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -206,6 +206,7 @@
mappings:
'ReadFromJdbc':
driver_class_name: 'driverClassName'
+ type: 'jdbcType'
url: 'jdbcUrl'
username: 'username'
password: 'password'
@@ -216,6 +217,7 @@
connection_init_sql: 'connectionInitSql'
'WriteToJdbc':
driver_class_name: 'driverClassName'
+ type: 'jdbcType'
url: 'jdbcUrl'
username: 'username'
password: 'password'
@@ -233,21 +235,21 @@
'WriteToSqlServer': 'WriteToJdbc'
defaults:
'ReadFromMySql':
- driverClassName: 'com.mysql.jdbc.Driver'
+ jdbcType: 'mysql'
'WriteToMySql':
- driverClassName: 'com.mysql.jdbc.Driver'
+ jdbcType: 'mysql'
'ReadFromPostgres':
- driverClassName: 'org.postgresql.Driver'
+ jdbcType: 'postgres'
'WriteToPostgres':
- driverClassName: 'org.postgresql.Driver'
+ jdbcType: 'postgres'
'ReadFromOracle':
- driverClassName: 'oracle.jdbc.driver.OracleDriver'
+ jdbcType: 'oracle'
'WriteToOracle':
- driverClassName: 'oracle.jdbc.driver.OracleDriver'
+ jdbcType: 'oracle'
'ReadFromSqlServer':
- driverClassName: 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
+ jdbcType: 'mssql'
'WriteToSqlServer':
- driverClassName: 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
+ jdbcType: 'mssql'
underlying_provider:
type: beamJar
transforms: