This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e31f7a5850 JDBC SchemaTransform implementation. Need to break out 
into a separat… (#24918)
8e31f7a5850 is described below

commit 8e31f7a5850c896534b5c8ddad9a3bf09f69d605
Author: Byron Ellis <[email protected]>
AuthorDate: Tue Jan 10 13:56:52 2023 -0800

    JDBC SchemaTransform implementation. Need to break out into a separat… 
(#24918)
    
    * JDBC SchemaTransform implementation. Need to break out into a separate PR 
to submit.
    
    * Update configuration to accommodate SDKs that might send empty strings or 
zeros in place of nulls.
    
    * Move write options validation test to correct class. Update classname and 
driver check to use Strings.isNullOrEmpty
    
    * Add a little documentation blurb to the providers
    
    * Fix periods that Github complains about but locally doesn't throw an 
error.
---
 .../io/jdbc/JdbcReadSchemaTransformProvider.java   | 216 ++++++++++++++++++++
 .../io/jdbc/JdbcWriteSchemaTransformProvider.java  | 226 +++++++++++++++++++++
 .../jdbc/JdbcReadSchemaTransformProviderTest.java  | 146 +++++++++++++
 .../jdbc/JdbcWriteSchemaTransformProviderTest.java | 129 ++++++++++++
 4 files changed, 717 insertions(+)

diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..cb2acfcac99
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * An implementation of {@link 
org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
+ * reading from JDBC connections using {@link 
org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized 
Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    configuration.validate();
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  static class JdbcReadSchemaTransform implements SchemaTransform, 
Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration 
config) {
+      this.config = config;
+    }
+
+    /**
+     * Builds the {@link JdbcIO.DataSourceConfiguration} described by the
+     * transform configuration.
+     *
+     * <p>An empty username or password is passed on as {@code null}.
+     * Connection properties are applied only when non-null, and the
+     * connection init SQL only when the list is non-null and non-empty.
+     */
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration result =
+          JdbcIO.DataSourceConfiguration.create(
+                  config.getDriverClassName(), config.getJdbcUrl())
+              .withUsername(Strings.emptyToNull(config.getUsername()))
+              .withPassword(Strings.emptyToNull(config.getPassword()));
+
+      String properties = config.getConnectionProperties();
+      if (properties != null) {
+        result = result.withConnectionProperties(properties);
+      }
+
+      List<@org.checkerframework.checker.nullness.qual.Nullable String>
+          initSqls = config.getConnectionInitSql();
+      if (initSqls != null && !initSqls.isEmpty()) {
+        result = result.withConnectionInitSqls(initSqls);
+      }
+
+      return result;
+    }
+
+    /**
+     * Returns a transform that reads rows with {@link JdbcIO#readRows()} and
+     * emits them under the {@code "output"} tag.
+     *
+     * <p>Only the pipeline of the input tuple is used. The query is the
+     * configured read query or, when that is null or empty,
+     * {@code SELECT * FROM <location>}. The fetch size is applied only when
+     * it is non-null and positive; output parallelization only when non-null.
+     */
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          String query = config.getReadQuery();
+          // validate() treats an empty read query as absent, so fall back to
+          // the location here too instead of submitting an empty query.
+          if (query == null || query.isEmpty()) {
+            query = String.format("SELECT * FROM %s", config.getLocation());
+          }
+          JdbcIO.ReadRows readRows =
+              JdbcIO.readRows()
+                  .withDataSourceConfiguration(dataSourceConfiguration())
+                  .withQuery(query);
+          Short fetchSize = config.getFetchSize();
+          // Zero or negative values mean "unset" for SDKs that cannot send
+          // null.
+          if (fetchSize != null && fetchSize > 0) {
+            readRows = readRows.withFetchSize(fetchSize);
+          }
+          Boolean outputParallelization = config.getOutputParallelization();
+          if (outputParallelization != null) {
+            readRows =
+                readRows.withOutputParallelization(outputParallelization);
+          }
+          return PCollectionRowTuple.of(
+              "output", input.getPipeline().apply(readRows));
+        }
+      };
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:jdbc_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull 
@Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull 
@Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
+  public abstract static class JdbcReadSchemaTransformConfiguration implements 
Serializable {
+    public abstract String getDriverClassName();
+
+    public abstract String getJdbcUrl();
+
+    @Nullable
+    public abstract String getUsername();
+
+    @Nullable
+    public abstract String getPassword();
+
+    @Nullable
+    public abstract String getConnectionProperties();
+
+    @Nullable
+    public abstract List<@org.checkerframework.checker.nullness.qual.Nullable 
String>
+        getConnectionInitSql();
+
+    @Nullable
+    public abstract String getReadQuery();
+
+    @Nullable
+    public abstract String getLocation();
+
+    @Nullable
+    public abstract Short getFetchSize();
+
+    @Nullable
+    public abstract Boolean getOutputParallelization();
+
+    /**
+     * Checks that this configuration is usable. Empty strings are treated
+     * the same as unset values.
+     *
+     * @throws IllegalArgumentException if the driver class name or the JDBC
+     *     URL is null or empty, or unless exactly one of the read query and
+     *     the location is set
+     */
+    public void validate() throws IllegalArgumentException {
+      if (Strings.isNullOrEmpty(getDriverClassName())) {
+        throw new IllegalArgumentException(
+            "JDBC Driver class name cannot be blank.");
+      }
+      if (Strings.isNullOrEmpty(getJdbcUrl())) {
+        throw new IllegalArgumentException("JDBC URL cannot be blank");
+      }
+
+      boolean hasQuery = !Strings.isNullOrEmpty(getReadQuery());
+      boolean hasLocation = !Strings.isNullOrEmpty(getLocation());
+
+      // Exactly one of the two must be present.
+      if (hasQuery == hasLocation) {
+        throw new IllegalArgumentException(
+            hasQuery
+                ? "ReadQuery and Location are mutually exclusive configurations"
+                : "Either ReadQuery or Location must be set.");
+      }
+    }
+
+    public static Builder builder() {
+      return new 
AutoValue_JdbcReadSchemaTransformProvider_JdbcReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDriverClassName(String value);
+
+      public abstract Builder setJdbcUrl(String value);
+
+      public abstract Builder setUsername(String value);
+
+      public abstract Builder setPassword(String value);
+
+      public abstract Builder setLocation(String value);
+
+      public abstract Builder setReadQuery(String value);
+
+      public abstract Builder setConnectionProperties(String value);
+
+      public abstract Builder setConnectionInitSql(List<String> value);
+
+      public abstract Builder setFetchSize(Short value);
+
+      public abstract Builder setOutputParallelization(Boolean value);
+
+      public abstract JdbcReadSchemaTransformConfiguration build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..7f9cc3b7756
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * An implementation of {@link 
org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
+ * writing to JDBC connections using {@link 
org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class JdbcWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized 
Class<JdbcWriteSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcWriteSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcWriteSchemaTransformConfiguration configuration) {
+    configuration.validate();
+    return new JdbcWriteSchemaTransform(configuration);
+  }
+
+  static class JdbcWriteSchemaTransform implements SchemaTransform, 
Serializable {
+
+    JdbcWriteSchemaTransformConfiguration config;
+
+    public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration 
config) {
+      this.config = config;
+    }
+
+    /**
+     * Builds the {@link JdbcIO.DataSourceConfiguration} described by the
+     * transform configuration.
+     *
+     * <p>An empty username or password is passed on as {@code null}.
+     * Connection properties are applied only when non-null, and the
+     * connection init SQL only when the list is non-null and non-empty.
+     */
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration result =
+          JdbcIO.DataSourceConfiguration.create(
+                  config.getDriverClassName(), config.getJdbcUrl())
+              .withUsername(Strings.emptyToNull(config.getUsername()))
+              .withPassword(Strings.emptyToNull(config.getPassword()));
+
+      String properties = config.getConnectionProperties();
+      if (properties != null) {
+        result = result.withConnectionProperties(properties);
+      }
+
+      List<@org.checkerframework.checker.nullness.qual.Nullable String>
+          initSqls = config.getConnectionInitSql();
+      if (initSqls != null && !initSqls.isEmpty()) {
+        result = result.withConnectionInitSqls(initSqls);
+      }
+
+      return result;
+    }
+
+    /**
+     * Returns the SQL statement used to write rows.
+     *
+     * <p>A configured, non-empty write statement is used verbatim. Otherwise
+     * an {@code INSERT INTO <location> VALUES(?, ..., ?)} statement is
+     * generated with one positional parameter per field of {@code schema}
+     * (at least one placeholder is always emitted).
+     *
+     * @param schema schema of the rows being written
+     * @return the statement handed to {@code JdbcIO.Write#withStatement}
+     */
+    protected String writeStatement(Schema schema) {
+      String writeStatement = config.getWriteStatement();
+      // validate() treats an empty write statement as absent, so generate
+      // the INSERT from the location instead of returning an empty string.
+      if (writeStatement != null && !writeStatement.isEmpty()) {
+        return writeStatement;
+      }
+      StringBuilder statement = new StringBuilder("INSERT INTO ");
+      statement.append(config.getLocation());
+      statement.append(" VALUES(");
+      for (int i = 0; i < schema.getFieldCount() - 1; i++) {
+        statement.append("?, ");
+      }
+      statement.append("?)");
+      return statement.toString();
+    }
+
+    /**
+     * Returns a transform that writes the rows tagged {@code "input"} with
+     * {@link JdbcIO#write()} and produces an empty output tuple.
+     *
+     * <p>The statement comes from {@code writeStatement} using the schema of
+     * the input collection. Auto-sharding is enabled only when the
+     * configuration flag is {@code true}.
+     */
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          JdbcIO.Write<Row> write =
+              JdbcIO.<Row>write()
+                  .withDataSourceConfiguration(dataSourceConfiguration())
+                  .withStatement(
+                      writeStatement(input.get("input").getSchema()))
+                  .withPreparedStatementSetter(
+                      new JdbcUtil.BeamRowPreparedStatementSetter());
+          // A null flag counts as "off".
+          if (Boolean.TRUE.equals(config.getAutosharding())) {
+            write = write.withAutoSharding();
+          }
+          input.get("input").apply(write);
+          return PCollectionRowTuple.empty(input.getPipeline());
+        }
+      };
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:jdbc_write:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull 
@Initialized String>
+      inputCollectionNames() {
+    return Collections.singletonList("input");
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull 
@Initialized String>
+      outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
+  public abstract static class JdbcWriteSchemaTransformConfiguration 
implements Serializable {
+
+    public abstract String getDriverClassName();
+
+    public abstract String getJdbcUrl();
+
+    @Nullable
+    public abstract String getUsername();
+
+    @Nullable
+    public abstract String getPassword();
+
+    @Nullable
+    public abstract String getConnectionProperties();
+
+    @Nullable
+    public abstract List<@org.checkerframework.checker.nullness.qual.Nullable 
String>
+        getConnectionInitSql();
+
+    @Nullable
+    public abstract String getLocation();
+
+    @Nullable
+    public abstract String getWriteStatement();
+
+    @Nullable
+    public abstract Boolean getAutosharding();
+
+    /**
+     * Checks that this configuration is usable. Empty strings are treated
+     * the same as unset values.
+     *
+     * @throws IllegalArgumentException if the driver class name or the JDBC
+     *     URL is null or empty, or unless exactly one of the write statement
+     *     and the location is set
+     */
+    public void validate() throws IllegalArgumentException {
+      if (Strings.isNullOrEmpty(getDriverClassName())) {
+        throw new IllegalArgumentException(
+            "JDBC Driver class name cannot be blank.");
+      }
+      if (Strings.isNullOrEmpty(getJdbcUrl())) {
+        throw new IllegalArgumentException("JDBC URL cannot be blank");
+      }
+
+      boolean writeStatementPresent =
+          (getWriteStatement() != null && !"".equals(getWriteStatement()));
+      boolean locationPresent =
+          (getLocation() != null && !"".equals(getLocation()));
+
+      // The messages name WriteStatement, the option this class exposes;
+      // they previously said ReadQuery, copied from the read configuration.
+      if (writeStatementPresent && locationPresent) {
+        throw new IllegalArgumentException(
+            "WriteStatement and Location are mutually exclusive configurations");
+      }
+      if (!writeStatementPresent && !locationPresent) {
+        throw new IllegalArgumentException(
+            "Either WriteStatement or Location must be set.");
+      }
+    }
+
+    public static Builder builder() {
+      return new 
AutoValue_JdbcWriteSchemaTransformProvider_JdbcWriteSchemaTransformConfiguration
+          .Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDriverClassName(String value);
+
+      public abstract Builder setJdbcUrl(String value);
+
+      public abstract Builder setUsername(String value);
+
+      public abstract Builder setPassword(String value);
+
+      public abstract Builder setConnectionProperties(String value);
+
+      public abstract Builder setConnectionInitSql(
+          List<@org.checkerframework.checker.nullness.qual.Nullable String> 
value);
+
+      public abstract Builder setLocation(String value);
+
+      public abstract Builder setWriteStatement(String value);
+
+      public abstract Builder setAutosharding(Boolean value);
+
+      public abstract JdbcWriteSchemaTransformConfiguration build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..3e82b565fba
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ServiceLoader;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class JdbcReadSchemaTransformProviderTest {
+
+  private static final JdbcIO.DataSourceConfiguration 
DATA_SOURCE_CONFIGURATION =
+      JdbcIO.DataSourceConfiguration.create(
+          "org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:testDB;create=true");
+  private static final int EXPECTED_ROW_COUNT = 1000;
+
+  private static final DataSource DATA_SOURCE = 
DATA_SOURCE_CONFIGURATION.buildDatasource();
+  private static final String READ_TABLE_NAME = 
DatabaseTestHelper.getTestTableName("UT_READ");
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // by default, derby uses a lock timeout of 60 seconds. In order to speed 
up the test
+    // and detect the lock faster, we decrease this timeout
+    System.setProperty("derby.locks.waitTimeout", "2");
+    System.setProperty("derby.stream.error.file", "build/derby.log");
+
+    DatabaseTestHelper.createTable(DATA_SOURCE, READ_TABLE_NAME);
+    addInitialData(DATA_SOURCE, READ_TABLE_NAME);
+  }
+
+  @Test
+  public void testInvalidReadSchemaOptions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("")
+              .setJdbcUrl("")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setReadQuery("Query")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .build()
+              .validate();
+        });
+  }
+
+  @Test
+  public void testRead() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcReadSchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                provider
+                    .from(
+                        
JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration
+                            .builder()
+                            .setDriverClassName(
+                                
DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+                            
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                            .setLocation(READ_TABLE_NAME)
+                            .build())
+                    .buildTransform())
+            .get("output");
+    Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Create test data that is consistent with that generated by TestRow. */
+  private static void addInitialData(DataSource dataSource, String tableName) 
throws SQLException {
+    try (Connection connection = dataSource.getConnection()) {
+      connection.setAutoCommit(false);
+      try (PreparedStatement preparedStatement =
+          connection.prepareStatement(String.format("insert into %s values 
(?,?)", tableName))) {
+        for (int i = 0; i < EXPECTED_ROW_COUNT; i++) {
+          preparedStatement.clearParameters();
+          preparedStatement.setInt(1, i);
+          preparedStatement.setString(2, TestRow.getNameForSeed(i));
+          preparedStatement.executeUpdate();
+        }
+      }
+      connection.commit();
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..7f422affda2
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.ServiceLoader;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class JdbcWriteSchemaTransformProviderTest {
+
+  private static final JdbcIO.DataSourceConfiguration 
DATA_SOURCE_CONFIGURATION =
+      JdbcIO.DataSourceConfiguration.create(
+          "org.apache.derby.jdbc.EmbeddedDriver", 
"jdbc:derby:memory:testDB;create=true");
+  private static final DataSource DATA_SOURCE = 
DATA_SOURCE_CONFIGURATION.buildDatasource();
+  private static final String WRITE_TABLE_NAME = 
DatabaseTestHelper.getTestTableName("UT_WRITE");
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // by default, derby uses a lock timeout of 60 seconds. In order to speed 
up the test
+    // and detect the lock faster, we decrease this timeout
+    System.setProperty("derby.locks.waitTimeout", "2");
+    System.setProperty("derby.stream.error.file", "build/derby.log");
+
+    DatabaseTestHelper.createTable(DATA_SOURCE, WRITE_TABLE_NAME);
+  }
+
+  @Test
+  public void testInvalidWriteSchemaOptions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setDriverClassName("")
+              .setJdbcUrl("")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setWriteStatement("WriteStatement")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .build()
+              .validate();
+        });
+  }
+
+  @Test
+  public void testReadWriteToTable() throws SQLException {
+    JdbcWriteSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : 
ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof JdbcWriteSchemaTransformProvider) {
+        provider = (JdbcWriteSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    Schema schema =
+        Schema.of(
+            Schema.Field.of("id", Schema.FieldType.INT64),
+            Schema.Field.of("name", Schema.FieldType.STRING));
+
+    List<Row> rows =
+        ImmutableList.of(
+            Row.withSchema(schema).attachValues(1L, "name1"),
+            Row.withSchema(schema).attachValues(2L, "name2"));
+
+    PCollectionRowTuple.of("input", 
pipeline.apply(Create.of(rows).withRowSchema(schema)))
+        .apply(
+            provider
+                .from(
+                    
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+                        
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+                        .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                        .setLocation(WRITE_TABLE_NAME)
+                        .build())
+                .buildTransform());
+    pipeline.run();
+    DatabaseTestHelper.assertRowCount(DATA_SOURCE, WRITE_TABLE_NAME, 2);
+  }
+}

Reply via email to