This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78919fd  [BEAM-6732] Added "Write.withResults()"
     new fe79225  Merge pull request #8310 from aromanenko-dev/BEAM-6732-JdbcIO-withResults
78919fd is described below

commit 78919fd8da0e1cf5f34ac8648de90a4e08afaef0
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Apr 12 18:57:00 2019 +0200

    [BEAM-6732] Added "Write.withResults()"
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 104 +++++++++++++++++---
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 105 +++++++++++++--------
 2 files changed, 155 insertions(+), 54 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8c824a8..985b271 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
@@ -189,7 +190,11 @@ public class JdbcIO {
    * @param <T> Type of the data to be written.
    */
   public static <T> Write<T> write() {
-    return new AutoValue_JdbcIO_Write.Builder<T>()
+    return new Write<>();
+  }
+
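+  /**
+   * Like {@link #write}, but the returned {@link WriteVoid} emits a {@link PCollection} of {@link
+   * Void} that downstream transforms can wait on via {@link Wait#on(PCollection[])}.
+   */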
+  public static <T> WriteVoid<T> writeVoid() {
+    return new AutoValue_JdbcIO_WriteVoid.Builder<T>()
         .setBatchSize(DEFAULT_BATCH_SIZE)
         .setRetryStrategy(new DefaultRetryStrategy())
         .build();
@@ -730,9 +735,81 @@ public class JdbcIO {
     boolean apply(SQLException sqlException);
   }
 
+  /**
+   * This class is used as the default return value of {@link JdbcIO#write()}.
+   *
+   * <p>All methods in this class delegate to the appropriate method of {@link JdbcIO.WriteVoid}.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    WriteVoid<T> inner;
+
+    Write() {
+      this(JdbcIO.<T>writeVoid());
+    }
+
+    Write(WriteVoid<T> inner) {
+      this.inner = inner;
+    }
+
+    /** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
+    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+      return new Write<>(inner.withDataSourceConfiguration(config));
+    }
+
+    /** See {@link WriteVoid#withStatement(String)}. */
+    public Write<T> withStatement(String statement) {
+      return new Write<>(inner.withStatement(statement));
+    }
+
+    /** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
+    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+      return new Write<>(inner.withPreparedStatementSetter(setter));
+    }
+
+    /** See {@link WriteVoid#withBatchSize(long)}. */
+    public Write<T> withBatchSize(long batchSize) {
+      return new Write<>(inner.withBatchSize(batchSize));
+    }
+
+    /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
+    public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+      return new Write<>(inner.withRetryStrategy(retryStrategy));
+    }
+
+    /**
+     * Returns a {@link WriteVoid} transform that can be used in {@link Wait#on(PCollection[])} to
+     * wait until all data is written.
+     *
+     * <p>Example: write a {@link PCollection} to one database and then to another database, making
+     * sure that writing a window of data to the second database starts only after the respective
+     * window has been fully written to the first database.
+     *
+     * <pre>{@code
+     * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
+     *     .withDataSourceConfiguration(CONF_DB_1).withResults());
+     * data.apply(Wait.on(firstWriteResults))
+     *     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
+     * }</pre>
+     */
+    public WriteVoid<T> withResults() {
+      return inner;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      inner.populateDisplayData(builder);
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      inner.expand(input);
+      return PDone.in(input.getPipeline());
+    }
+  }
+
   /** A {@link PTransform} to write to a JDBC datasource. */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {
     @Nullable
     abstract DataSourceConfiguration getDataSourceConfiguration();
 
@@ -761,22 +838,22 @@ public class JdbcIO {
 
       abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
 
-      abstract Write<T> build();
+      abstract WriteVoid<T> build();
     }
 
-    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
+    public WriteVoid<T> withDataSourceConfiguration(DataSourceConfiguration config) {
       return toBuilder().setDataSourceConfiguration(config).build();
     }
 
-    public Write<T> withStatement(String statement) {
+    public WriteVoid<T> withStatement(String statement) {
       return withStatement(ValueProvider.StaticValueProvider.of(statement));
     }
 
-    public Write<T> withStatement(ValueProvider<String> statement) {
+    public WriteVoid<T> withStatement(ValueProvider<String> statement) {
       return toBuilder().setStatement(statement).build();
     }
 
-    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+    public WriteVoid<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
@@ -785,7 +862,7 @@ public class JdbcIO {
      *
      * @param batchSize maximum batch size in number of statements
      */
-    public Write<T> withBatchSize(long batchSize) {
+    public WriteVoid<T> withBatchSize(long batchSize) {
      checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
       return toBuilder().setBatchSize(batchSize).build();
     }
@@ -795,26 +872,25 @@ public class JdbcIO {
     * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
      * then {@link Write} retries the statements.
      */
-    public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+    public WriteVoid<T> withRetryStrategy(RetryStrategy retryStrategy) {
       checkArgument(retryStrategy != null, "retryStrategy can not be null");
       return toBuilder().setRetryStrategy(retryStrategy).build();
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PCollection<Void> expand(PCollection<T> input) {
       checkArgument(
          getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required");
       checkArgument(getStatement() != null, "withStatement() is required");
       checkArgument(
          getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
 
-      input.apply(ParDo.of(new WriteFn<>(this)));
-      return PDone.in(input.getPipeline());
+      return input.apply(ParDo.of(new WriteFn<T>(this)));
     }
 
     private static class WriteFn<T> extends DoFn<T, Void> {
 
-      private final Write<T> spec;
+      private final WriteVoid<T> spec;
 
       private static final int MAX_RETRIES = 5;
       private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -827,7 +903,7 @@ public class JdbcIO {
       private PreparedStatement preparedStatement;
       private List<T> records = new ArrayList<>();
 
-      public WriteFn(Write<T> spec) {
+      public WriteFn(WriteVoid<T> spec) {
         this.spec = spec;
       }
 
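The JdbcIO changes above are what enable sequencing one JDBC write after
another. Below is a self-contained sketch of the intended usage; the Derby
URL, table names, statements, and sample data are illustrative assumptions,
not part of this commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    public class SequencedJdbcWrites {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Assumed connection details; adjust for a real database.
        JdbcIO.DataSourceConfiguration config =
            JdbcIO.DataSourceConfiguration.create(
                "org.apache.derby.jdbc.ClientDriver", "jdbc:derby://localhost:1527/target/beam");

        PCollection<String> data = pipeline.apply(Create.of("a", "b", "c"));

        // withResults() swaps the PDone-returning Write for its inner WriteVoid,
        // whose output PCollection<Void> signals when each window is fully written.
        PCollection<Void> firstWrite =
            data.apply(
                JdbcIO.<String>write()
                    .withDataSourceConfiguration(config)
                    .withStatement("insert into first_table values(?)")
                    .withPreparedStatementSetter((element, stmt) -> stmt.setString(1, element))
                    .withResults());

        // Wait.on holds back the second write until the matching window of the
        // first write has been fully committed.
        data.apply(Wait.on(firstWrite))
            .apply(
                JdbcIO.<String>write()
                    .withDataSourceConfiguration(config)
                    .withStatement("insert into second_table values(?)")
                    .withPreparedStatementSetter((element, stmt) -> stmt.setString(1, element)));

        pipeline.run().waitUntilFinish();
      }
    }

Without the Wait.on step the two writes would be unordered, because the old
PDone result carries no signal that downstream transforms can consume.
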
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 3e45363..4d2587a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.dbcp2.PoolingDataSource;
@@ -276,41 +277,75 @@ public class JdbcIOTest implements Serializable {
     String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
     DatabaseTestHelper.createTable(dataSource, tableName);
     try {
-      ArrayList<KV<Integer, String>> data = new ArrayList<>();
-      for (int i = 0; i < rowsToAdd; i++) {
-        KV<Integer, String> kv = KV.of(i, "Test");
-        data.add(kv);
-      }
-      pipeline
-          .apply(Create.of(data))
-          .apply(
-              JdbcIO.<KV<Integer, String>>write()
-                  .withDataSourceConfiguration(
-                      JdbcIO.DataSourceConfiguration.create(
-                          "org.apache.derby.jdbc.ClientDriver",
-                          "jdbc:derby://localhost:" + port + "/target/beam"))
-                  .withStatement(String.format("insert into %s values(?, ?)", tableName))
-                  .withBatchSize(10L)
-                  .withPreparedStatementSetter(
-                      (element, statement) -> {
-                        statement.setInt(1, element.getKey());
-                        statement.setString(2, element.getValue());
-                      }));
+      ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+      pipeline.apply(Create.of(data)).apply(getJdbcWrite(tableName));
+
+      pipeline.run();
+
+      assertRowCount(tableName, EXPECTED_ROW_COUNT);
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
+    }
+  }
+
+  @Test
+  public void testWriteWithResultsAndWaitOn() throws Exception {
+    final long rowsToAdd = 1000L;
+
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    String secondTableName = DatabaseTestHelper.getTestTableName("UT_WRITE_AFTER_WAIT");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    DatabaseTestHelper.createTable(dataSource, secondTableName);
+    try {
+      ArrayList<KV<Integer, String>> data = getDataToWrite(rowsToAdd);
+
+      PCollection<KV<Integer, String>> dataCollection = pipeline.apply(Create.of(data));
+      PCollection<Void> rowsWritten =
+          dataCollection.apply(getJdbcWrite(firstTableName).withResults());
+      dataCollection.apply(Wait.on(rowsWritten)).apply(getJdbcWrite(secondTableName));
 
       pipeline.run();
 
-      try (Connection connection = dataSource.getConnection()) {
-        try (Statement statement = connection.createStatement()) {
-          try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
-            resultSet.next();
-            int count = resultSet.getInt(1);
+      assertRowCount(firstTableName, EXPECTED_ROW_COUNT);
+      assertRowCount(secondTableName, EXPECTED_ROW_COUNT);
+    } finally {
+      DatabaseTestHelper.deleteTable(dataSource, firstTableName);
+      DatabaseTestHelper.deleteTable(dataSource, secondTableName);
+    }
+  }
+
+  private static JdbcIO.Write<KV<Integer, String>> getJdbcWrite(String tableName) {
+    return JdbcIO.<KV<Integer, String>>write()
+        .withDataSourceConfiguration(
+            JdbcIO.DataSourceConfiguration.create(
+                "org.apache.derby.jdbc.ClientDriver",
+                "jdbc:derby://localhost:" + port + "/target/beam"))
+        .withStatement(String.format("insert into %s values(?, ?)", tableName))
+        .withBatchSize(10L)
+        .withPreparedStatementSetter(
+            (element, statement) -> {
+              statement.setInt(1, element.getKey());
+              statement.setString(2, element.getValue());
+            });
+  }
 
-            Assert.assertEquals(EXPECTED_ROW_COUNT, count);
-          }
+  private static ArrayList<KV<Integer, String>> getDataToWrite(long rowsToAdd) {
+    ArrayList<KV<Integer, String>> data = new ArrayList<>();
+    for (int i = 0; i < rowsToAdd; i++) {
+      KV<Integer, String> kv = KV.of(i, "Test");
+      data.add(kv);
+    }
+    return data;
+  }
+
+  private static void assertRowCount(String tableName, int expectedRowCount) throws SQLException {
+    try (Connection connection = dataSource.getConnection()) {
+      try (Statement statement = connection.createStatement()) {
+        try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
+          resultSet.next();
+          int count = resultSet.getInt(1);
+          Assert.assertEquals(expectedRowCount, count);
         }
       }
-    } finally {
-      DatabaseTestHelper.deleteTable(dataSource, tableName);
     }
   }
 
@@ -373,17 +408,7 @@ public class JdbcIOTest implements Serializable {
    // we verify that the backoff has been called thanks to the log message
     expectedLogs.verifyWarn("Deadlock detected, retrying");
 
-    try (Connection readConnection = dataSource.getConnection()) {
-      try (Statement statement = readConnection.createStatement()) {
-        try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
-          resultSet.next();
-          int count = resultSet.getInt(1);
-          // here we have the record inserted by the first transaction (by hand), and a second one
-          // inserted by the pipeline
-          Assert.assertEquals(2, count);
-        }
-      }
-    }
+    assertRowCount(tableName, 2);
   }
 
   @After

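The backwards-compatibility trick in JdbcIO.java (keeping write()'s PDone
signature while exposing the result-producing transform through
withResults()) generalizes to other IOs. Here is a minimal sketch of the
pattern under illustrative names; none of these classes are part of Beam:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Result-producing transform: does the actual work and emits PCollection<Void>.
    class InnerWrite<T> extends PTransform<PCollection<T>, PCollection<Void>> {
      @Override
      public PCollection<Void> expand(PCollection<T> input) {
        return input.apply(ParDo.of(new SideEffectFn<T>()));
      }

      private static class SideEffectFn<T> extends DoFn<T, Void> {
        @ProcessElement
        public void processElement(ProcessContext context) {
          // Perform the side effect (e.g. execute a statement); emit nothing.
        }
      }
    }

    // PDone-returning facade: existing callers keep compiling unchanged, while
    // withResults() opts in to the inner transform for use with Wait.on.
    class WriteFacade<T> extends PTransform<PCollection<T>, PDone> {
      private final InnerWrite<T> inner = new InnerWrite<>();

      InnerWrite<T> withResults() {
        return inner;
      }

      @Override
      public PDone expand(PCollection<T> input) {
        inner.expand(input); // same expansion as the inner transform
        return PDone.in(input.getPipeline());
      }
    }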