This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch release-2.3.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.3.0 by this push:
     new e91a03f  [BEAM-793] Add backoff support in JdbcIO Write
e91a03f is described below

commit e91a03f096fe0dda7fd444a3389cbe1630b11a39
Author: Jean-Baptiste Onofré <[email protected]>
AuthorDate: Fri Jan 26 13:12:12 2018 +0100

    [BEAM-793] Add backoff support in JdbcIO Write
---
 sdks/java/io/jdbc/build.gradle                     |   1 +
 sdks/java/io/jdbc/pom.xml                          |  21 +++-
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 138 ++++++++++++++++-----
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 118 +++++++++++++++---
 4 files changed, 227 insertions(+), 51 deletions(-)

diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index ab488c9..d610a4e 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -35,6 +35,7 @@ dependencies {
   shadow project(path: ":sdks:java:core", configuration: "shadow")
   shadow library.java.findbugs_jsr305
   shadow "org.apache.commons:commons-dbcp2:2.1.1"
+  testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   testCompile project(path: ":runners:direct-java", configuration: "shadow")
   testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
   testCompile project(":sdks:java:io:common").sourceSets.test.output
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 6264e9e..3af599e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -264,6 +264,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
@@ -274,6 +279,11 @@
       <version>2.1.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <!-- compile dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>
@@ -317,11 +327,6 @@
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
       <artifactId>slf4j-jdk14</artifactId>
       <scope>test</scope>
     </dependency>
@@ -332,6 +337,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-common</artifactId>
       <scope>test</scope>
       <classifier>tests</classifier>
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8b47aa9..1a6b54b 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -20,11 +20,15 @@ package org.apache.beam.sdk.io.jdbc;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -39,11 +43,18 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.commons.dbcp2.BasicDataSource;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * IO to read and write data on JDBC.
@@ -134,6 +145,9 @@ import org.apache.commons.dbcp2.BasicDataSource;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class JdbcIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcIO.class);
+
   /**
    * Read data from a JDBC datasource.
    *
@@ -164,9 +178,24 @@ public class JdbcIO {
   public static <T> Write<T> write() {
     return new AutoValue_JdbcIO_Write.Builder<T>()
             .setBatchSize(DEFAULT_BATCH_SIZE)
+            .setRetryStrategy(new DefaultRetryStrategy())
             .build();
   }
 
+  /**
+   * The default {@link RetryStrategy}: like a {@link Predicate} on the {@link SQLException}, it
+   * returns {@code true} (retry) only when {@link SQLException#getSQLState()} is {@code 40001},
+   * the SQL state used by most databases to report a deadlock. An exception without a SQL state
+   * is never retried.
+   */
+  public static class DefaultRetryStrategy implements RetryStrategy {
+    @Override
+    public boolean apply(SQLException e) {
+      // getSQLState() may return null, so compare from the constant side
+      return "40001".equals(e.getSQLState());
+    }
+  }
+
   private JdbcIO() {}
 
   /**
@@ -506,6 +533,20 @@ public class JdbcIO {
     void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
   }
 
+  /**
+   * An interface used to control if we retry the statements when a {@link SQLException} occurs.
+   * If {@link RetryStrategy#apply(SQLException)} returns true, {@link Write} rolls back the
+   * transaction and tries to replay the statements.
+   */
+  @FunctionalInterface
+  public interface RetryStrategy extends Serializable {
+    /**
+     * Returns {@code true} if the statements that failed with {@code sqlException} should be
+     * replayed. Note that {@link SQLException#getSQLState()} may return {@code null}.
+     */
+    boolean apply(SQLException sqlException);
+  }
+
   /** A {@link PTransform} to write to a JDBC datasource. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
@@ -513,6 +550,7 @@ public class JdbcIO {
     @Nullable abstract String getStatement();
     abstract long getBatchSize();
     @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
+    @Nullable abstract RetryStrategy getRetryStrategy();
 
     abstract Builder<T> toBuilder();
 
@@ -522,6 +560,7 @@ public class JdbcIO {
       abstract Builder<T> setStatement(String statement);
       abstract Builder<T> setBatchSize(long batchSize);
       abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+      abstract Builder<T> setRetryStrategy(RetryStrategy retryStrategy);
 
       abstract Write<T> build();
     }
@@ -540,13 +579,28 @@ public class JdbcIO {
-     * Provide a maximum size in number of SQL statenebt for the batch. Default is 1000.
+     * Provide a maximum size in number of SQL statements for the batch. Default is 1000.
      *
      * @param batchSize maximum batch size in number of statements
      * @return the {@link Write} with connection batch size set
      */
     public Write<T> withBatchSize(long batchSize) {
       checkArgument(batchSize > 0, "batchSize must be > 0, but was %d", batchSize);
       return toBuilder().setBatchSize(batchSize).build();
     }
 
+    /**
+     * When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine
+     * if it will retry the statements.
+     * If {@link RetryStrategy#apply(SQLException)} returns {@code true},
+     * then {@link Write} retries the statements.
+     *
+     * @param retryStrategy the strategy deciding which {@link SQLException}s are retried; must
+     *     not be {@code null}
+     * @return the {@link Write} with the retry strategy set
+     */
+    public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+      checkArgument(retryStrategy != null, "retryStrategy can not be null");
+      return toBuilder().setRetryStrategy(retryStrategy).build();
+    }
+
     @Override
     public PDone expand(PCollection<T> input) {
       checkArgument(
@@ -563,10 +612,17 @@ public class JdbcIO {
 
       private final Write<T> spec;
 
+      /** Maximum number of times a failed batch is replayed. */
+      private static final int MAX_RETRIES = 5;
+      /** Backoff applied between two replays of a failed batch. */
+      private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+              FluentBackoff.DEFAULT
+                      .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
+
       private DataSource dataSource;
       private Connection connection;
-      private PreparedStatement preparedStatement;
-      private int batchCount;
+      /** Records buffered for the next batch; cleared once that batch has been committed. */
+      private final List<T> records = new ArrayList<>();
 
       public WriteFn(Write<T> spec) {
         this.spec = spec;
@@ -577,55 +630,119 @@ public class JdbcIO {
         dataSource = spec.getDataSourceConfiguration().buildDatasource();
         connection = dataSource.getConnection();
         connection.setAutoCommit(false);
-        preparedStatement = connection.prepareStatement(spec.getStatement());
-      }
-
-      @StartBundle
-      public void startBundle() {
-        batchCount = 0;
       }
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
         T record = context.element();
 
-        preparedStatement.clearParameters();
-        spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
-        preparedStatement.addBatch();
+        records.add(record);
 
-        batchCount++;
-
-        if (batchCount >= spec.getBatchSize()) {
+        if (records.size() >= spec.getBatchSize()) {
           executeBatch();
         }
       }
 
+      /**
+       * Binds {@code record} to {@code preparedStatement} with the configured
+       * {@link PreparedStatementSetter} and adds it to the statement batch. Any failure is
+       * rethrown wrapped in a {@link RuntimeException}, so it is never retried.
+       */
+      private void processRecord(T record, PreparedStatement preparedStatement) {
+        try {
+          preparedStatement.clearParameters();
+          spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
+          preparedStatement.addBatch();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
       @FinishBundle
       public void finishBundle() throws Exception {
         executeBatch();
       }
 
-      private void executeBatch() throws SQLException {
-        if (batchCount > 0) {
-          preparedStatement.executeBatch();
-          connection.commit();
-          batchCount = 0;
+      /**
+       * Writes the buffered records to the database as one batch and commits it.
+       *
+       * <p>If the batch fails with a {@link SQLException}, the transaction is rolled back. When
+       * the configured {@link RetryStrategy} accepts the exception, the whole batch is replayed
+       * after a backoff, up to {@code MAX_RETRIES} times; otherwise the exception is rethrown.
+       * The buffer is cleared only once the batch has been committed.
+       */
+      private void executeBatch() throws SQLException, IOException, InterruptedException {
+        if (records.isEmpty()) {
+          return;
+        }
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+        while (true) {
+          try (PreparedStatement preparedStatement =
+                  connection.prepareStatement(spec.getStatement())) {
+            try {
+              // add each record in the statement batch
+              for (T record : records) {
+                processRecord(record, preparedStatement);
+              }
+              // execute the batch
+              preparedStatement.executeBatch();
+              // commit the changes
+              connection.commit();
+              break;
+            } catch (SQLException exception) {
+              // undo whatever part of the batch was already applied, so that neither a replay
+              // nor the next transaction on this connection sees a partially written batch
+              rollback(exception);
+              if (!isRetryable(exception)) {
+                throw exception;
+              }
+              LOG.warn("Deadlock detected, retrying", exception);
+              // clean up the statement batch
+              preparedStatement.clearBatch();
+              if (!BackOffUtils.next(sleeper, backoff)) {
+                // we tried the max number of times
+                throw exception;
+              }
+            }
+          }
         }
+        records.clear();
       }
+
+      /**
+       * Returns whether the configured {@link RetryStrategy} asks for {@code exception} to be
+       * retried. Without a strategy, nothing is retried.
+       */
+      private boolean isRetryable(SQLException exception) {
+        RetryStrategy retryStrategy = spec.getRetryStrategy();
+        return retryStrategy != null && retryStrategy.apply(exception);
+      }
+
+      /**
+       * Rolls back the current transaction. If the rollback itself fails, that failure is added
+       * to {@code cause} as a suppressed exception and {@code cause} is thrown.
+       */
+      private void rollback(SQLException cause) throws SQLException {
+        try {
+          connection.rollback();
+        } catch (SQLException rollbackException) {
+          cause.addSuppressed(rollbackException);
+          throw cause;
+        }
+      }
 
       @Teardown
       public void teardown() throws Exception {
         try {
-          if (preparedStatement != null) {
-            preparedStatement.close();
-          }
-        } finally {
           if (connection != null) {
             connection.close();
           }
+        } finally {
+          // close the data source even if closing the connection failed
           if (dataSource instanceof AutoCloseable) {
             ((AutoCloseable) dataSource).close();
           }
         }
       }
     }
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index beb3685..8e5ff79 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.derby.drda.NetworkServerControl;
 import org.apache.derby.jdbc.ClientDataSource;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,8 +61,10 @@ import org.slf4j.LoggerFactory;
  * Test on the JdbcIO.
  */
 public class JdbcIOTest implements Serializable {
+
   private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
   public static final int EXPECTED_ROW_COUNT = 1000;
+  public static final String BACKOFF_TABLE = "UT_WRITE_BACKOFF";
 
   private static NetworkServerControl derbyServer;
   private static ClientDataSource dataSource;
@@ -71,6 +75,9 @@ public class JdbcIOTest implements Serializable {
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
+  @Rule
+  public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(JdbcIO.class);
+
   @BeforeClass
   public static void startDatabase() throws Exception {
     ServerSocket socket = new ServerSocket(0);
@@ -79,6 +86,9 @@ public class JdbcIOTest implements Serializable {
 
     LOG.info("Starting Derby database on {}", port);
 
+    // by default, Derby waits 60 seconds for a lock; lower this timeout to 2 seconds so that
+    // tests holding a table lock detect it faster
+    System.setProperty("derby.locks.waitTimeout", "2");
     System.setProperty("derby.stream.error.file", "target/derby.log");
 
     derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), port);
@@ -222,25 +232,25 @@ public class JdbcIOTest implements Serializable {
     pipeline.run();
   }
 
-   @Test
-   public void testReadWithSingleStringParameter() throws Exception {
+  @Test
+  public void testReadWithSingleStringParameter() throws Exception {
     PCollection<TestRow> rows =
         pipeline.apply(
             JdbcIO.<TestRow>read()
                 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
                 .withQuery(String.format("select name,id from %s where name = ?", readTableName))
                 .withStatementPreparator(
                     (preparedStatement) -> preparedStatement.setString(1, getNameForSeed(1)))
                 .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
                 .withCoder(SerializableCoder.of(TestRow.class)));
 
     PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1L);
 
-     Iterable<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1));
-     PAssert.that(rows).containsInAnyOrder(expectedValues);
+    Iterable<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1));
+    PAssert.that(rows).containsInAnyOrder(expectedValues);
 
-     pipeline.run();
-   }
+    pipeline.run();
+  }
 
   @Test
   public void testWrite() throws Exception {
@@ -275,7 +285,7 @@ public class JdbcIOTest implements Serializable {
       try (Connection connection = dataSource.getConnection()) {
         try (Statement statement = connection.createStatement()) {
           try (ResultSet resultSet = statement.executeQuery("select count(*) from "
-                + tableName)) {
+              + tableName)) {
             resultSet.next();
             int count = resultSet.getInt(1);
 
@@ -289,6 +299,98 @@ public class JdbcIOTest implements Serializable {
   }
 
   @Test
+  public void testWriteWithBackoff() throws Exception {
+    String tableName = DatabaseTestHelper.getTestTableName(BACKOFF_TABLE);
+    DatabaseTestHelper.createTable(dataSource, tableName);
+    backoffTableName = tableName;
+
+    // lock table
+    Connection connection = dataSource.getConnection();
+    Statement lockStatement = connection.createStatement();
+    lockStatement.execute("ALTER TABLE " + tableName + " LOCKSIZE TABLE");
+    lockStatement.execute("LOCK TABLE " + tableName + " IN EXCLUSIVE MODE");
+
+    // start a first transaction
+    connection.setAutoCommit(false);
+    PreparedStatement insertStatement =
+        connection.prepareStatement("insert into " + tableName + " values(?, ?)");
+    insertStatement.setInt(1, 1);
+    insertStatement.setString(2, "TEST");
+    insertStatement.execute();
+
+    // try to write to this table
+    pipeline
+        .apply(Create.of(Collections.singletonList(KV.of(1, "TEST"))))
+        .apply(
+            JdbcIO.<KV<Integer, String>>write()
+                .withDataSourceConfiguration(
+                    JdbcIO.DataSourceConfiguration.create(
+                        "org.apache.derby.jdbc.ClientDriver",
+                        "jdbc:derby://localhost:" + port + "/target/beam"))
+                .withStatement(String.format("insert into %s values(?, ?)", tableName))
+                // the pending transaction above holds the lock, so the insert fails; we fake a
+                // deadlock by retrying on the resulting SQL state (XJ208)
+                .withRetryStrategy(e -> "XJ208".equals(e.getSQLState()))
+                .withPreparedStatementSetter(
+                    (element, statement) -> {
+                      statement.setInt(1, element.getKey());
+                      statement.setString(2, element.getValue());
+                    }));
+
+    // start a thread that commits later, while the pipeline is running into the backoff
+    Thread commitThread = new Thread(() -> {
+      try {
+        Thread.sleep(10000);
+        connection.commit();
+      } catch (Exception e) {
+        LOG.warn("Unable to commit the transaction holding the lock", e);
+      }
+    });
+    commitThread.start();
+    try {
+      pipeline.run();
+    } finally {
+      // wait for the commit before closing the connection that holds the lock
+      commitThread.join();
+      connection.close();
+    }
+
+    // we verify the backoff has been called thanks to the log message
+    expectedLogs.verifyWarn("Deadlock detected, retrying");
+
+    try (Connection readConnection = dataSource.getConnection()) {
+      try (Statement statement = readConnection.createStatement()) {
+        try (ResultSet resultSet = statement.executeQuery("select count(*) from "
+            + tableName)) {
+          resultSet.next();
+          int count = resultSet.getInt(1);
+          // here we have the record inserted by the first transaction (by hand), and a second one
+          // inserted by the pipeline
+          Assert.assertEquals(2, count);
+        }
+      }
+    }
+  }
+
+  /**
+   * Name of the table created by {@link #testWriteWithBackoff()}; {@code null} until that test
+   * has created it. Dropped by {@link #tearDown()}.
+   */
+  private String backoffTableName;
+
+  @After
+  public void tearDown() {
+    if (backoffTableName == null) {
+      return;
+    }
+    try {
+      DatabaseTestHelper.deleteTable(dataSource, backoffTableName);
+    } catch (Exception e) {
+      LOG.warn("Unable to delete table {}", backoffTableName, e);
+    }
+  }
+
+  @Test
   public void testWriteWithEmptyPCollection() throws Exception {
     pipeline
         .apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())))
-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to