This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 619c0d9ca2c [YAML] Add ability to specify JDBC type to read/write 
(#30024)
619c0d9ca2c is described below

commit 619c0d9ca2c5c78e033b1cd5c7dacfc88f462836
Author: Jeff Kinard <[email protected]>
AuthorDate: Thu Feb 8 14:13:18 2024 -0500

    [YAML] Add ability to specify JDBC type to read/write (#30024)
    
    PR adds ability to specify a JDBC type/flavor to the ReadFromJdbc and 
WriteToJdbc Beam YAML transforms rather than having to manually specify a 
driver class.
---
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   | 36 +++++++-
 .../java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 19 +++++
 .../io/jdbc/JdbcWriteSchemaTransformProvider.java  | 36 +++++++-
 .../jdbc/JdbcReadSchemaTransformProviderTest.java  | 76 +++++++++++++++++
 .../jdbc/JdbcWriteSchemaTransformProviderTest.java | 98 ++++++++++++++++++++--
 sdks/python/apache_beam/yaml/standard_io.yaml      | 18 ++--
 6 files changed, 262 insertions(+), 21 deletions(-)

diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 3b504b1a90d..0139207235a 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -68,8 +71,15 @@ public class JdbcReadSchemaTransformProvider
     }
 
     protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      String driverClassName = config.getDriverClassName();
+
+      if (Strings.isNullOrEmpty(driverClassName)) {
+        driverClassName =
+            
JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+      }
+
       JdbcIO.DataSourceConfiguration dsConfig =
-          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), 
config.getJdbcUrl())
+          JdbcIO.DataSourceConfiguration.create(driverClassName, 
config.getJdbcUrl())
               .withUsername("".equals(config.getUsername()) ? null : 
config.getUsername())
               .withPassword("".equals(config.getPassword()) ? null : 
config.getPassword());
       String connectionProperties = config.getConnectionProperties();
@@ -131,8 +141,12 @@ public class JdbcReadSchemaTransformProvider
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class JdbcReadSchemaTransformConfiguration implements 
Serializable {
+    @Nullable
     public abstract String getDriverClassName();
 
+    @Nullable
+    public abstract String getJdbcType();
+
     public abstract String getJdbcUrl();
 
     @Nullable
@@ -164,13 +178,25 @@ public class JdbcReadSchemaTransformProvider
     public abstract String getDriverJars();
 
     public void validate() throws IllegalArgumentException {
-      if (Strings.isNullOrEmpty(getDriverClassName())) {
-        throw new IllegalArgumentException("JDBC Driver class name cannot be 
blank.");
-      }
       if (Strings.isNullOrEmpty(getJdbcUrl())) {
         throw new IllegalArgumentException("JDBC URL cannot be blank");
       }
 
+      boolean driverClassNamePresent = 
!Strings.isNullOrEmpty(getDriverClassName());
+      boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
+      if (driverClassNamePresent && jdbcTypePresent) {
+        throw new IllegalArgumentException(
+            "JDBC Driver class name and JDBC type are mutually exclusive 
configurations.");
+      }
+      if (!driverClassNamePresent && !jdbcTypePresent) {
+        throw new IllegalArgumentException(
+            "One of JDBC Driver class name or JDBC type must be specified.");
+      }
+      if (jdbcTypePresent
+          && 
!JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase()))
 {
+        throw new IllegalArgumentException("JDBC type must be one of " + 
JDBC_DRIVER_MAP.keySet());
+      }
+
       boolean readQueryPresent = (getReadQuery() != null && 
!"".equals(getReadQuery()));
       boolean locationPresent = (getLocation() != null && 
!"".equals(getLocation()));
 
@@ -192,6 +218,8 @@ public class JdbcReadSchemaTransformProvider
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);
 
+      public abstract Builder setJdbcType(String value);
+
       public abstract Builder setJdbcUrl(String value);
 
       public abstract Builder setUsername(String value);
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
index 8c7ee17d5fc..8d82938e596 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -58,6 +59,7 @@ import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -75,6 +77,23 @@ class JdbcUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
 
+  static final Map<String, String> JDBC_DRIVER_MAP =
+      new HashMap<>(
+          ImmutableMap.of(
+              "mysql",
+              "com.mysql.cj.jdbc.Driver",
+              "postgres",
+              "org.postgresql.Driver",
+              "oracle",
+              "oracle.jdbc.driver.OracleDriver",
+              "mssql",
+              "com.microsoft.sqlserver.jdbc.SQLServerDriver"));
+
+  @VisibleForTesting
+  static void registerJdbcDriver(Map<String, String> jdbcType) {
+    JDBC_DRIVER_MAP.putAll(jdbcType);
+  }
+
   /** Utility method to save jar files locally in the worker. */
   static URL[] saveFilesLocally(String driverJars) {
     List<String> listOfJarPaths = 
Splitter.on(',').trimResults().splitToList(driverJars);
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index e9f67969626..a409b604b11 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
@@ -73,8 +76,15 @@ public class JdbcWriteSchemaTransformProvider
     }
 
     protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      String driverClassName = config.getDriverClassName();
+
+      if (Strings.isNullOrEmpty(driverClassName)) {
+        driverClassName =
+            
JDBC_DRIVER_MAP.get(Objects.requireNonNull(config.getJdbcType()).toLowerCase());
+      }
+
       JdbcIO.DataSourceConfiguration dsConfig =
-          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), 
config.getJdbcUrl())
+          JdbcIO.DataSourceConfiguration.create(driverClassName, 
config.getJdbcUrl())
               .withUsername("".equals(config.getUsername()) ? null : 
config.getUsername())
               .withPassword("".equals(config.getPassword()) ? null : 
config.getPassword());
       String connectionProperties = config.getConnectionProperties();
@@ -162,8 +172,12 @@ public class JdbcWriteSchemaTransformProvider
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class JdbcWriteSchemaTransformConfiguration 
implements Serializable {
 
+    @Nullable
     public abstract String getDriverClassName();
 
+    @Nullable
+    public abstract String getJdbcType();
+
     public abstract String getJdbcUrl();
 
     @Nullable
@@ -192,13 +206,25 @@ public class JdbcWriteSchemaTransformProvider
     public abstract String getDriverJars();
 
     public void validate() throws IllegalArgumentException {
-      if (Strings.isNullOrEmpty(getDriverClassName())) {
-        throw new IllegalArgumentException("JDBC Driver class name cannot be 
blank.");
-      }
       if (Strings.isNullOrEmpty(getJdbcUrl())) {
         throw new IllegalArgumentException("JDBC URL cannot be blank");
       }
 
+      boolean driverClassNamePresent = 
!Strings.isNullOrEmpty(getDriverClassName());
+      boolean jdbcTypePresent = !Strings.isNullOrEmpty(getJdbcType());
+      if (driverClassNamePresent && jdbcTypePresent) {
+        throw new IllegalArgumentException(
+            "JDBC Driver class name and JDBC type are mutually exclusive 
configurations.");
+      }
+      if (!driverClassNamePresent && !jdbcTypePresent) {
+        throw new IllegalArgumentException(
+            "One of JDBC Driver class name or JDBC type must be specified.");
+      }
+      if (jdbcTypePresent
+          && 
!JDBC_DRIVER_MAP.containsKey(Objects.requireNonNull(getJdbcType()).toLowerCase()))
 {
+        throw new IllegalArgumentException("JDBC type must be one of " + 
JDBC_DRIVER_MAP.keySet());
+      }
+
       boolean writeStatementPresent =
           (getWriteStatement() != null && !"".equals(getWriteStatement()));
       boolean locationPresent = (getLocation() != null && 
!"".equals(getLocation()));
@@ -221,6 +247,8 @@ public class JdbcWriteSchemaTransformProvider
     public abstract static class Builder {
       public abstract Builder setDriverClassName(String value);
 
+      public abstract Builder setJdbcType(String value);
+
       public abstract Builder setJdbcUrl(String value);
 
       public abstract Builder setUsername(String value);
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index 251f995dea4..7cbdd48d158 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.registerJdbcDriver;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
@@ -34,6 +37,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,6 +64,10 @@ public class JdbcReadSchemaTransformProviderTest {
     System.setProperty("derby.locks.waitTimeout", "2");
     System.setProperty("derby.stream.error.file", "build/derby.log");
 
+    registerJdbcDriver(
+        ImmutableMap.of(
+            "derby", 
Objects.requireNonNull(DATA_SOURCE_CONFIGURATION.getDriverClassName()).get()));
+
     DatabaseTestHelper.createTable(DATA_SOURCE, READ_TABLE_NAME);
     addInitialData(DATA_SOURCE, READ_TABLE_NAME);
   }
@@ -95,6 +103,48 @@ public class JdbcReadSchemaTransformProviderTest {
               .build()
               .validate();
         });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setJdbcType("invalidType")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setDriverClassName("ClassName")
+              .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
+              .build()
+              .validate();
+        });
+  }
+
+  @Test
+  public void testValidReadSchemaOptions() {
+    for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
+      
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+          .setJdbcUrl("JdbcUrl")
+          .setLocation("Location")
+          .setJdbcType(jdbcType)
+          .build()
+          .validate();
+    }
   }
 
   @Test
@@ -123,6 +173,32 @@ public class JdbcReadSchemaTransformProviderTest {
     pipeline.run();
   }
 
+  @Test
+  public void testReadWithJdbcTypeSpecified() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcReadSchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                provider.from(
+                    
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                        .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                        .setJdbcType("derby")
+                        .setLocation(READ_TABLE_NAME)
+                        .build()))
+            .get("output");
+    Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
   /** Create test data that is consistent with that generated by TestRow. */
   private static void addInitialData(DataSource dataSource, String tableName) 
throws SQLException {
     try (Connection connection = dataSource.getConnection()) {
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
index 64de7a1b56c..f66a143323e 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.JDBC_DRIVER_MAP;
+import static org.apache.beam.sdk.io.jdbc.JdbcUtil.registerJdbcDriver;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
@@ -32,6 +35,8 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -45,7 +50,7 @@ public class JdbcWriteSchemaTransformProviderTest {
       JdbcIO.DataSourceConfiguration.create(
           "org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:testDB;create=true");
   private static final DataSource DATA_SOURCE = 
DATA_SOURCE_CONFIGURATION.buildDatasource();
-  private static final String WRITE_TABLE_NAME = 
DatabaseTestHelper.getTestTableName("UT_WRITE");
+  private String writeTableName;
 
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
 
@@ -56,7 +61,15 @@ public class JdbcWriteSchemaTransformProviderTest {
     System.setProperty("derby.locks.waitTimeout", "2");
     System.setProperty("derby.stream.error.file", "build/derby.log");
 
-    DatabaseTestHelper.createTable(DATA_SOURCE, WRITE_TABLE_NAME);
+    registerJdbcDriver(
+        ImmutableMap.of(
+            "derby", 
Objects.requireNonNull(DATA_SOURCE_CONFIGURATION.getDriverClassName()).get()));
+  }
+
+  @Before
+  public void before() throws SQLException {
+    writeTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(DATA_SOURCE, writeTableName);
   }
 
   @Test
@@ -90,10 +103,52 @@ public class JdbcWriteSchemaTransformProviderTest {
               .build()
               .validate();
         });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setJdbcType("invalidType")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setDriverClassName("ClassName")
+              .setJdbcType((String) JDBC_DRIVER_MAP.keySet().toArray()[0])
+              .build()
+              .validate();
+        });
   }
 
   @Test
-  public void testReadWriteToTable() throws SQLException {
+  public void testValidWriteSchemaOptions() {
+    for (String jdbcType : JDBC_DRIVER_MAP.keySet()) {
+      
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+          .setJdbcUrl("JdbcUrl")
+          .setLocation("Location")
+          .setJdbcType(jdbcType)
+          .build()
+          .validate();
+    }
+  }
+
+  @Test
+  public void testWriteToTable() throws SQLException {
     JdbcWriteSchemaTransformProvider provider = null;
     for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
       if (p instanceof JdbcWriteSchemaTransformProvider) {
@@ -119,9 +174,42 @@ public class JdbcWriteSchemaTransformProviderTest {
                 
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
                     
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
                     .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
-                    .setLocation(WRITE_TABLE_NAME)
+                    .setLocation(writeTableName)
+                    .build()));
+    pipeline.run();
+    DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
+  }
+
+  @Test
+  public void testWriteToTableWithJdbcTypeSpecified() throws SQLException {
+    JdbcWriteSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcWriteSchemaTransformProvider) {
+        provider = (JdbcWriteSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    Schema schema =
+        Schema.of(
+            Schema.Field.of("id", Schema.FieldType.INT64),
+            Schema.Field.of("name", Schema.FieldType.STRING));
+
+    List<Row> rows =
+        ImmutableList.of(
+            Row.withSchema(schema).attachValues(1L, "name1"),
+            Row.withSchema(schema).attachValues(2L, "name2"));
+
+    PCollectionRowTuple.of("input", 
pipeline.apply(Create.of(rows).withRowSchema(schema)))
+        .apply(
+            provider.from(
+                
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+                    .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                    .setJdbcType("derby")
+                    .setLocation(writeTableName)
                     .build()));
     pipeline.run();
-    DatabaseTestHelper.assertRowCount(DATA_SOURCE, WRITE_TABLE_NAME, 2);
+    DatabaseTestHelper.assertRowCount(DATA_SOURCE, writeTableName, 2);
   }
 }
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml 
b/sdks/python/apache_beam/yaml/standard_io.yaml
index d63729f1676..8a5ffd9f6a9 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -206,6 +206,7 @@
     mappings:
       'ReadFromJdbc':
         driver_class_name: 'driverClassName'
+        type: 'jdbcType'
         url: 'jdbcUrl'
         username: 'username'
         password: 'password'
@@ -216,6 +217,7 @@
         connection_init_sql: 'connectionInitSql'
       'WriteToJdbc':
         driver_class_name: 'driverClassName'
+        type: 'jdbcType'
         url: 'jdbcUrl'
         username: 'username'
         password: 'password'
@@ -233,21 +235,21 @@
       'WriteToSqlServer': 'WriteToJdbc'
     defaults:
       'ReadFromMySql':
-        driverClassName: 'com.mysql.jdbc.Driver'
+        jdbcType: 'mysql'
       'WriteToMySql':
-        driverClassName: 'com.mysql.jdbc.Driver'
+        jdbcType: 'mysql'
       'ReadFromPostgres':
-        driverClassName: 'org.postgresql.Driver'
+        jdbcType: 'postgres'
       'WriteToPostgres':
-        driverClassName: 'org.postgresql.Driver'
+        jdbcType: 'postgres'
       'ReadFromOracle':
-        driverClassName: 'oracle.jdbc.driver.OracleDriver'
+        jdbcType: 'oracle'
       'WriteToOracle':
-        driverClassName: 'oracle.jdbc.driver.OracleDriver'
+        jdbcType: 'oracle'
       'ReadFromSqlServer':
-        driverClassName: 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
+        jdbcType: 'mssql'
       'WriteToSqlServer':
-        driverClassName: 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
+        jdbcType: 'mssql'
     underlying_provider:
       type: beamJar
       transforms:

Reply via email to