This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e31f7a5850 JDBC SchemaTransform implementation. Need to break out
into a separat… (#24918)
8e31f7a5850 is described below
commit 8e31f7a5850c896534b5c8ddad9a3bf09f69d605
Author: Byron Ellis <[email protected]>
AuthorDate: Tue Jan 10 13:56:52 2023 -0800
JDBC SchemaTransform implementation. Need to break out into a separat…
(#24918)
* JDBC SchemaTransform implementation. Need to break out into a separate PR
to submit.
* Update configuration to accommodate SDKs that might send empty strings or
zeros in place of nulls.
* Move write options validation test to correct class. Update classname and
driver check to use Strings.isNullOrEmpty
* Add a little documentation blurb to the providers
* Fix periods that Github complains about but locally doesn't throw an
error.
---
.../io/jdbc/JdbcReadSchemaTransformProvider.java | 216 ++++++++++++++++++++
.../io/jdbc/JdbcWriteSchemaTransformProvider.java | 226 +++++++++++++++++++++
.../jdbc/JdbcReadSchemaTransformProviderTest.java | 146 +++++++++++++
.../jdbc/JdbcWriteSchemaTransformProviderTest.java | 129 ++++++++++++
4 files changed, 717 insertions(+)
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
new file mode 100644
index 00000000000..cb2acfcac99
--- /dev/null
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * An implementation of {@link org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
+ * reading from JDBC connections using {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ *
+ * <p>The configuration must name either a read query or a table location, but not both. Empty
+ * strings are treated like {@code null} throughout, because some SDKs send empty values in place
+ * of nulls.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  /** Returns the configuration class this provider is typed on. */
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  /**
+   * Validates {@code configuration} and wraps it in a {@link JdbcReadSchemaTransform}.
+   *
+   * @throws IllegalArgumentException if the configuration fails validation
+   */
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    configuration.validate();
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  /** A {@link SchemaTransform} that reads rows over JDBC as described by its configuration. */
+  static class JdbcReadSchemaTransform implements SchemaTransform, Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      this.config = config;
+    }
+
+    /**
+     * Translates the configuration into a {@link JdbcIO.DataSourceConfiguration}.
+     *
+     * <p>An empty username, password or connection-properties string, and an empty init-SQL
+     * list, are all left unset on the resulting data source configuration.
+     */
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dsConfig =
+          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), config.getJdbcUrl())
+              .withUsername("".equals(config.getUsername()) ? null : config.getUsername())
+              .withPassword("".equals(config.getPassword()) ? null : config.getPassword());
+      String connectionProperties = config.getConnectionProperties();
+      // Same empty-means-unset rule as username/password above.
+      if (connectionProperties != null && !connectionProperties.isEmpty()) {
+        dsConfig = dsConfig.withConnectionProperties(connectionProperties);
+      }
+
+      List<@org.checkerframework.checker.nullness.qual.Nullable String> initialSql =
+          config.getConnectionInitSql();
+      if (initialSql != null && initialSql.size() > 0) {
+        dsConfig = dsConfig.withConnectionInitSqls(initialSql);
+      }
+
+      return dsConfig;
+    }
+
+    /**
+     * Builds the transform that performs the read.
+     *
+     * <p>The returned transform ignores the contents of its input tuple (only its pipeline is
+     * used) and produces a tuple with a single collection tagged {@code "output"}.
+     */
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          String query = config.getReadQuery();
+          // validate() accepts an empty read query together with a location, so an empty query
+          // has to fall back to the table scan exactly like a null one.
+          if (query == null || query.isEmpty()) {
+            query = String.format("SELECT * FROM %s", config.getLocation());
+          }
+          JdbcIO.ReadRows readRows =
+              JdbcIO.readRows()
+                  .withDataSourceConfiguration(dataSourceConfiguration())
+                  .withQuery(query);
+          // A null or non-positive fetch size means "not configured"; keep JdbcIO's default.
+          Short fetchSize = config.getFetchSize();
+          if (fetchSize != null && fetchSize > 0) {
+            readRows = readRows.withFetchSize(fetchSize);
+          }
+          Boolean outputParallelization = config.getOutputParallelization();
+          if (outputParallelization != null) {
+            readRows = readRows.withOutputParallelization(outputParallelization);
+          }
+          return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
+        }
+      };
+    }
+  }
+
+  /** Returns the identifier of this transform. */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:jdbc_read:v1";
+  }
+
+  /** Returns an empty list: the read transform consumes no input collections. */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /** Returns the single output tag, {@code "output"}. */
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  /** Configuration of a JDBC read; exactly one of read query and location must be set. */
+  @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
+  public abstract static class JdbcReadSchemaTransformConfiguration implements Serializable {
+    /** Fully qualified class name of the JDBC driver; must not be null or empty. */
+    public abstract String getDriverClassName();
+
+    /** JDBC connection URL; must not be null or empty. */
+    public abstract String getJdbcUrl();
+
+    /** Optional username; null or empty leaves it unset. */
+    @Nullable
+    public abstract String getUsername();
+
+    /** Optional password; null or empty leaves it unset. */
+    @Nullable
+    public abstract String getPassword();
+
+    /** Optional connection properties string handed to the data source configuration. */
+    @Nullable
+    public abstract String getConnectionProperties();
+
+    /** Optional SQL statements handed to the data source configuration as connection init SQL. */
+    @Nullable
+    public abstract List<@org.checkerframework.checker.nullness.qual.Nullable String>
+        getConnectionInitSql();
+
+    /** Query to run; mutually exclusive with {@link #getLocation()}. */
+    @Nullable
+    public abstract String getReadQuery();
+
+    /** Table read with {@code SELECT * FROM <location>}; exclusive with the read query. */
+    @Nullable
+    public abstract String getLocation();
+
+    /** Optional fetch size; only applied when greater than zero. */
+    @Nullable
+    public abstract Short getFetchSize();
+
+    /** Optional output-parallelization flag; only applied when non-null. */
+    @Nullable
+    public abstract Boolean getOutputParallelization();
+
+    /**
+     * Checks that driver class name and JDBC URL are present and that exactly one of read query
+     * and location is set. Empty strings count as absent.
+     *
+     * @throws IllegalArgumentException if any of these checks fails
+     */
+    public void validate() throws IllegalArgumentException {
+      if (Strings.isNullOrEmpty(getDriverClassName())) {
+        throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
+      }
+      if (Strings.isNullOrEmpty(getJdbcUrl())) {
+        throw new IllegalArgumentException("JDBC URL cannot be blank");
+      }
+
+      boolean readQueryPresent = (getReadQuery() != null && !"".equals(getReadQuery()));
+      boolean locationPresent = (getLocation() != null && !"".equals(getLocation()));
+
+      if (readQueryPresent && locationPresent) {
+        throw new IllegalArgumentException(
+            "ReadQuery and Location are mutually exclusive configurations");
+      }
+      if (!readQueryPresent && !locationPresent) {
+        throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
+      }
+    }
+
+    /** Returns a new builder for this configuration. */
+    public static Builder builder() {
+      return new AutoValue_JdbcReadSchemaTransformProvider_JdbcReadSchemaTransformConfiguration
+          .Builder();
+    }
+
+    /** Builder for {@link JdbcReadSchemaTransformConfiguration}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDriverClassName(String value);
+
+      public abstract Builder setJdbcUrl(String value);
+
+      public abstract Builder setUsername(String value);
+
+      public abstract Builder setPassword(String value);
+
+      public abstract Builder setLocation(String value);
+
+      public abstract Builder setReadQuery(String value);
+
+      public abstract Builder setConnectionProperties(String value);
+
+      public abstract Builder setConnectionInitSql(List<String> value);
+
+      public abstract Builder setFetchSize(Short value);
+
+      public abstract Builder setOutputParallelization(Boolean value);
+
+      public abstract JdbcReadSchemaTransformConfiguration build();
+    }
+  }
+}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
new file mode 100644
index 00000000000..7f9cc3b7756
--- /dev/null
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * An implementation of {@link
org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider} for
+ * writing to a JDBC connections using {@link
org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ */
+@AutoService(SchemaTransformProvider.class)
+public class JdbcWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration> {
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized
Class<JdbcWriteSchemaTransformConfiguration>
+ configurationClass() {
+ return JdbcWriteSchemaTransformConfiguration.class;
+ }
+
+ @Override
+ protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ configuration.validate();
+ return new JdbcWriteSchemaTransform(configuration);
+ }
+
+ static class JdbcWriteSchemaTransform implements SchemaTransform,
Serializable {
+
+ JdbcWriteSchemaTransformConfiguration config;
+
+ public JdbcWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration
config) {
+ this.config = config;
+ }
+
+ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+ JdbcIO.DataSourceConfiguration dsConfig =
+ JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(),
config.getJdbcUrl())
+ .withUsername("".equals(config.getUsername()) ? null :
config.getUsername())
+ .withPassword("".equals(config.getPassword()) ? null :
config.getPassword());
+ String connectionProperties = config.getConnectionProperties();
+ if (connectionProperties != null) {
+ dsConfig = dsConfig.withConnectionProperties(connectionProperties);
+ }
+
+ List<@org.checkerframework.checker.nullness.qual.Nullable String>
initialSql =
+ config.getConnectionInitSql();
+ if (initialSql != null && initialSql.size() > 0) {
+ dsConfig = dsConfig.withConnectionInitSqls(initialSql);
+ }
+
+ return dsConfig;
+ }
+
+ protected String writeStatement(Schema schema) {
+ String writeStatement = config.getWriteStatement();
+ if (writeStatement != null) {
+ return writeStatement;
+ } else {
+ StringBuilder statement = new StringBuilder("INSERT INTO ");
+ statement.append(config.getLocation());
+ statement.append(" VALUES(");
+ for (int i = 0; i < schema.getFieldCount() - 1; i++) {
+ statement.append("?, ");
+ }
+ statement.append("?)");
+ return statement.toString();
+ }
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized PTransform<
+ @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+ @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+ buildTransform() {
+ return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ JdbcIO.Write<Row> writeRows =
+ JdbcIO.<Row>write()
+ .withDataSourceConfiguration(dataSourceConfiguration())
+
.withStatement(writeStatement(input.get("input").getSchema()))
+ .withPreparedStatementSetter(new
JdbcUtil.BeamRowPreparedStatementSetter());
+ Boolean autosharding = config.getAutosharding();
+ if (autosharding != null && autosharding) {
+ writeRows = writeRows.withAutoSharding();
+ }
+ input.get("input").apply(writeRows);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+ };
+ }
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:jdbc_write:v1";
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull
@Initialized String>
+ inputCollectionNames() {
+ return Collections.singletonList("input");
+ }
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull
@Initialized String>
+ outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ @AutoValue
+ @DefaultSchema(AutoValueSchema.class)
+ public abstract static class JdbcWriteSchemaTransformConfiguration
implements Serializable {
+
+ public abstract String getDriverClassName();
+
+ public abstract String getJdbcUrl();
+
+ @Nullable
+ public abstract String getUsername();
+
+ @Nullable
+ public abstract String getPassword();
+
+ @Nullable
+ public abstract String getConnectionProperties();
+
+ @Nullable
+ public abstract List<@org.checkerframework.checker.nullness.qual.Nullable
String>
+ getConnectionInitSql();
+
+ @Nullable
+ public abstract String getLocation();
+
+ @Nullable
+ public abstract String getWriteStatement();
+
+ @Nullable
+ public abstract Boolean getAutosharding();
+
+ public void validate() throws IllegalArgumentException {
+ if (Strings.isNullOrEmpty(getDriverClassName())) {
+ throw new IllegalArgumentException("JDBC Driver class name cannot be
blank.");
+ }
+ if (Strings.isNullOrEmpty(getJdbcUrl())) {
+ throw new IllegalArgumentException("JDBC URL cannot be blank");
+ }
+
+ boolean writeStatementPresent =
+ (getWriteStatement() != null && !"".equals(getWriteStatement()));
+ boolean locationPresent = (getLocation() != null &&
!"".equals(getLocation()));
+
+ if (writeStatementPresent && locationPresent) {
+ throw new IllegalArgumentException(
+ "ReadQuery and Location are mutually exclusive configurations");
+ }
+ if (!writeStatementPresent && !locationPresent) {
+ throw new IllegalArgumentException("Either ReadQuery or Location must
be set.");
+ }
+ }
+
+ public static Builder builder() {
+ return new
AutoValue_JdbcWriteSchemaTransformProvider_JdbcWriteSchemaTransformConfiguration
+ .Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setDriverClassName(String value);
+
+ public abstract Builder setJdbcUrl(String value);
+
+ public abstract Builder setUsername(String value);
+
+ public abstract Builder setPassword(String value);
+
+ public abstract Builder setConnectionProperties(String value);
+
+ public abstract Builder setConnectionInitSql(
+ List<@org.checkerframework.checker.nullness.qual.Nullable String>
value);
+
+ public abstract Builder setLocation(String value);
+
+ public abstract Builder setWriteStatement(String value);
+
+ public abstract Builder setAutosharding(Boolean value);
+
+ public abstract JdbcWriteSchemaTransformConfiguration build();
+ }
+ }
+}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..3e82b565fba
--- /dev/null
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ServiceLoader;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link JdbcReadSchemaTransformProvider} against an in-memory Derby database. */
+@RunWith(JUnit4.class)
+public class JdbcReadSchemaTransformProviderTest {
+
+  private static final JdbcIO.DataSourceConfiguration DATA_SOURCE_CONFIGURATION =
+      JdbcIO.DataSourceConfiguration.create(
+          "org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:testDB;create=true");
+  private static final int EXPECTED_ROW_COUNT = 1000;
+
+  private static final DataSource DATA_SOURCE = DATA_SOURCE_CONFIGURATION.buildDatasource();
+  private static final String READ_TABLE_NAME = DatabaseTestHelper.getTestTableName("UT_READ");
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Configures Derby, then creates and populates the table the read test scans. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Derby's default lock timeout is 60 seconds; shorten it so lock problems surface quickly
+    // and the test stays fast.
+    System.setProperty("derby.locks.waitTimeout", "2");
+    System.setProperty("derby.stream.error.file", "build/derby.log");
+
+    DatabaseTestHelper.createTable(DATA_SOURCE, READ_TABLE_NAME);
+    addInitialData(DATA_SOURCE, READ_TABLE_NAME);
+  }
+
+  /** Each invalid configuration must be rejected by {@code validate()}. */
+  @Test
+  public void testInvalidReadSchemaOptions() {
+    // Blank driver class name and JDBC URL.
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                .setDriverClassName("")
+                .setJdbcUrl("")
+                .build()
+                .validate());
+    // Location and read query both set.
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                .setDriverClassName("ClassName")
+                .setJdbcUrl("JdbcUrl")
+                .setLocation("Location")
+                .setReadQuery("Query")
+                .build()
+                .validate());
+    // Neither location nor read query set.
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+                .setDriverClassName("ClassName")
+                .setJdbcUrl("JdbcUrl")
+                .build()
+                .validate());
+  }
+
+  /** Reads the whole test table through the service-loaded provider and checks the row count. */
+  @Test
+  public void testRead() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider candidate : ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (candidate instanceof JdbcReadSchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) candidate;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration readConfig =
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+            .setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+            .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+            .setLocation(READ_TABLE_NAME)
+            .build();
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(provider.from(readConfig).buildTransform())
+            .get("output");
+    Long expectedCount = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expectedCount);
+    pipeline.run();
+  }
+
+  /** Create test data that is consistent with that generated by TestRow. */
+  private static void addInitialData(DataSource dataSource, String tableName) throws SQLException {
+    String insertSql = String.format("insert into %s values (?,?)", tableName);
+    try (Connection connection = dataSource.getConnection()) {
+      // All rows are inserted in one transaction and committed once at the end.
+      connection.setAutoCommit(false);
+      try (PreparedStatement insert = connection.prepareStatement(insertSql)) {
+        for (int seed = 0; seed < EXPECTED_ROW_COUNT; seed++) {
+          insert.clearParameters();
+          insert.setInt(1, seed);
+          insert.setString(2, TestRow.getNameForSeed(seed));
+          insert.executeUpdate();
+        }
+      }
+      connection.commit();
+    }
+  }
+}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
new file mode 100644
index 00000000000..7f422affda2
--- /dev/null
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.ServiceLoader;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class JdbcWriteSchemaTransformProviderTest {
+
+ private static final JdbcIO.DataSourceConfiguration
DATA_SOURCE_CONFIGURATION =
+ JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc:derby:memory:testDB;create=true");
+ private static final DataSource DATA_SOURCE =
DATA_SOURCE_CONFIGURATION.buildDatasource();
+ private static final String WRITE_TABLE_NAME =
DatabaseTestHelper.getTestTableName("UT_WRITE");
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // by default, derby uses a lock timeout of 60 seconds. In order to speed
up the test
+ // and detect the lock faster, we decrease this timeout
+ System.setProperty("derby.locks.waitTimeout", "2");
+ System.setProperty("derby.stream.error.file", "build/derby.log");
+
+ DatabaseTestHelper.createTable(DATA_SOURCE, WRITE_TABLE_NAME);
+ }
+
+ @Test
+ public void testInvalidWriteSchemaOptions() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setDriverClassName("")
+ .setJdbcUrl("")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setDriverClassName("ClassName")
+ .setJdbcUrl("JdbcUrl")
+ .setLocation("Location")
+ .setWriteStatement("WriteStatement")
+ .build()
+ .validate();
+ });
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setDriverClassName("ClassName")
+ .setJdbcUrl("JdbcUrl")
+ .build()
+ .validate();
+ });
+ }
+
+ @Test
+ public void testReadWriteToTable() throws SQLException {
+ JdbcWriteSchemaTransformProvider provider = null;
+ for (SchemaTransformProvider p :
ServiceLoader.load(SchemaTransformProvider.class)) {
+ if (p instanceof JdbcWriteSchemaTransformProvider) {
+ provider = (JdbcWriteSchemaTransformProvider) p;
+ break;
+ }
+ }
+ assertNotNull(provider);
+
+ Schema schema =
+ Schema.of(
+ Schema.Field.of("id", Schema.FieldType.INT64),
+ Schema.Field.of("name", Schema.FieldType.STRING));
+
+ List<Row> rows =
+ ImmutableList.of(
+ Row.withSchema(schema).attachValues(1L, "name1"),
+ Row.withSchema(schema).attachValues(2L, "name2"));
+
+ PCollectionRowTuple.of("input",
pipeline.apply(Create.of(rows).withRowSchema(schema)))
+ .apply(
+ provider
+ .from(
+
JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+
.setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get())
+ .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+ .setLocation(WRITE_TABLE_NAME)
+ .build())
+ .buildTransform());
+ pipeline.run();
+ DatabaseTestHelper.assertRowCount(DATA_SOURCE, WRITE_TABLE_NAME, 2);
+ }
+}