This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e4e14adef2014f4c0b3dda1967f2811a5610723
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Apr 15 19:23:25 2021 +0200

    [FLINK-22239][tests] Test JDBC XA Sink against PostgreSQL
---
 flink-connectors/flink-connector-jdbc/pom.xml      |   7 +
 .../apache/flink/connector/jdbc/DbMetadata.java    |   8 +
 .../apache/flink/connector/jdbc/JdbcTestBase.java  |   4 +-
 .../flink/connector/jdbc/JdbcTestFixture.java      |   4 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        | 261 ++++++++++++++++++---
 .../connector/jdbc/xa/JdbcXaFacadeTestHelper.java  |  29 +--
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java |   8 +-
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java      |  13 +-
 8 files changed, 273 insertions(+), 61 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index 54b902e..fa9afb2 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -165,6 +165,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>postgresql</artifactId>
+                       <version>1.15.1</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
index 2483e75..21f93d1 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/DbMetadata.java
@@ -28,6 +28,14 @@ public interface DbMetadata extends Serializable {
 
     String getUrl();
 
+    default String getUser() {
+        return "";
+    }
+
+    default String getPassword() {
+        return "";
+    }
+
     XADataSource buildXaDataSource();
 
     String getDriverClass();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
index 60766d4..88daae4 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
@@ -27,12 +27,12 @@ import org.junit.Before;
 public abstract class JdbcTestBase {
 
     @Before
-    public final void before() throws Exception {
+    public void before() throws Exception {
         JdbcTestFixture.initSchema(getDbMetadata());
     }
 
     @After
-    public final void after() throws Exception {
+    public void after() throws Exception {
         JdbcTestFixture.cleanupData(getDbMetadata().getUrl());
         JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata());
     }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
index 39c2c68..3beec55 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java
@@ -188,7 +188,9 @@ public class JdbcTestFixture {
         System.setProperty(
                 "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
         Class.forName(dbMetadata.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(dbMetadata.getInitUrl())) {
+        try (Connection conn =
+                DriverManager.getConnection(
+                        dbMetadata.getInitUrl(), dbMetadata.getUser(), dbMetadata.getPassword())) {
             createTable(conn, JdbcTestFixture.INPUT_TABLE);
             createTable(conn, OUTPUT_TABLE);
             createTable(conn, OUTPUT_TABLE_2);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index 0a8df15..03abed9 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -17,12 +17,18 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import org.apache.flink.connector.jdbc.JdbcITCase;
 import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -31,68 +37,172 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.ExceptionUtils;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
 
-import java.io.Serializable;
+import javax.sql.XADataSource;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
 
 /** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase {
+public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
+
+    private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> {
+        public PgXaDb(String dockerImageName) {
+            super(dockerImageName);
+            // set max_prepared_transactions to non-zero
+            this.setCommand("postgres", "-c", "max_prepared_transactions=50", "-c", "fsync=off");
+        }
+    }
+
+    @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12");
+
+    @Override
+    public void after() throws Exception {
+        // no need for cleanup - done by test container tear down
+    }
 
     @Test
     public void testInsert() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(1);
-        env.setRestartStrategy(new NoRestartStrategyConfiguration());
+        int parallelism = 4;
+        int elementsPerSource = 500;
+        int numElementsPerCheckpoint = 7;
+        int minElementsPerFailure = numElementsPerCheckpoint / 3;
+        int maxElementsPerFailure = numElementsPerCheckpoint * 3;
+
+        Configuration configuration = new Configuration();
+        configuration.set(
+                EXECUTION_FAILOVER_STRATEGY,
+                "full" /* allow checkpointing even after some sources have finished */);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, Time.milliseconds(100)));
         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         env.enableCheckpointing(50, CheckpointingMode.EXACTLY_ONCE);
-        env.addSource(new CheckpointAwaitingSource<>(TEST_DATA))
-                .returns(TestEntry.class)
+        env.setParallelism(parallelism);
+        env.disableOperatorChaining();
+        String password = db.getPassword();
+        String username = db.getUsername();
+        String jdbcUrl = db.getJdbcUrl();
+
+        env.addSource(new TestEntrySource(elementsPerSource, numElementsPerCheckpoint))
+                .setParallelism(parallelism)
+                .map(new FailingMapper(minElementsPerFailure, maxElementsPerFailure))
                 .addSink(
                         JdbcSink.exactlyOnceSink(
                                 String.format(INSERT_TEMPLATE, INPUT_TABLE),
                                 JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER,
                                 JdbcExecutionOptions.builder().build(),
                                 JdbcExactlyOnceOptions.defaults(),
-                                DERBY_EBOOKSHOP_DB::buildXaDataSource));
+                                () -> getXaDataSource(jdbcUrl, username, password)));
         env.execute();
-        xaHelper.assertDbContentsEquals(IntStream.range(0, TEST_DATA.length));
+
+        List<Integer> insertedIds = getInsertedIds(jdbcUrl, username, password, INPUT_TABLE);
+        List<Integer> expectedIds =
+                IntStream.range(0, elementsPerSource * parallelism)
+                        .boxed()
+                        .collect(Collectors.toList());
+        assertTrue(
+                insertedIds.toString(),
+                insertedIds.size() == expectedIds.size() && expectedIds.containsAll(insertedIds));
     }
 
     @Override
     protected DbMetadata getDbMetadata() {
-        return DERBY_EBOOKSHOP_DB;
+        return new DbMetadata() {
+            @Override
+            public String getInitUrl() {
+                return db.getJdbcUrl();
+            }
+
+            @Override
+            public String getUrl() {
+                return db.getJdbcUrl();
+            }
+
+            @Override
+            public XADataSource buildXaDataSource() {
+                return getXaDataSource(db.getJdbcUrl(), db.getUsername(), db.getPassword());
+            }
+
+            @Override
+            public String getDriverClass() {
+                return db.getDriverClassName();
+            }
+
+            @Override
+            public String getUser() {
+                return db.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return db.getPassword();
+            }
+        };
     }
 
-    /** {@link SourceFunction} emits all the data and waits for the checkpoint. */
-    private static class CheckpointAwaitingSource<T extends Serializable>
-            implements SourceFunction<T>, CheckpointListener, CheckpointedFunction {
+    /** {@link SourceFunction} emits {@link TestEntry test entries} and waits for the checkpoint. */
+    private static class TestEntrySource extends RichParallelSourceFunction<TestEntry>
+            implements CheckpointListener, CheckpointedFunction {
+        private final int numElements;
+        private final int numElementsPerCheckpoint;
+
+        private transient ListState<SourceRange> ranges;
+
         private volatile boolean allDataEmitted = false;
-        private volatile boolean dataCheckpointed = false;
+        private volatile boolean snapshotTaken = false;
+        private volatile long lastCheckpointId = -1L;
+        private volatile boolean lastSnapshotConfirmed = false;
         private volatile boolean running = true;
-        private volatile long checkpointAfterData = -1L;
-        private final T[] data;
+        private static volatile CountDownLatch runningSources;
 
-        private CheckpointAwaitingSource(T... data) {
-            this.data = data;
+        private TestEntrySource(int numElements, int numElementsPerCheckpoint) {
+            this.numElements = numElements;
+            this.numElementsPerCheckpoint = numElementsPerCheckpoint;
         }
 
         @Override
-        public void run(SourceContext<T> ctx) {
-            for (T datum : data) {
-                ctx.collect(datum);
+        public void run(SourceContext<TestEntry> ctx) throws Exception {
+            for (SourceRange range : ranges.get()) {
+                for (int i = range.from; i < range.to; ) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        snapshotTaken = false;
+                        for (int j = 0; j < numElementsPerCheckpoint && i < range.to; j++, i++) {
+                            emit(ctx, i);
+                            range.advance();
+                        }
+                    }
+                    sleep(() -> !snapshotTaken);
+                }
             }
             allDataEmitted = true;
-            while (!dataCheckpointed && running) {
-                Thread.yield();
-            }
+            sleep(() -> !lastSnapshotConfirmed);
+            runningSources.countDown();
+            runningSources.await(); // participate in checkpointing
+        }
+
+        private void emit(SourceContext<TestEntry> ctx, int i) {
+            ctx.collect(new TestEntry(i, Integer.toString(i), Integer.toString(i), (double) i, i));
         }
 
         @Override
@@ -102,19 +212,108 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase {
 
         @Override
         public void notifyCheckpointComplete(long checkpointId) {
-            if (checkpointId == this.checkpointAfterData) {
-                dataCheckpointed = true;
+            if (lastCheckpointId > -1L && checkpointId >= this.lastCheckpointId) {
+                lastSnapshotConfirmed = true;
+            }
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                runningSources =
+                        new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks());
+            }
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            ranges =
+                    context.getOperatorStateStore()
+                            .getListState(
+                                    new ListStateDescriptor<>("SourceState", SourceRange.class));
+            if (!context.isRestored()) {
+                ranges.update(
+                        singletonList(
+                                SourceRange.forSubtask(
+                                        getRuntimeContext().getIndexOfThisSubtask(), numElements)));
             }
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) {
+            snapshotTaken = true;
             if (allDataEmitted) {
-                checkpointAfterData = context.getCheckpointId();
+                lastCheckpointId = context.getCheckpointId();
+            }
+        }
+
+        private void sleep(Supplier<Boolean> condition) {
+            while (condition.get() && running && !Thread.currentThread().isInterrupted()) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    ExceptionUtils.rethrow(e);
+                }
+            }
+        }
+
+        private static final class SourceRange {
+            private int from;
+            private final int to;
+
+            private SourceRange(int from, int to) {
+                this.from = from;
+                this.to = to;
+            }
+
+            public static SourceRange forSubtask(int subtaskIndex, int elementCount) {
+                return new SourceRange(
+                        subtaskIndex * elementCount, (subtaskIndex + 1) * elementCount);
+            }
+
+            public void advance() {
+                checkState(from < to);
+                from++;
             }
         }
+    }
+
+    private static XADataSource getXaDataSource(String jdbcUrl, String username, String password) {
+        PGXADataSource xaDataSource = new PGXADataSource();
+        xaDataSource.setUrl(jdbcUrl);
+        xaDataSource.setUser(username);
+        xaDataSource.setPassword(password);
+        return xaDataSource;
+    }
+
+    private static class FailingMapper extends RichMapFunction<TestEntry, TestEntry> {
+        private final int minElementsPerFailure;
+        private final int maxElementsPerFailure;
+        private transient int remaining;
+
+        public FailingMapper(int minElementsPerFailure, int maxElementsPerFailure) {
+            this.minElementsPerFailure = minElementsPerFailure;
+            this.maxElementsPerFailure = maxElementsPerFailure;
+        }
 
         @Override
-        public void initializeState(FunctionInitializationContext context) {}
+        public void open(Configuration parameters) throws Exception {
+            remaining = minElementsPerFailure + new Random().nextInt(maxElementsPerFailure);
+        }
+
+        @Override
+        public TestEntry map(TestEntry value) throws Exception {
+            if (--remaining <= 0) {
+                throw new TestException();
+            }
+            return value;
+        }
+    }
+
+    private static final class TestException extends Exception {
+        public TestException() {
+            super("expected", null, true, false);
+        }
     }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
index 46a1141..91cf0d4 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeTestHelper.java
@@ -36,24 +36,18 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.junit.Assert.assertEquals;
 
 class JdbcXaFacadeTestHelper implements AutoCloseable {
-    private final XADataSource xaDataSource;
     private final String table;
     private final String dbUrl;
     private final String user;
     private final String pass;
     private final XaFacade xaFacade;
 
-    JdbcXaFacadeTestHelper(XADataSource xaDataSource, String dbUrl, String table) throws Exception {
-        this(xaDataSource, dbUrl, table, "", "");
-    }
-
     JdbcXaFacadeTestHelper(
             XADataSource xaDataSource, String dbUrl, String table, String user, String pass)
             throws Exception {
-        this.xaDataSource = xaDataSource;
         this.dbUrl = dbUrl;
         this.table = table;
-        this.xaFacade = XaFacadeImpl.fromXaDataSource(this.xaDataSource);
+        this.xaFacade = XaFacadeImpl.fromXaDataSource(xaDataSource);
         this.xaFacade.open();
         this.user = user;
         this.pass = pass;
@@ -63,14 +57,6 @@ class JdbcXaFacadeTestHelper implements AutoCloseable {
         assertEquals(expected, xaFacade.recover().size());
     }
 
-    void assertDbContentsEquals(int[]... dataIdx) throws SQLException {
-        assertDbContentsEquals(Arrays.stream(dataIdx).flatMapToInt(Arrays::stream));
-    }
-
-    void assertDbContentsEquals(int... dataIdx) throws SQLException {
-        assertDbContentsEquals(Arrays.stream(dataIdx));
-    }
-
     void assertDbContentsEquals(JdbcTestCheckpoint... checkpoints) throws SQLException {
         assertDbContentsEquals(
                 Arrays.stream(checkpoints).flatMapToInt(x -> Arrays.stream(x.dataItemsIdx)));
@@ -86,6 +72,11 @@ class JdbcXaFacadeTestHelper implements AutoCloseable {
     }
 
     private List<Integer> getInsertedIds() throws SQLException {
+        return getInsertedIds(dbUrl, user, pass, table);
+    }
+
+    static List<Integer> getInsertedIds(String dbUrl, String user, String pass, String table)
+            throws SQLException {
         List<Integer> dbContents = new ArrayList<>();
         try (Connection connection = DriverManager.getConnection(dbUrl, user, pass)) {
             connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
@@ -114,14 +105,6 @@ class JdbcXaFacadeTestHelper implements AutoCloseable {
         }
     }
 
-    public XADataSource getXaDataSource() {
-        return xaDataSource;
-    }
-
-    XaFacade getXaFacade() {
-        return xaFacade;
-    }
-
     @Override
     public void close() throws Exception {
         xaFacade.close();
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index b07ed23..888d355 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -90,7 +90,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
                 new JdbcXaFacadeTestHelper(
                         JdbcXaSinkDerbyTest.derbyXaDs(),
                         getDbMetadata().getUrl(),
-                        JdbcTestFixture.INPUT_TABLE)) {
+                        JdbcTestFixture.INPUT_TABLE,
+                        getDbMetadata().getUser(),
+                        getDbMetadata().getPassword())) {
             h.assertDbContentsEquals(CP0);
         }
     }
@@ -178,7 +180,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
                 new JdbcXaFacadeTestHelper(
                         derbyXaDs(),
                         JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl(),
-                        JdbcTestFixture.INPUT_TABLE)) {
+                        JdbcTestFixture.INPUT_TABLE,
+                        JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUser(),
+                        JdbcTestFixture.DERBY_EBOOKSHOP_DB.getPassword())) {
             xa.cancelAllTx();
         }
     }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index daad0ca..79d57f6 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -86,7 +86,11 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
         xaDataSource = getDbMetadata().buildXaDataSource();
         xaHelper =
                 new JdbcXaFacadeTestHelper(
-                        getDbMetadata().buildXaDataSource(), getDbMetadata().getUrl(), INPUT_TABLE);
+                        getDbMetadata().buildXaDataSource(),
+                        getDbMetadata().getUrl(),
+                        INPUT_TABLE,
+                        getDbMetadata().getUser(),
+                        getDbMetadata().getPassword());
         sinkHelper = buildSinkHelper(createStateHandler());
     }
 
@@ -103,7 +107,12 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
             xaHelper.close();
         }
         try (JdbcXaFacadeTestHelper xa =
-                new JdbcXaFacadeTestHelper(xaDataSource, getDbMetadata().getUrl(), INPUT_TABLE)) {
+                new JdbcXaFacadeTestHelper(
+                        xaDataSource,
+                        getDbMetadata().getUrl(),
+                        INPUT_TABLE,
+                        getDbMetadata().getUser(),
+                        getDbMetadata().getPassword())) {
             xa.cancelAllTx();
         }
     }

Reply via email to