rkhachatryan commented on a change in pull request #15636:
URL: https://github.com/apache/flink/pull/15636#discussion_r616174733



##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
##########
@@ -31,68 +37,172 @@
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.ExceptionUtils;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
 
-import java.io.Serializable;
+import javax.sql.XADataSource;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
 
 /** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase {
+public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
+
+    private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> {
+        public PgXaDb(String dockerImageName) {
+            super(dockerImageName);
+            // set max_prepared_transactions to non-zero
+            this.setCommand("postgres", "-c", "max_prepared_transactions=50", "-c", "fsync=off");
+        }
+    }
+
+    @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12");
+
+    @Override
+    public void after() throws Exception {
+        // no need for cleanup - done by test container tear down
+    }
 
     @Test
     public void testInsert() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.setRestartStrategy(new NoRestartStrategyConfiguration());
+        int parallelism = 4,
+                elementsPerSource = 500,
+                numElementsPerCheckpoint = 7,
+                minElementsPerFailure = numElementsPerCheckpoint / 3,
+                maxElementsPerFailure = numElementsPerCheckpoint * 3;

Review comment:
       Okay, I don't have any preference.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to