This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch release-2.3.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.3.0 by this push:
new e91a03f [BEAM-793] Add backoff support in JdbcIO Write
e91a03f is described below
commit e91a03f096fe0dda7fd444a3389cbe1630b11a39
Author: Jean-Baptiste Onofré <[email protected]>
AuthorDate: Fri Jan 26 13:12:12 2018 +0100
[BEAM-793] Add backoff support in JdbcIO Write
---
sdks/java/io/jdbc/build.gradle | 1 +
sdks/java/io/jdbc/pom.xml | 21 +++-
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 138 ++++++++++++++++-----
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 118 +++++++++++++++---
4 files changed, 227 insertions(+), 51 deletions(-)
diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index ab488c9..d610a4e 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -35,6 +35,7 @@ dependencies {
shadow project(path: ":sdks:java:core", configuration: "shadow")
shadow library.java.findbugs_jsr305
shadow "org.apache.commons:commons-dbcp2:2.1.1"
+ testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(path: ":runners:direct-java", configuration: "shadow")
testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
testCompile project(":sdks:java:io:common").sourceSets.test.output
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 6264e9e..3af599e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -264,6 +264,11 @@
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
@@ -274,6 +279,11 @@
<version>2.1.1</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
<!-- compile dependencies -->
<dependency>
<groupId>com.google.auto.value</groupId>
@@ -317,11 +327,6 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<scope>test</scope>
</dependency>
@@ -332,6 +337,12 @@
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-common</artifactId>
<scope>test</scope>
<classifier>tests</classifier>
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 8b47aa9..1a6b54b 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -20,11 +20,15 @@ package org.apache.beam.sdk.io.jdbc;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
+import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
@@ -39,11 +43,18 @@ import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.dbcp2.BasicDataSource;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* IO to read and write data on JDBC.
@@ -134,6 +145,9 @@ import org.apache.commons.dbcp2.BasicDataSource;
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class JdbcIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcIO.class);
+
/**
* Read data from a JDBC datasource.
*
@@ -164,9 +178,22 @@ public class JdbcIO {
public static <T> Write<T> write() {
return new AutoValue_JdbcIO_Write.Builder<T>()
.setBatchSize(DEFAULT_BATCH_SIZE)
+ .setRetryStrategy(new DefaultRetryStrategy())
.build();
}
+ /**
+ * This is the default {@link Predicate} we use to detect a deadlock.
+ * It basically tests if the {@link SQLException#getSQLState()} equals 40001.
+ * 40001 is the SQL state used by most databases to identify a deadlock.
+ */
+ public static class DefaultRetryStrategy implements RetryStrategy {
+ @Override
+ public boolean apply(SQLException e) {
+ return (e.getSQLState().equals("40001"));
+ }
+ }
+
private JdbcIO() {}
/**
@@ -506,6 +533,16 @@ public class JdbcIO {
void setParameters(T element, PreparedStatement preparedStatement) throws
Exception;
}
+ /**
+ * An interface used to control if we retry the statements when a {@link
SQLException} occurs.
+ * If {@link RetryStrategy#apply(SQLException)} returns true, {@link Write}
tries
+ * to replay the statements.
+ */
+ @FunctionalInterface
+ public interface RetryStrategy extends Serializable {
+ boolean apply(SQLException sqlException);
+ }
+
/** A {@link PTransform} to write to a JDBC datasource. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>,
PDone> {
@@ -513,6 +550,7 @@ public class JdbcIO {
@Nullable abstract String getStatement();
abstract long getBatchSize();
@Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
+ @Nullable abstract RetryStrategy getRetryStrategy();
abstract Builder<T> toBuilder();
@@ -522,6 +560,7 @@ public class JdbcIO {
abstract Builder<T> setStatement(String statement);
abstract Builder<T> setBatchSize(long batchSize);
abstract Builder<T>
setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+ abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
abstract Write<T> build();
}
@@ -540,13 +579,23 @@ public class JdbcIO {
* Provide a maximum size in number of SQL statements for the batch.
Default is 1000.
*
* @param batchSize maximum batch size in number of statements
- * @return the {@link Write} with connection batch size set
*/
public Write<T> withBatchSize(long batchSize) {
checkArgument(batchSize > 0, "batchSize must be > 0, but was %d",
batchSize);
return toBuilder().setBatchSize(batchSize).build();
}
+ /**
+ * When a SQL exception occurs, {@link Write} uses this {@link
RetryStrategy} to determine
+ * if it will retry the statements.
+ * If {@link RetryStrategy#apply(SQLException)} returns {@code true},
+ * then {@link Write} retries the statements.
+ */
+ public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
+ checkArgument(retryStrategy != null, "retryStrategy can not be null");
+ return toBuilder().setRetryStrategy(retryStrategy).build();
+ }
+
@Override
public PDone expand(PCollection<T> input) {
checkArgument(
@@ -563,10 +612,14 @@ public class JdbcIO {
private final Write<T> spec;
+ private static final int MAX_RETRIES = 5;
+ private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+ FluentBackoff.DEFAULT
+
.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
+
private DataSource dataSource;
private Connection connection;
- private PreparedStatement preparedStatement;
- private int batchCount;
+ private List<T> records = new ArrayList<>();
public WriteFn(Write<T> spec) {
this.spec = spec;
@@ -577,55 +630,78 @@ public class JdbcIO {
dataSource = spec.getDataSourceConfiguration().buildDatasource();
connection = dataSource.getConnection();
connection.setAutoCommit(false);
- preparedStatement = connection.prepareStatement(spec.getStatement());
- }
-
- @StartBundle
- public void startBundle() {
- batchCount = 0;
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
T record = context.element();
- preparedStatement.clearParameters();
- spec.getPreparedStatementSetter().setParameters(record,
preparedStatement);
- preparedStatement.addBatch();
+ records.add(record);
- batchCount++;
-
- if (batchCount >= spec.getBatchSize()) {
+ if (records.size() >= spec.getBatchSize()) {
executeBatch();
}
}
+ private void processRecord(T record, PreparedStatement
preparedStatement) {
+ try {
+ preparedStatement.clearParameters();
+ spec.getPreparedStatementSetter().setParameters(record,
preparedStatement);
+ preparedStatement.addBatch();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@FinishBundle
public void finishBundle() throws Exception {
executeBatch();
}
- private void executeBatch() throws SQLException {
- if (batchCount > 0) {
- preparedStatement.executeBatch();
- connection.commit();
- batchCount = 0;
+ private void executeBatch() throws SQLException, IOException,
InterruptedException {
+ if (records.size() == 0) {
+ return;
+ }
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+ while (true) {
+ try (PreparedStatement preparedStatement =
+ connection.prepareStatement(spec.getStatement())) {
+ try {
+ // add each record in the statement batch
+ for (T record : records) {
+ processRecord(record, preparedStatement);
+ }
+ // execute the batch
+ preparedStatement.executeBatch();
+ // commit the changes
+ connection.commit();
+ break;
+ } catch (SQLException exception) {
+ if (!spec.getRetryStrategy().apply(exception)) {
+ throw exception;
+ }
+ LOG.warn("Deadlock detected, retrying", exception);
+ // clean up the statement batch and the connection state
+ preparedStatement.clearBatch();
+ connection.rollback();
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ // we tried the max number of times
+ throw exception;
+ }
+ }
+ }
}
+ records.clear();
}
@Teardown
public void teardown() throws Exception {
- try {
- if (preparedStatement != null) {
- preparedStatement.close();
- }
- } finally {
- if (connection != null) {
- connection.close();
- }
- if (dataSource instanceof AutoCloseable) {
- ((AutoCloseable) dataSource).close();
- }
+ if (connection != null) {
+ connection.close();
+ }
+ if (dataSource instanceof AutoCloseable) {
+ ((AutoCloseable) dataSource).close();
}
}
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index beb3685..8e5ff79 100644
---
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.derby.jdbc.ClientDataSource;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -59,8 +61,10 @@ import org.slf4j.LoggerFactory;
* Test on the JdbcIO.
*/
public class JdbcIOTest implements Serializable {
+
private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
public static final int EXPECTED_ROW_COUNT = 1000;
+ public static final String BACKOFF_TABLE = "UT_WRITE_BACKOFF";
private static NetworkServerControl derbyServer;
private static ClientDataSource dataSource;
@@ -71,6 +75,9 @@ public class JdbcIOTest implements Serializable {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule
+ public final transient ExpectedLogs expectedLogs =
ExpectedLogs.none(JdbcIO.class);
+
@BeforeClass
public static void startDatabase() throws Exception {
ServerSocket socket = new ServerSocket(0);
@@ -79,6 +86,9 @@ public class JdbcIOTest implements Serializable {
LOG.info("Starting Derby database on {}", port);
+ // by default, derby uses a lock timeout of 60 seconds. In order to speed
up the test
+ // and detect the lock faster, we decrease this timeout
+ System.setProperty("derby.locks.waitTimeout", "2");
System.setProperty("derby.stream.error.file", "target/derby.log");
derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"),
port);
@@ -222,25 +232,25 @@ public class JdbcIOTest implements Serializable {
pipeline.run();
}
- @Test
- public void testReadWithSingleStringParameter() throws Exception {
+ @Test
+ public void testReadWithSingleStringParameter() throws Exception {
PCollection<TestRow> rows =
- pipeline.apply(
- JdbcIO.<TestRow>read()
-
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
- .withQuery(String.format("select name,id from %s where name =
?", readTableName))
- .withStatementPreparator(
- (preparedStatement) -> preparedStatement.setString(1,
getNameForSeed(1)))
- .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
- .withCoder(SerializableCoder.of(TestRow.class)));
+ pipeline.apply(
+ JdbcIO.<TestRow>read()
+
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+ .withQuery(String.format("select name,id from %s where name = ?",
readTableName))
+ .withStatementPreparator(
+ (preparedStatement) -> preparedStatement.setString(1,
getNameForSeed(1)))
+ .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
+ .withCoder(SerializableCoder.of(TestRow.class)));
PAssert.thatSingleton(rows.apply("Count All",
Count.globally())).isEqualTo(1L);
- Iterable<TestRow> expectedValues =
Collections.singletonList(TestRow.fromSeed(1));
- PAssert.that(rows).containsInAnyOrder(expectedValues);
+ Iterable<TestRow> expectedValues =
Collections.singletonList(TestRow.fromSeed(1));
+ PAssert.that(rows).containsInAnyOrder(expectedValues);
- pipeline.run();
- }
+ pipeline.run();
+ }
@Test
public void testWrite() throws Exception {
@@ -275,7 +285,7 @@ public class JdbcIOTest implements Serializable {
try (Connection connection = dataSource.getConnection()) {
try (Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("select count(*)
from "
- + tableName)) {
+ + tableName)) {
resultSet.next();
int count = resultSet.getInt(1);
@@ -289,6 +299,84 @@ public class JdbcIOTest implements Serializable {
}
@Test
+ public void testWriteWithBackoff() throws Exception {
+ String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE_BACKOFF");
+ DatabaseTestHelper.createTable(dataSource, tableName);
+
+ // lock table
+ Connection connection = dataSource.getConnection();
+ Statement lockStatement = connection.createStatement();
+ lockStatement.execute("ALTER TABLE " + tableName + " LOCKSIZE TABLE");
+ lockStatement.execute("LOCK TABLE " + tableName + " IN EXCLUSIVE MODE");
+
+ // start a first transaction
+ connection.setAutoCommit(false);
+ PreparedStatement insertStatement =
+ connection.prepareStatement("insert into " + tableName + " values(?,
?)");
+ insertStatement.setInt(1, 1);
+ insertStatement.setString(2, "TEST");
+ insertStatement.execute();
+
+ // try to write to this table
+ pipeline
+ .apply(Create.of(Collections.singletonList(KV.of(1, "TEST"))))
+ .apply(
+ JdbcIO.<KV<Integer, String>>write()
+ .withDataSourceConfiguration(
+ JdbcIO.DataSourceConfiguration.create(
+ "org.apache.derby.jdbc.ClientDriver",
+ "jdbc:derby://localhost:" + port + "/target/beam"))
+ .withStatement(String.format("insert into %s values(?, ?)",
tableName))
+ .withRetryStrategy((JdbcIO.RetryStrategy) e -> {
+ return e.getSQLState().equals("XJ208"); // we fake a
deadlock with a lock here
+ })
+ .withPreparedStatementSetter(
+ (element, statement) -> {
+ statement.setInt(1, element.getKey());
+ statement.setString(2, element.getValue());
+ }));
+
+ // starting a thread to perform the commit later, while the pipeline is
running into the backoff
+ Thread commitThread = new Thread(() -> {
+ try {
+ Thread.sleep(10000);
+ connection.commit();
+ } catch (Exception e) {
+ // nothing to do
+ }
+ });
+ commitThread.start();
+ pipeline.run();
+ commitThread.join();
+
+ // we verify that the backoff has been called thanks to the log message
+ expectedLogs.verifyWarn("Deadlock detected, retrying");
+
+ try (Connection readConnection = dataSource.getConnection()) {
+ try (Statement statement = readConnection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery("select count(*)
from "
+ + tableName)) {
+ resultSet.next();
+ int count = resultSet.getInt(1);
+ // here we have the record inserted by the first transaction (by
hand), and a second one
+ // inserted by the pipeline
+ Assert.assertEquals(2, count);
+ }
+ }
+ }
+
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ DatabaseTestHelper.deleteTable(dataSource, BACKOFF_TABLE);
+ } catch (Exception e) {
+ // nothing to do
+ }
+ }
+
+ @Test
public void testWriteWithEmptyPCollection() throws Exception {
pipeline
.apply(Create.empty(KvCoder.of(VarIntCoder.of(),
StringUtf8Coder.of())))
--
To stop receiving notification emails like this one, please contact
[email protected].