This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b283da6d9 [FLINK-37449][cdc-connector/postgres] Fix Postgres CDC source doesn't commit lsn when TaskManager failover
b283da6d9 is described below

commit b283da6d9a2247c90f864a60e42abfba42e2183a
Author: Xin Gong <[email protected]>
AuthorDate: Wed Mar 12 21:25:08 2025 +0800

    [FLINK-37449][cdc-connector/postgres] Fix Postgres CDC source doesn't commit lsn when TaskManager failover
    
    This closes #3944.
---
 .../enumerator/PostgresSourceEnumerator.java       |   4 +
 .../source/reader/PostgresSourceReader.java        |   5 +-
 .../postgres/source/PostgresSourceITCase.java      | 128 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 2 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
index be6cfa71f..30799309d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
 import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
 import 
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
+import 
org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -83,6 +84,9 @@ public class PostgresSourceEnumerator extends 
IncrementalSourceEnumerator {
 
     @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof StreamSplitAssignedEvent && 
this.receiveOffsetCommitAck) {
+            this.receiveOffsetCommitAck = false;
+        }
         if (sourceEvent instanceof OffsetCommitAckEvent) {
             if (streamSplitTaskId != null && streamSplitTaskId == subtaskId) {
                 this.receiveOffsetCommitAck = true;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index 81bff8488..0bad047cf 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -119,9 +119,10 @@ public class PostgresSourceReader extends 
IncrementalSourceReaderWithCommit {
         }
         final long checkpointIdToCommit = this.minHeap.poll();
         LOG.info(
-                "Pending checkpoints '{}', to be committed checkpoint id 
'{}'.",
+                "Pending checkpoints '{}', to be committed checkpoint id '{}', 
isCommitOffset is {}.",
                 this.minHeap,
-                checkpointIdToCommit);
+                checkpointIdToCommit,
+                isCommitOffset());
 
         // After all snapshot splits are finished, update stream split's 
metadata and reset start
         // offset, which maybe smaller than before.
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 7d2c6dde8..582578829 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -698,6 +698,74 @@ public class PostgresSourceITCase extends PostgresTestBase 
{
         }
     }
 
+    @Test
+    public void testCommitLsnWhenTaskManagerFailover() throws Exception {
+        int parallelism = 1;
+        PostgresTestUtils.FailoverType failoverType = 
PostgresTestUtils.FailoverType.TM;
+        PostgresTestUtils.FailoverPhase failoverPhase = 
PostgresTestUtils.FailoverPhase.STREAM;
+        String[] captureCustomerTables = new String[] {"customers"};
+        RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+                RestartStrategies.fixedDelayRestart(1, 0);
+        boolean skipSnapshotBackfill = false;
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(1000L);
+        env.setRestartStrategy(restartStrategyConfiguration);
+        String sourceDDL =
+                format(
+                        "CREATE TABLE customers ("
+                                + " id BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.startup.mode' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = 
'100',"
+                                + " 'slot.name' = '%s',"
+                                + " 'scan.incremental.snapshot.backfill.skip' 
= '%s',"
+                                + " 'scan.newly-added-table.enabled' = 'true',"
+                                + " 'scan.lsn-commit.checkpoints-num-delay' = 
'0'"
+                                + ")",
+                        customDatabase.getHost(),
+                        customDatabase.getDatabasePort(),
+                        customDatabase.getUsername(),
+                        customDatabase.getPassword(),
+                        customDatabase.getDatabaseName(),
+                        SCHEMA_NAME,
+                        getTableNameRegex(captureCustomerTables),
+                        scanStartupMode,
+                        slotName,
+                        skipSnapshotBackfill);
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from customers");
+
+        // first step: check the snapshot data
+        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+            checkSnapshotData(tableResult, failoverType, failoverPhase, 
captureCustomerTables);
+        }
+
+        // second step: check the stream data
+        checkStreamDataWithTestLsn(tableResult, failoverType, failoverPhase, 
captureCustomerTables);
+
+        tableResult.getJobClient().get().cancel().get();
+
+        // sleep 1000ms to wait until connections are closed.
+        Thread.sleep(1000L);
+    }
+
     private List<String> testBackfillWhenWritingEvents(
             boolean skipSnapshotBackfill,
             int fetchSize,
@@ -1113,6 +1181,46 @@ public class PostgresSourceITCase extends 
PostgresTestBase {
         assertTrue(!hasNextData(iterator));
     }
 
+    private void checkStreamDataWithTestLsn(
+            TableResult tableResult,
+            PostgresTestUtils.FailoverType failoverType,
+            PostgresTestUtils.FailoverPhase failoverPhase,
+            String[] captureCustomerTables)
+            throws Exception {
+        waitUntilJobRunning(tableResult);
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+
+        for (String tableId : captureCustomerTables) {
+            makeFirstPartStreamEvents(
+                    getConnection(),
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' 
+ tableId);
+        }
+
+        // wait for the stream reading and isCommitOffset is true
+        Thread.sleep(20000L);
+
+        String confirmedFlushLsn;
+        try (PostgresConnection connection = getConnection()) {
+            confirmedFlushLsn = getConfirmedFlushLsn(connection);
+        }
+        if (failoverPhase == PostgresTestUtils.FailoverPhase.STREAM) {
+            triggerFailover(
+                    failoverType, jobId, miniClusterResource.getMiniCluster(), 
() -> sleepMs(200));
+            waitUntilJobRunning(tableResult);
+        }
+        // wait for the stream reading and isCommitOffset is true
+        Thread.sleep(30000L);
+        for (String tableId : captureCustomerTables) {
+            makeSecondPartStreamEvents(
+                    getConnection(),
+                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' 
+ tableId);
+        }
+        Thread.sleep(5000L);
+        try (PostgresConnection connection = getConnection()) {
+            
assertTrue(!confirmedFlushLsn.equals(getConfirmedFlushLsn(connection)));
+        }
+    }
+
     private void sleepMs(long millis) {
         try {
             Thread.sleep(millis);
@@ -1226,4 +1334,24 @@ public class PostgresSourceITCase extends 
PostgresTestBase {
                     return rs.getLong(1);
                 });
     }
+
+    private String getConfirmedFlushLsn(JdbcConnection jdbc) throws 
SQLException {
+        final String query =
+                String.format(
+                        "SELECT\n"
+                                + "confirmed_flush_lsn\n"
+                                + "FROM pg_replication_slots where slot_name = 
'%s'",
+                        slotName);
+
+        return jdbc.queryAndMap(
+                query,
+                rs -> {
+                    if (!rs.next()) {
+                        throw new SQLException(
+                                String.format(
+                                        "No result returned after running 
query [%s]", query));
+                    }
+                    return rs.getString(1);
+                });
+    }
 }

Reply via email to