This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 78919fd  [BEAM-6732] Added "Write.withResults()"
     new fe79225  Merge pull request #8310 from aromanenko-dev/BEAM-6732-JdbcIO-withResults
78919fd is described below

commit 78919fd8da0e1cf5f34ac8648de90a4e08afaef0
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Apr 12 18:57:00 2019 +0200

    [BEAM-6732] Added "Write.withResults()"
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 104 +++++++++++++++++---
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 105 +++++++++++++--------
 2 files changed, 155 insertions(+), 54 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8c824a8..985b271 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -189,7 +190,11 @@ public class JdbcIO {
    * @param <T> Type of the data to be written.
    */
   public static <T> Write<T> write() {
-    return new AutoValue_JdbcIO_Write.Builder<T>()
+    return new Write();
+  }
+
+  public static <T> WriteVoid<T> writeVoid() {
+    return new AutoValue_JdbcIO_WriteVoid.Builder<T>()
         .setBatchSize(DEFAULT_BATCH_SIZE)
         .setRetryStrategy(new DefaultRetryStrategy())
         .build();
   }
@@ -730,9 +735,81 @@ public class JdbcIO {
     boolean apply(SQLException sqlException);
   }
 
+  /**
+   * This class is used as the default return value of {@link JdbcIO#write()}.
+   *
+   * <p>All methods in this class delegate to the appropriate method of {@link JdbcIO.WriteVoid}.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    WriteVoid<T> inner;
+
+    Write() {
+      this(JdbcIO.<T>writeVoid());
+    }
+
+    Write(WriteVoid<T> inner) {
+      this.inner = inner;
+    }
+
+    /** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
+    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+      return new Write(inner.withDataSourceConfiguration(config));
+    }
+
+    /** See {@link WriteVoid#withStatement(String)}. */
+    public Write<T> withStatement(String statement) {
+      return new Write(inner.withStatement(statement));
+    }
+
+    /** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
+    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+      return new Write(inner.withPreparedStatementSetter(setter));
+    }
+
+    /** See {@link WriteVoid#withBatchSize(long)}. */
+    public Write<T> withBatchSize(long batchSize) {
+      return new Write(inner.withBatchSize(batchSize));
+    }
+
+    /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
+    public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+      return new Write(inner.withRetryStrategy(retryStrategy));
+    }
+
+    /**
+     * Returns {@link WriteVoid} transform which can be used in {@link Wait#on(PCollection[])} to
+     * wait until all data is written.
+     *
+     * <p>Example: write a {@link PCollection} to one database and then to another database, making
+     * sure that writing a window of data to the second database starts only after the respective
+     * window has been fully written to the first database.
+     *
+     * <pre>{@code
+     * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
+     *     .withDataSourceConfiguration(CONF_DB_1).withResults());
+     * data.apply(Wait.on(firstWriteResults))
+     *     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
+     * }</pre>
+     */
+    public WriteVoid<T> withResults() {
+      return inner;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      inner.populateDisplayData(builder);
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      inner.expand(input);
+      return PDone.in(input.getPipeline());
+    }
+  }
+
   /** A {@link PTransform} to write to a JDBC datasource. */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {
 
     @Nullable
     abstract DataSourceConfiguration getDataSourceConfiguration();
@@ -761,22 +838,22 @@ public class JdbcIO {
 
       abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
 
-      abstract Write<T> build();
+      abstract WriteVoid<T> build();
     }
 
-    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+    public WriteVoid<T> withDataSourceConfiguration(DataSourceConfiguration config) {
      return toBuilder().setDataSourceConfiguration(config).build();
     }
 
-    public Write<T> withStatement(String statement) {
+    public WriteVoid<T> withStatement(String statement) {
       return withStatement(ValueProvider.StaticValueProvider.of(statement));
     }
 
-    public Write<T> withStatement(ValueProvider<String> statement) {
+    public WriteVoid<T> withStatement(ValueProvider<String> statement) {
       return toBuilder().setStatement(statement).build();
     }
 
-    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+    public WriteVoid<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
@@ -785,7 +862,7 @@ public class JdbcIO {
      *
      * @param batchSize maximum batch size in number of statements
      */
-    public Write<T> withBatchSize(long batchSize) {
+    public WriteVoid<T> withBatchSize(long batchSize) {
       checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
       return toBuilder().setBatchSize(batchSize).build();
     }
@@ -795,26 +872,25 @@ public class JdbcIO {
      * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
      * then {@link Write} retries the statements.
      */
-    public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+    public WriteVoid<T> withRetryStrategy(RetryStrategy retryStrategy) {
       checkArgument(retryStrategy != null, "retryStrategy can not be null");
       return toBuilder().setRetryStrategy(retryStrategy).build();
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PCollection<Void> expand(PCollection<T> input) {
       checkArgument(
           getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required");
       checkArgument(getStatement() != null, "withStatement() is required");
       checkArgument(
           getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
 
-      input.apply(ParDo.of(new WriteFn<>(this)));
-      return PDone.in(input.getPipeline());
+      return input.apply(ParDo.of(new WriteFn<T>(this)));
     }
 
     private static class WriteFn<T> extends DoFn<T, Void> {
 
-      private final Write<T> spec;
+      private final WriteVoid<T> spec;
 
       private static final int MAX_RETRIES = 5;
       private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -827,7 +903,7 @@ public class JdbcIO {
       private PreparedStatement preparedStatement;
       private List<T> records = new ArrayList<>();
 
-      public WriteFn(Write<T> spec) {
+      public WriteFn(WriteVoid<T> spec) {
         this.spec = spec;
       }
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 3e45363..4d2587a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.dbcp2.PoolingDataSource;
@@ -276,41 +277,75 @@ public class JdbcIOTest implements Serializable {
     String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
     DatabaseTestHelper.createTable(dataSource, tableName);
     try {
-      ArrayList<KV<Integer, String>> data = new ArrayList<>();
-      for (int i = 0; i < rowsToAdd; i++) {
-        KV<Integer, String> kv = KV.of(i, "Test");
-        data.add(kv);
-      }
-      pipeline
-          .apply(Create.of(data))
-          .apply(
-              JdbcIO.<KV<Integer, String>>write()
-                  .withDataSourceConfiguration(
-                      JdbcIO.DataSourceConfiguration.create(
-                          "org.apache.derby.jdbc.ClientDriver",
-                          "jdbc:derby://localhost:" + port + "/target/beam"))
-                  .withStatement(String.format("insert into %s values(?, ?)", tableName))
-                  .withBatchSize(10L)
-                  .withPreparedStatementSetter(
-                      (element, statement) -> {
-                        statement.setInt(1, element.getKey());
-                        statement.setString(2, element.getValue());
-                      }));
+      ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+      pipeline.apply(Create.of(data)).apply(getJdbcWrite(tableName));
+
+      pipeline.run();
+
+      assertRowCount(tableName, EXPECTED_ROW_COUNT);
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  @Test
+  public void testWriteWithResultsAndWaitOn() throws Exception {
+    final long rowsToAdd = 1000L;
+
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    String secondTableName = DatabaseTestHelper.getTestTableName("UT_WRITE_AFTER_WAIT");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    DatabaseTestHelper.createTable(dataSource, secondTableName);
+    try {
+      ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+
+      PCollection<KV<Integer, String>> dataCollection = pipeline.apply(Create.of(data));
+      PCollection<Void> rowsWritten =
+          dataCollection.apply(getJdbcWrite(firstTableName).withResults());
+      dataCollection.apply(Wait.on(rowsWritten)).apply(getJdbcWrite(secondTableName));
 
       pipeline.run();
 
-      try (Connection connection = dataSource.getConnection()) {
-        try (Statement statement = connection.createStatement()) {
-          try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
-            resultSet.next();
-            int count = resultSet.getInt(1);
+      assertRowCount(firstTableName, EXPECTED_ROW_COUNT);
+      assertRowCount(secondTableName, EXPECTED_ROW_COUNT);
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, firstTableName);
+    }
+  }
+
+  private static JdbcIO.Write<KV<Integer, String>> getJdbcWrite(String tableName) {
+    return JdbcIO.<KV<Integer, String>>write()
+        .withDataSourceConfiguration(
+            JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:" + port + "/target/beam"))
+        .withStatement(String.format("insert into %s values(?, ?)", tableName))
+        .withBatchSize(10L)
+        .withPreparedStatementSetter(
+            (element, statement) -> {
+              statement.setInt(1, element.getKey());
+              statement.setString(2, element.getValue());
+            });
+  }
 
-            Assert.assertEquals(EXPECTED_ROW_COUNT, count);
-          }
+  private static ArrayList<KV<Integer, String>> getDataToWrite(long rowsToAdd) {
+    ArrayList<KV<Integer, String>> data = new ArrayList<>();
+    for (int i = 0; i < rowsToAdd; i++) {
+      KV<Integer, String> kv = KV.of(i, "Test");
+      data.add(kv);
+    }
+    return data;
+  }
+
+  private static void assertRowCount(String tableName, int expectedRowCount) throws SQLException {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
+          resultSet.next();
+          int count = resultSet.getInt(1);
+          Assert.assertEquals(expectedRowCount, count);
         }
       }
-    } finally {
-      DatabaseTestHelper.deleteTable(dataSource, tableName);
     }
   }
@@ -373,17 +408,7 @@
     // we verify the the backoff has been called thanks to the log message
     expectedLogs.verifyWarn("Deadlock detected, retrying");
 
-    try (Connection readConnection = dataSource.getConnection()) {
-      try (Statement statement = readConnection.createStatement()) {
-        try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
-          resultSet.next();
-          int count = resultSet.getInt(1);
-          // here we have the record inserted by the first transaction (by hand), and a second one
-          // inserted by the pipeline
-          Assert.assertEquals(2, count);
-        }
-      }
-    }
+    assertRowCount(tableName, 2);
   }
 
   @After
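For readers coming to this change from the archives, a minimal end-to-end sketch of the pattern this commit enables follows. It mirrors the Javadoc example in the patch; the driver class, JDBC URLs, table name, sample data, and the SequencedJdbcWrites class are hypothetical placeholders, not part of the commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SequencedJdbcWrites {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical configurations for two target databases (stand-ins for the
    // CONF_DB_1/CONF_DB_2 placeholders used in the patch's Javadoc example).
    JdbcIO.DataSourceConfiguration confDb1 =
        JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://db1:5432/mydb");
    JdbcIO.DataSourceConfiguration confDb2 =
        JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://db2:5432/mydb");

    PCollection<KV<Integer, String>> data =
        pipeline.apply(Create.of(KV.of(1, "a"), KV.of(2, "b")));

    // withResults() exposes the new WriteVoid transform; its PCollection<Void>
    // output signals, per window, that the first write has completed.
    PCollection<Void> firstWriteResults =
        data.apply(
            JdbcIO.<KV<Integer, String>>write()
                .withDataSourceConfiguration(confDb1)
                .withStatement("insert into users values(?, ?)")
                .withPreparedStatementSetter(
                    (element, statement) -> {
                      statement.setInt(1, element.getKey());
                      statement.setString(2, element.getValue());
                    })
                .withResults());

    // Wait.on holds back the second write until the corresponding window of
    // the first write has been fully written.
    data.apply(Wait.on(firstWriteResults))
        .apply(
            JdbcIO.<KV<Integer, String>>write()
                .withDataSourceConfiguration(confDb2)
                .withStatement("insert into users values(?, ?)")
                .withPreparedStatementSetter(
                    (element, statement) -> {
                      statement.setInt(1, element.getKey());
                      statement.setString(2, element.getValue());
                    }));

    pipeline.run().waitUntilFinish();
  }
}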