curcur commented on a change in pull request #15636:
URL: https://github.com/apache/flink/pull/15636#discussion_r615913550
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -115,10 +116,6 @@ public void close() throws SQLException {
connection.close();
connection = null;
}
- if (xaConnection != null) {
- xaConnection.close();
- xaConnection = null;
- }
Review comment:
Why was this removed? Doesn't the `xaConnection` still need to be closed?
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
##########
@@ -102,19 +212,108 @@ public void cancel() {
@Override
public void notifyCheckpointComplete(long checkpointId) {
- if (checkpointId == this.checkpointAfterData) {
- dataCheckpointed = true;
+ if (checkpointId == this.lastCheckpointId) {
+ lastSnapshotConfirmed = true;
+ }
+ }
Review comment:
Would this cause test instability?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaFacadeImpl.java
##########
@@ -178,7 +175,10 @@ public void failOrRollback(Xid xid) {
Command.fromRunnable(
"end (fail)",
xid,
- () -> xaResource.end(xid, XAResource.TMFAIL),
+ () -> {
+ xaResource.end(xid, XAResource.TMFAIL);
+ xaResource.rollback(xid);
Review comment:
This now does both `end(TMFAIL)` and `rollback`, so it looks more like a `FailAndRollback` operation. Should it be named that way?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -234,13 +231,19 @@ public void open(Configuration configuration) throws
Exception {
xaGroupOps.recoverAndRollback();
}
beginTx(0L);
+ outputFormat.setRuntimeContext(getRuntimeContext());
+ // open format only after starting the transaction so it gets a ready
to use connection
+ outputFormat.open(
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks());
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
LOG.debug("snapshot state, checkpointId={}",
context.getCheckpointId());
prepareCurrentTx(context.getCheckpointId());
beginTx(context.getCheckpointId() + 1);
+ outputFormat.reconnect(false); // associate with potentially new
connection
Review comment:
This makes the output format use a new connection after a new
transaction begins, right? Could we give it a name that says that, and make the
comment easier to understand?
It is hard to tell what "potentially" refers to here.
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
##########
@@ -31,68 +37,172 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.Rule;
import org.junit.Test;
+import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
-import java.io.Serializable;
+import javax.sql.XADataSource;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static
org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+import static java.util.Collections.singletonList;
+import static
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.INPUT_TABLE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
-import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static
org.apache.flink.connector.jdbc.xa.JdbcXaFacadeTestHelper.getInsertedIds;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertTrue;
/** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-public class JdbcExactlyOnceSinkE2eTest extends JdbcXaSinkTestBase {
+public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
+
+ private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> {
+ public PgXaDb(String dockerImageName) {
+ super(dockerImageName);
+ // set max_prepared_transactions to non-zero
+ this.setCommand("postgres", "-c", "max_prepared_transactions=50",
"-c", "fsync=off");
+ }
+ }
+
+ @Rule public PgXaDb db = new PgXaDb("postgres:9.6.12");
+
+ @Override
+ public void after() throws Exception {
+ // no need for cleanup - done by test container tear down
+ }
@Test
public void testInsert() throws Exception {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRestartStrategy(new NoRestartStrategyConfiguration());
+ int parallelism = 4,
+ elementsPerSource = 500,
+ numElementsPerCheckpoint = 7,
+ minElementsPerFailure = numElementsPerCheckpoint / 3,
+ maxElementsPerFailure = numElementsPerCheckpoint * 3;
Review comment:
These are neat, but could we declare them the conventional way, one per line?
int xxx = ...;
int yyy = ...;
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java
##########
@@ -224,11 +224,11 @@ public void open(Configuration configuration) throws
Exception {
hangingXids = new
LinkedList<>(xaGroupOps.failOrRollback(hangingXids).getForRetry());
commitUpToCheckpoint(Optional.empty());
if (options.isDiscoverAndRollbackOnRecovery()) {
- // todo: consider doing recover-rollback later (e.g. after the 1st
checkpoint)
- // when we are sure that all other subtasks started and committed
any of their prepared
- // transactions
- // this would require to distinguish between this job Xids and
other Xids
- xaGroupOps.recoverAndRollback();
+ // Pending transactions which are not included into the checkpoint
might hold locks and
+ // should be rolled back. However, rolling back ALL transactions
can cause data loss. So
+ // each subtask first commits transactions from its state and then
rolls back discovered
+ // transactions if they belong to it.
+ xaGroupOps.recoverAndRollback(getRuntimeContext(), xidGenerator);
Review comment:
I don't understand this part; let's sync up offline tomorrow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]