This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b283da6d9 [FLINK-37449][cdc-connector/postgres] Fix Postgres CDC
source doesn't commit lsn when TaskManager failover
b283da6d9 is described below
commit b283da6d9a2247c90f864a60e42abfba42e2183a
Author: Xin Gong <[email protected]>
AuthorDate: Wed Mar 12 21:25:08 2025 +0800
[FLINK-37449][cdc-connector/postgres] Fix Postgres CDC source doesn't
commit lsn when TaskManager failover
This closes #3944.
---
.../enumerator/PostgresSourceEnumerator.java | 4 +
.../source/reader/PostgresSourceReader.java | 5 +-
.../postgres/source/PostgresSourceITCase.java | 128 +++++++++++++++++++++
3 files changed, 135 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
index be6cfa71f..30799309d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/enumerator/PostgresSourceEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
import
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
+import
org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -83,6 +84,9 @@ public class PostgresSourceEnumerator extends
IncrementalSourceEnumerator {
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof StreamSplitAssignedEvent &&
this.receiveOffsetCommitAck) {
+ this.receiveOffsetCommitAck = false;
+ }
if (sourceEvent instanceof OffsetCommitAckEvent) {
if (streamSplitTaskId != null && streamSplitTaskId == subtaskId) {
this.receiveOffsetCommitAck = true;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index 81bff8488..0bad047cf 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -119,9 +119,10 @@ public class PostgresSourceReader extends
IncrementalSourceReaderWithCommit {
}
final long checkpointIdToCommit = this.minHeap.poll();
LOG.info(
- "Pending checkpoints '{}', to be committed checkpoint id
'{}'.",
+ "Pending checkpoints '{}', to be committed checkpoint id '{}',
isCommitOffset is {}.",
this.minHeap,
- checkpointIdToCommit);
+ checkpointIdToCommit,
+ isCommitOffset());
// After all snapshot splits are finished, update stream split's
metadata and reset start
// offset, which maybe smaller than before.
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 7d2c6dde8..582578829 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -698,6 +698,74 @@ public class PostgresSourceITCase extends PostgresTestBase
{
}
}
+ @Test
+ public void testCommitLsnWhenTaskManagerFailover() throws Exception { // FLINK-37449 regression test: runs the customers CDC query, then delegates the LSN-after-failover check to checkStreamDataWithTestLsn
+ int parallelism = 1; // applied to the environment below via env.setParallelism
+ PostgresTestUtils.FailoverType failoverType =
PostgresTestUtils.FailoverType.TM; // TM: TaskManager failover, per the test name
+ PostgresTestUtils.FailoverPhase failoverPhase =
PostgresTestUtils.FailoverPhase.STREAM; // checkStreamDataWithTestLsn only triggers the failover for this phase
+ String[] captureCustomerTables = new String[] {"customers"};
+ RestartStrategies.RestartStrategyConfiguration
restartStrategyConfiguration =
+ RestartStrategies.fixedDelayRestart(1, 0); // Flink API: up to 1 restart attempt, 0 ms delay between attempts
+ boolean skipSnapshotBackfill = false; // substituted into 'scan.incremental.snapshot.backfill.skip' in the DDL
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(1000L); // checkpoint interval in milliseconds
+ env.setRestartStrategy(restartStrategyConfiguration);
+ String sourceDDL =
+ format(
+ "CREATE TABLE customers ("
+ + " id BIGINT NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING,"
+ + " primary key (id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'postgres-cdc',"
+ + " 'scan.incremental.snapshot.enabled' =
'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.startup.mode' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' =
'100',"
+ + " 'slot.name' = '%s',"
+ + " 'scan.incremental.snapshot.backfill.skip'
= '%s',"
+ + " 'scan.newly-added-table.enabled' = 'true',"
+ + " 'scan.lsn-commit.checkpoints-num-delay' =
'0'"
+ + ")", // NOTE(review): checkpoints-num-delay '0' presumably makes the source commit the LSN without waiting extra checkpoints; confirm against the connector docs
+ customDatabase.getHost(),
+ customDatabase.getDatabasePort(),
+ customDatabase.getUsername(),
+ customDatabase.getPassword(),
+ customDatabase.getDatabaseName(),
+ SCHEMA_NAME,
+ getTableNameRegex(captureCustomerTables),
+ scanStartupMode,
+ slotName,
+ skipSnapshotBackfill); // the ten arguments fill the ten %s placeholders in order
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from customers"); // result handle used below for the job client and data checks
+
+ // first step: check the snapshot data
+ if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+ checkSnapshotData(tableResult, failoverType, failoverPhase,
captureCustomerTables);
+ }
+
+ // second step: check the stream data
+ checkStreamDataWithTestLsn(tableResult, failoverType, failoverPhase,
captureCustomerTables);
+
+ tableResult.getJobClient().get().cancel().get(); // block until the cancel future completes
+
+ // sleep 1000ms to wait until connections are closed.
+ Thread.sleep(1000L);
+ }
+
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill,
int fetchSize,
@@ -1113,6 +1181,46 @@ public class PostgresSourceITCase extends
PostgresTestBase {
assertTrue(!hasNextData(iterator));
}
+ private void checkStreamDataWithTestLsn(
+ TableResult tableResult,
+ PostgresTestUtils.FailoverType failoverType,
+ PostgresTestUtils.FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception { // Flow: emit changes, record the slot's confirmed_flush_lsn, optionally fail over, emit more changes, assert the LSN changed
+ waitUntilJobRunning(tableResult);
+ JobID jobId = tableResult.getJobClient().get().getJobID(); // needed by triggerFailover below
+
+ for (String tableId : captureCustomerTables) {
+ makeFirstPartStreamEvents(
+ getConnection(), // NOTE(review): not closed in this method; presumably closed inside makeFirstPartStreamEvents — confirm
+ customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.'
+ tableId);
+ }
+
+ // wait for the stream reading and isCommitOffset is true
+ Thread.sleep(20000L); // fixed 20 s wait, not a condition poll
+
+ String confirmedFlushLsn;
+ try (PostgresConnection connection = getConnection()) {
+ confirmedFlushLsn = getConfirmedFlushLsn(connection); // baseline value taken before any failover
+ }
+ if (failoverPhase == PostgresTestUtils.FailoverPhase.STREAM) {
+ triggerFailover(
+ failoverType, jobId, miniClusterResource.getMiniCluster(),
() -> sleepMs(200));
+ waitUntilJobRunning(tableResult); // resume only after the job is running again
+ }
+ // wait for the stream reading and isCommitOffset is true
+ Thread.sleep(30000L); // fixed 30 s wait, not a condition poll
+ for (String tableId : captureCustomerTables) {
+ makeSecondPartStreamEvents(
+ getConnection(), // NOTE(review): same unclosed-connection question as the first-part call above
+ customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.'
+ tableId);
+ }
+ Thread.sleep(5000L); // NOTE(review): presumably leaves time for a checkpoint to commit the new LSN — confirm
+ try (PostgresConnection connection = getConnection()) {
+
assertTrue(!confirmedFlushLsn.equals(getConfirmedFlushLsn(connection))); // passes only if confirmed_flush_lsn differs from the baseline
+ }
+ }
+
private void sleepMs(long millis) {
try {
Thread.sleep(millis);
@@ -1226,4 +1334,24 @@ public class PostgresSourceITCase extends
PostgresTestBase {
return rs.getLong(1);
});
}
+
+ private String getConfirmedFlushLsn(JdbcConnection jdbc) throws
SQLException { // Reads confirmed_flush_lsn from pg_replication_slots for the slot named slotName, returned as a String
+ final String query =
+ String.format(
+ "SELECT\n"
+ + "confirmed_flush_lsn\n"
+ + "FROM pg_replication_slots where slot_name =
'%s'",
+ slotName); // slotName is a test-class field formatted straight into the SQL text
+
+ return jdbc.queryAndMap(
+ query,
+ rs -> {
+ if (!rs.next()) { // no row came back for this slot name
+ throw new SQLException(
+ String.format(
+ "No result returned after running
query [%s]", query));
+ }
+ return rs.getString(1); // column 1 is confirmed_flush_lsn, the only selected column
+ });
+ }
}