This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9e4e14adef2014f4c0b3dda1967f2811a5610723 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Apr 15 19:23:25 2021 +0200 [FLINK-22239][tests] Test JDBC XA Sink against PostgreSQL --- flink-connectors/flink-connector-jdbc/pom.xml | 7 + .../apache/flink/connector/jdbc/DbMetadata.java | 8 + .../apache/flink/connector/jdbc/JdbcTestBase.java | 4 +- .../flink/connector/jdbc/JdbcTestFixture.java | 4 +- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 261 ++++++++++++++++++--- .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java | 29 +-- .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 8 +- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 13 +- 8 files changed, 273 insertions(+), 61 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 54b902e..fa9afb2 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -165,6 +165,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>1.15.1</version> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java index 2483e75..21f93d1 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java @@ -28,6 +28,14 @@ public interface DbMetadata extends Serializable { String getUrl(); + default String getUser() { + return ""; + } + + default String getPassword() { + return ""; + } + XADataSource buildXaDataSource(); String getDriverClass(); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java index 60766d4..88daae4 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java @@ -27,12 +27,12 @@ import org.junit.Before; public abstract class JdbcTestBase { @Before - public final void before() throws Exception { + public void before() throws Exception { JdbcTestFixture.initSchema(getDbMetadata()); } @After - public final void after() throws Exception { + public void after() throws Exception { JdbcTestFixture.cleanupData(getDbMetadata().getUrl()); JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata()); } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java index 39c2c68..3beec55 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java @@ -188,7 +188,9 @@ public class JdbcTestFixture { System.setProperty( "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL"); Class.forName(dbMetadata.getDriverClass()); - try (Connection conn = DriverManager.getConnection(dbMetadata.getInitUrl())) { + try (Connection conn = + DriverManager.getConnection( + dbMetadata.getInitUrl(), dbMetadata.getUser(), dbMetadata.getPassword())) { createTable(conn, JdbcTestFixture.INPUT_TABLE); createTable(conn, OUTPUT_TABLE); createTable(conn, OUTPUT_TABLE_2); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java index 0a8df15..03abed9 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java @@ -17,12 +17,18 @@ package org.apache.flink.connector.jdbc.xa; -import org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.DbMetadata; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcITCase; import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -31,68 +37,172 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.ExceptionUtils; +import org.junit.Rule; import org.junit.Test; +import org.postgresql.xa.PGXADataSource; +import org.testcontainers.containers.PostgreSQLContainer; -import java.io.Serializable; +import javax.sql.XADataSource; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB; +import static java.util.Collections.singletonList; +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE; import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; +import static org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds; +import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertTrue; /** A simple end-to-end test for {@link JdbcXaSinkFunction}. */ -public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase { +public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase { + + private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> { + public PgXaDb(String dockerImageName) { + super(dockerImageName); + // set max_prepared_transactions to non-zero + this.setCommand("postgres", "-c", "max_prepared_transactions=50", "-c", "fsync=off"); + } + } + + @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12"); + + @Override + public void after() throws Exception { + // no need for cleanup - done by test container tear down + } @Test public void testInsert() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.setRestartStrategy(new NoRestartStrategyConfiguration()); + int parallelism = 4; + int elementsPerSource = 500; + int numElementsPerCheckpoint = 7; + int minElementsPerFailure = numElementsPerCheckpoint / 3; + int maxElementsPerFailure = numElementsPerCheckpoint * 3; + + Configuration configuration = new Configuration(); + configuration.set( + EXECUTION_FAILOVER_STRATEGY, + "full" /* allow checkpointing even after some sources have finished */); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.milliseconds(100))); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.enableCheckpointing(50, CheckpointingMode.EXACTLY_ONCE); - env.addSource(new CheckpointAwaitingSource<>(TEST_DATA)) - .returns(TestEntry.class) + env.setParallelism(parallelism); + env.disableOperatorChaining(); + String password = db.getPassword(); + String username = db.getUsername(); + String jdbcUrl = db.getJdbcUrl(); + + env.addSource(new TestEntrySource(elementsPerSource, numElementsPerCheckpoint)) + .setParallelism(parallelism) + .map(new FailingMapper(minElementsPerFailure, maxElementsPerFailure)) .addSink( JdbcSink.exactlyOnceSink( String.format(INSERT_TEMPLATE, INPUT_TABLE), JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER, JdbcExecutionOptions.builder().build(), JdbcExactlyOnceOptions.defaults(), - DERBY_EBOOKSHOP_DB::buildXaDataSource)); + () -> getXaDataSource(jdbcUrl, username, password))); env.execute(); - xaHelper.assertDbContentsEquals(IntStream.range(0, TEST_DATA.length)); + + List<Integer> insertedIds = getInsertedIds(jdbcUrl, username, password, INPUT_TABLE); + List<Integer> expectedIds = + IntStream.range(0, elementsPerSource * parallelism) + .boxed() + .collect(Collectors.toList()); + assertTrue( + insertedIds.toString(), + insertedIds.size() == expectedIds.size() && expectedIds.containsAll(insertedIds)); } @Override protected DbMetadata getDbMetadata() { - return DERBY_EBOOKSHOP_DB; + return new DbMetadata() { + @Override + public String getInitUrl() { + return db.getJdbcUrl(); + } + + @Override + public String getUrl() { + return db.getJdbcUrl(); + } + + @Override + public XADataSource buildXaDataSource() { + return getXaDataSource(db.getJdbcUrl(), db.getUsername(), db.getPassword()); + } + + @Override + public String getDriverClass() { + return db.getDriverClassName(); + } + + @Override + public String getUser() { + return db.getUsername(); + } + + @Override + public String getPassword() { + return db.getPassword(); + } + }; } - /** {@link SourceFunction} emits all the data and waits for the checkpoint. */ - private static class CheckpointAwaitingSource<T extends Serializable> - implements SourceFunction<T>, CheckpointListener, CheckpointedFunction { + /** {@link SourceFunction} emits {@link TestEntry test entries} and waits for the checkpoint. */ + private static class TestEntrySource extends RichParallelSourceFunction<TestEntry> + implements CheckpointListener, CheckpointedFunction { + private final int numElements; + private final int numElementsPerCheckpoint; + + private transient ListState<SourceRange> ranges; + private volatile boolean allDataEmitted = false; - private volatile boolean dataCheckpointed = false; + private volatile boolean snapshotTaken = false; + private volatile long lastCheckpointId = -1L; + private volatile boolean lastSnapshotConfirmed = false; private volatile boolean running = true; - private volatile long checkpointAfterData = -1L; - private final T[] data; + private static volatile CountDownLatch runningSources; - private CheckpointAwaitingSource(T... data) { - this.data = data; + private TestEntrySource(int numElements, int numElementsPerCheckpoint) { + this.numElements = numElements; + this.numElementsPerCheckpoint = numElementsPerCheckpoint; } @Override - public void run(SourceContext<T> ctx) { - for (T datum : data) { - ctx.collect(datum); + public void run(SourceContext<TestEntry> ctx) throws Exception { + for (SourceRange range : ranges.get()) { + for (int i = range.from; i < range.to; ) { + synchronized (ctx.getCheckpointLock()) { + snapshotTaken = false; + for (int j = 0; j < numElementsPerCheckpoint && i < range.to; j++, i++) { + emit(ctx, i); + range.advance(); + } + } + sleep(() -> !snapshotTaken); + } } allDataEmitted = true; - while (!dataCheckpointed && running) { - Thread.yield(); - } + sleep(() -> !lastSnapshotConfirmed); + runningSources.countDown(); + runningSources.await(); // participate in checkpointing + } + + private void emit(SourceContext<TestEntry> ctx, int i) { + ctx.collect(new TestEntry(i, Integer.toString(i), Integer.toString(i), (double) i, i)); } @Override @@ -102,19 +212,108 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase { @Override public void notifyCheckpointComplete(long checkpointId) { - if (checkpointId == this.checkpointAfterData) { - dataCheckpointed = true; + if (lastCheckpointId > -1L && checkpointId >= this.lastCheckpointId) { + lastSnapshotConfirmed = true; + } + } + + @Override + public void open(Configuration parameters) throws Exception { + if (getRuntimeContext().getIndexOfThisSubtask() == 0) { + runningSources = + new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ranges = + context.getOperatorStateStore() + .getListState( + new ListStateDescriptor<>("SourceState", SourceRange.class)); + if (!context.isRestored()) { + ranges.update( + singletonList( + SourceRange.forSubtask( + getRuntimeContext().getIndexOfThisSubtask(), numElements))); } } @Override public void snapshotState(FunctionSnapshotContext context) { + snapshotTaken = true; if (allDataEmitted) { - checkpointAfterData = context.getCheckpointId(); + lastCheckpointId = context.getCheckpointId(); + } + } + + private void sleep(Supplier<Boolean> condition) { + while (condition.get() && running && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ExceptionUtils.rethrow(e); + } + } + } + + private static final class SourceRange { + private int from; + private final int to; + + private SourceRange(int from, int to) { + this.from = from; + this.to = to; + } + + public static SourceRange forSubtask(int subtaskIndex, int elementCount) { + return new SourceRange( + subtaskIndex * elementCount, (subtaskIndex + 1) * elementCount); + } + + public void advance() { + checkState(from < to); + from++; } } + } + + private static XADataSource getXaDataSource(String jdbcUrl, String username, String password) { + PGXADataSource xaDataSource = new PGXADataSource(); + xaDataSource.setUrl(jdbcUrl); + xaDataSource.setUser(username); + xaDataSource.setPassword(password); + return xaDataSource; + } + + private static class FailingMapper extends RichMapFunction<TestEntry, TestEntry> { + private final int minElementsPerFailure; + private final int maxElementsPerFailure; + private transient int remaining; + + public FailingMapper(int minElementsPerFailure, int maxElementsPerFailure) { + this.minElementsPerFailure = minElementsPerFailure; + this.maxElementsPerFailure = maxElementsPerFailure; + } @Override - public void initializeState(FunctionInitializationContext context) {} + public void open(Configuration parameters) throws Exception { + remaining = minElementsPerFailure + new Random().nextInt(maxElementsPerFailure); + } + + @Override + public TestEntry map(TestEntry value) throws Exception { + if (--remaining <= 0) { + throw new TestException(); + } + return value; + } + } + + private static final class TestException extends Exception { + public TestException() { + super("expected", null, true, false); + } } } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java index 46a1141..91cf0d4 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java @@ -36,24 +36,18 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; import static org.junit.Assert.assertEquals; class JdbcXaFacadeTestHelper implements AutoCloseable { - private final XADataSource xaDataSource; private final String table; private final String dbUrl; private final String user; private final String pass; private final XaFacade xaFacade; - JdbcXaFacadeTestHelper(XADataSource xaDataSource, String dbUrl, String table) throws Exception { - this(xaDataSource, dbUrl, table, "", ""); - } - JdbcXaFacadeTestHelper( XADataSource xaDataSource, String dbUrl, String table, String user, String pass) throws Exception { - this.xaDataSource = xaDataSource; this.dbUrl = dbUrl; this.table = table; - this.xaFacade = XaFacadeImpl.fromXaDataSource(this.xaDataSource); + this.xaFacade = XaFacadeImpl.fromXaDataSource(xaDataSource); this.xaFacade.open(); this.user = user; this.pass = pass; @@ -63,14 +57,6 @@ class JdbcXaFacadeTestHelper implements AutoCloseable { assertEquals(expected, xaFacade.recover().size()); } - void assertDbContentsEquals(int[]... dataIdx) throws SQLException { - assertDbContentsEquals(Arrays.stream(dataIdx).flatMapToInt(Arrays::stream)); - } - - void assertDbContentsEquals(int... dataIdx) throws SQLException { - assertDbContentsEquals(Arrays.stream(dataIdx)); - } - void assertDbContentsEquals(JdbcTestCheckpoint... checkpoints) throws SQLException { assertDbContentsEquals( Arrays.stream(checkpoints).flatMapToInt(x -> Arrays.stream(x.dataItemsIdx))); @@ -86,6 +72,11 @@ class JdbcXaFacadeTestHelper implements AutoCloseable { } private List<Integer> getInsertedIds() throws SQLException { + return getInsertedIds(dbUrl, user, pass, table); + } + + static List<Integer> getInsertedIds(String dbUrl, String user, String pass, String table) + throws SQLException { List<Integer> dbContents = new ArrayList<>(); try (Connection connection = DriverManager.getConnection(dbUrl, user, pass)) { connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); @@ -114,14 +105,6 @@ class JdbcXaFacadeTestHelper implements AutoCloseable { } } - public XADataSource getXaDataSource() { - return xaDataSource; - } - - XaFacade getXaFacade() { - return xaFacade; - } - @Override public void close() throws Exception { xaFacade.close(); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java index b07ed23..888d355 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java @@ -90,7 +90,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase { new JdbcXaFacadeTestHelper( JdbcXaSinkDerbyTest.derbyXaDs(), getDbMetadata().getUrl(), - JdbcTestFixture.INPUT_TABLE)) { + JdbcTestFixture.INPUT_TABLE, + getDbMetadata().getUser(), + getDbMetadata().getPassword())) { h.assertDbContentsEquals(CP0); } } @@ -178,7 +180,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase { new JdbcXaFacadeTestHelper( derbyXaDs(), JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl(), - JdbcTestFixture.INPUT_TABLE)) { + JdbcTestFixture.INPUT_TABLE, + JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUser(), + JdbcTestFixture.DERBY_EBOOKSHOP_DB.getPassword())) { xa.cancelAllTx(); } } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java index daad0ca..79d57f6 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java @@ -86,7 +86,11 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase { xaDataSource = getDbMetadata().buildXaDataSource(); xaHelper = new JdbcXaFacadeTestHelper( - getDbMetadata().buildXaDataSource(), getDbMetadata().getUrl(), INPUT_TABLE); + getDbMetadata().buildXaDataSource(), + getDbMetadata().getUrl(), + INPUT_TABLE, + getDbMetadata().getUser(), + getDbMetadata().getPassword()); sinkHelper = buildSinkHelper(createStateHandler()); } @@ -103,7 +107,12 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase { xaHelper.close(); } try (JdbcXaFacadeTestHelper xa = - new JdbcXaFacadeTestHelper(xaDataSource, getDbMetadata().getUrl(), INPUT_TABLE)) { + new JdbcXaFacadeTestHelper( + xaDataSource, + getDbMetadata().getUrl(), + INPUT_TABLE, + getDbMetadata().getUser(), + getDbMetadata().getPassword())) { xa.cancelAllTx(); } }
