This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78919fd [BEAM-6732] Added "Write.withResults()"
new fe79225 Merge pull request #8310 from aromanenko-dev/BEAM-6732-JdbcIO-withResults
78919fd is described below
commit 78919fd8da0e1cf5f34ac8648de90a4e08afaef0
Author: Alexey Romanenko <[email protected]>
AuthorDate: Fri Apr 12 18:57:00 2019 +0200
[BEAM-6732] Added "Write.withResults()"
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 104 +++++++++++++++++---
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 105 +++++++++++++--------
2 files changed, 155 insertions(+), 54 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8c824a8..985b271 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
@@ -189,7 +190,11 @@ public class JdbcIO {
* @param <T> Type of the data to be written.
*/
public static <T> Write<T> write() {
- return new AutoValue_JdbcIO_Write.Builder<T>()
+ return new Write<>();
+ }
+
+ public static <T> WriteVoid<T> writeVoid() {
+ return new AutoValue_JdbcIO_WriteVoid.Builder<T>()
.setBatchSize(DEFAULT_BATCH_SIZE)
.setRetryStrategy(new DefaultRetryStrategy())
.build();
@@ -730,9 +735,81 @@ public class JdbcIO {
boolean apply(SQLException sqlException);
}
+ /**
+ * This class is used as the default return value of {@link JdbcIO#write()}.
+ *
+ * <p>All methods in this class delegate to the appropriate method of {@link JdbcIO.WriteVoid}.
+ */
+ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ final WriteVoid<T> inner;
+
+ Write() {
+ this(JdbcIO.<T>writeVoid());
+ }
+
+ Write(WriteVoid<T> inner) {
+ this.inner = inner;
+ }
+
+ /** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
+ public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+ return new Write<>(inner.withDataSourceConfiguration(config));
+ }
+
+ /** See {@link WriteVoid#withStatement(String)}. */
+ public Write<T> withStatement(String statement) {
+ return new Write<>(inner.withStatement(statement));
+ }
+
+ /** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
+ public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+ return new Write<>(inner.withPreparedStatementSetter(setter));
+ }
+
+ /** See {@link WriteVoid#withBatchSize(long)}. */
+ public Write<T> withBatchSize(long batchSize) {
+ return new Write<>(inner.withBatchSize(batchSize));
+ }
+
+ /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
+ public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+ return new Write<>(inner.withRetryStrategy(retryStrategy));
+ }
+
+ /**
+ * Returns {@link WriteVoid} transform which can be used in {@link Wait#on(PCollection[])} to
+ * wait until all data is written.
+ *
+ * <p>Example: write a {@link PCollection} to one database and then to another database, making
+ * sure that writing a window of data to the second database starts only after the respective
+ * window has been fully written to the first database.
+ *
+ * <pre>{@code
+ * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
+ * .withDataSourceConfiguration(CONF_DB_1).withResults());
+ * data.apply(Wait.on(firstWriteResults))
+ * .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
+ * }</pre>
+ */
+ public WriteVoid<T> withResults() {
+ return inner;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ inner.populateDisplayData(builder);
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ inner.expand(input);
+ return PDone.in(input.getPipeline());
+ }
+ }
+
/** A {@link PTransform} to write to a JDBC datasource. */
@AutoValue
- public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {
@Nullable
abstract DataSourceConfiguration getDataSourceConfiguration();
@@ -761,22 +838,22 @@ public class JdbcIO {
abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
- abstract Write<T> build();
+ abstract WriteVoid<T> build();
}
- public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+ public WriteVoid<T> withDataSourceConfiguration(DataSourceConfiguration config) {
return toBuilder().setDataSourceConfiguration(config).build();
}
- public Write<T> withStatement(String statement) {
+ public WriteVoid<T> withStatement(String statement) {
return withStatement(ValueProvider.StaticValueProvider.of(statement));
}
- public Write<T> withStatement(ValueProvider<String> statement) {
+ public WriteVoid<T> withStatement(ValueProvider<String> statement) {
return toBuilder().setStatement(statement).build();
}
- public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+ public WriteVoid<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
return toBuilder().setPreparedStatementSetter(setter).build();
}
@@ -785,7 +862,7 @@ public class JdbcIO {
*
* @param batchSize maximum batch size in number of statements
*/
- public Write<T> withBatchSize(long batchSize) {
+ public WriteVoid<T> withBatchSize(long batchSize) {
checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
return toBuilder().setBatchSize(batchSize).build();
}
@@ -795,26 +872,25 @@ public class JdbcIO {
* will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
* then {@link Write} retries the statements.
*/
- public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+ public WriteVoid<T> withRetryStrategy(RetryStrategy retryStrategy) {
checkArgument(retryStrategy != null, "retryStrategy can not be null");
return toBuilder().setRetryStrategy(retryStrategy).build();
}
@Override
- public PDone expand(PCollection<T> input) {
+ public PCollection<Void> expand(PCollection<T> input) {
checkArgument(
getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required");
checkArgument(getStatement() != null, "withStatement() is required");
checkArgument(
getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
- input.apply(ParDo.of(new WriteFn<>(this)));
- return PDone.in(input.getPipeline());
+ return input.apply(ParDo.of(new WriteFn<T>(this)));
}
private static class WriteFn<T> extends DoFn<T, Void> {
- private final Write<T> spec;
+ private final WriteVoid<T> spec;
private static final int MAX_RETRIES = 5;
private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -827,7 +903,7 @@ public class JdbcIO {
private PreparedStatement preparedStatement;
private List<T> records = new ArrayList<>();
- public WriteFn(Write<T> spec) {
+ public WriteFn(WriteVoid<T> spec) {
this.spec = spec;
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 3e45363..4d2587a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.PoolingDataSource;
@@ -276,41 +277,76 @@ public class JdbcIOTest implements Serializable {
String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
DatabaseTestHelper.createTable(dataSource, tableName);
try {
- ArrayList<KV<Integer, String>> data = new ArrayList<>();
- for (int i = 0; i < rowsToAdd; i++) {
- KV<Integer, String> kv = KV.of(i, "Test");
- data.add(kv);
- }
- pipeline
- .apply(Create.of(data))
- .apply(
- JdbcIO.<KV<Integer, String>>write()
- .withDataSourceConfiguration(
- JdbcIO.DataSourceConfiguration.create(
- "org.apache.derby.jdbc.ClientDriver",
- "jdbc:derby://localhost:" + port + "/target/beam"))
- .withStatement(String.format("insert into %s values(?, ?)", tableName))
- .withBatchSize(10L)
- .withPreparedStatementSetter(
- (element, statement) -> {
- statement.setInt(1, element.getKey());
- statement.setString(2, element.getValue());
- }));
+ ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+ pipeline.apply(Create.of(data)).apply(getJdbcWrite(tableName));
+
+ pipeline.run();
+
+ assertRowCount(tableName, EXPECTED_ROW_COUNT);
+ } finally {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+ }
+
+ @Test
+ public void testWriteWithResultsAndWaitOn() throws Exception {
+ final long rowsToAdd = 1000L;
+
+ String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ String secondTableName = DatabaseTestHelper.getTestTableName("UT_WRITE_AFTER_WAIT");
+ DatabaseTestHelper.createTable(dataSource, firstTableName);
+ DatabaseTestHelper.createTable(dataSource, secondTableName);
+ try {
+ ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+
+ PCollection<KV<Integer, String>> dataCollection = pipeline.apply(Create.of(data));
+ PCollection<Void> rowsWritten =
+ dataCollection.apply(getJdbcWrite(firstTableName).withResults());
+ dataCollection.apply(Wait.on(rowsWritten)).apply(getJdbcWrite(secondTableName));
pipeline.run();
- try (Connection connection = dataSource.getConnection()) {
- try (Statement statement = connection.createStatement()) {
- try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
- resultSet.next();
- int count = resultSet.getInt(1);
+ assertRowCount(firstTableName, EXPECTED_ROW_COUNT);
+ assertRowCount(secondTableName, EXPECTED_ROW_COUNT);
+ } finally {
+ DatabaseTestHelper.deleteTable(dataSource, firstTableName);
+ DatabaseTestHelper.deleteTable(dataSource, secondTableName);
+ }
+ }
+
+ private static JdbcIO.Write<KV<Integer, String>> getJdbcWrite(String tableName) {
+ return JdbcIO.<KV<Integer, String>>write()
+ .withDataSourceConfiguration(
+ JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.ClientDriver",
+ "jdbc:derby://localhost:" + port + "/target/beam"))
+ .withStatement(String.format("insert into %s values(?, ?)", tableName))
+ .withBatchSize(10L)
+ .withPreparedStatementSetter(
+ (element, statement) -> {
+ statement.setInt(1, element.getKey());
+ statement.setString(2, element.getValue());
+ });
+ }
- Assert.assertEquals(EXPECTED_ROW_COUNT, count);
- }
+ private static ArrayList<KV<Integer, String>> getDataToWrite(long rowsToAdd) {
+ ArrayList<KV<Integer, String>> data = new ArrayList<>();
+ for (int i = 0; i < rowsToAdd; i++) {
+ KV<Integer, String> kv = KV.of(i, "Test");
+ data.add(kv);
+ }
+ return data;
+ }
+
+ private static void assertRowCount(String tableName, int expectedRowCount) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
+ resultSet.next();
+ int count = resultSet.getInt(1);
+ Assert.assertEquals(expectedRowCount, count);
}
}
- } finally {
- DatabaseTestHelper.deleteTable(dataSource, tableName);
}
}
@@ -373,17 +409,7 @@ public class JdbcIOTest implements Serializable {
// we verify the the backoff has been called thanks to the log message
expectedLogs.verifyWarn("Deadlock detected, retrying");
- try (Connection readConnection = dataSource.getConnection()) {
- try (Statement statement = readConnection.createStatement()) {
- try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
- resultSet.next();
- int count = resultSet.getInt(1);
- // here we have the record inserted by the first transaction (by hand), and a second one
- // inserted by the pipeline
- Assert.assertEquals(2, count);
- }
- }
- }
+ assertRowCount(tableName, 2);
}
@After