[
https://issues.apache.org/jira/browse/FLINK-39265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39265:
-----------------------------------
Labels: pull-request-available (was: )
> [postgres-cdc] PostgreSQL CDC intermittently drops INSERT records after
> checkpoint recovery due to WalPositionLocator filtering
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39265
> URL: https://issues.apache.org/jira/browse/FLINK-39265
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Environment: - Flink CDC version: 3.5
> - PostgreSQL version: 14.3
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> h1. Root Cause Analysis
> h3. Background: pgoutput WAL message layout
> In pgoutput non-streaming mode, the XLogData data_start field is set as follows:
> * For BEGIN and all DML messages: the LSN of that transaction's BEGIN WAL
> record
> * For COMMIT messages: the LSN of the COMMIT WAL record
> This means that when transaction T1 commits at LSN = Y and the next
> transaction T2 is immediately adjacent in WAL, T2's BEGIN record also starts
> at LSN = Y.
> So *data_start(T1 COMMIT) = data_start(T2 BEGIN) = Y*
> h3. The bug
> When Flink CDC recovers from a checkpoint, the stored offset contains:
> * lsn_proc = Y (last completely processed LSN)
> * lsn_commit = Y (last commit LSN)
> PostgresStreamingChangeEventSource constructs:
>
> {code:java}
> walPosition = new WalPositionLocator(
>         offsetContext.lastCommitLsn(),             // = Y
>         offsetContext.lastCompletelyProcessedLsn() // = Y
> );
> {code}
>
>
> *Find phase behavior (read WAL from Y)*
> |Message|data_start (lastReceiveLsn)|resumeFromLsn result|
> |----------|---------------------------|-----------------------------------------|
> |COMMIT(T1)|Y|storeLsnAfterLastEventStoredLsn = true|
> |BEGIN(T2)|Y|Y == lastEventStoredLsn → return empty|
> |INSERT(T2)|Y|Y == lastEventStoredLsn → return empty|
> |COMMIT(T2)|Z|Z != Y → startStreamingLsn = Z, find ends|
> Result: lsnSeen = \{Y, Z}, startStreamingLsn = Z
> *Actual streaming phase (reconnect from Y)*
> |Message|lastReceiveLsn|skipMessage() result|
> |----------|--------------|-----------------------------------|
> |BEGIN(T2)|Y|BEGIN → forced false (pass)|
> |INSERT(T2)|Y|Y ∈ lsnSeen, Y ≠ Z → FILTERED ✗|
> |COMMIT(T2)|Z|Z == startStreamingLsn → switch-off|
> *Problem: the INSERT is permanently dropped before switch-off occurs.*
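The two tables above can be replayed with a small stand-alone model. This is only a sketch: the class `WalLocatorModel`, the `Msg` type, and the `replay` helper are hypothetical names introduced for illustration, and the logic is distilled from the tables in this report rather than copied from the actual WalPositionLocator source. The case of an LSN that was never seen during the find phase is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class WalLocatorModel {
    /** One replication message: type, transaction, and the data_start LSN it arrives with. */
    static final class Msg {
        final String type;
        final String tx;
        final long lsn;

        Msg(String type, String tx, long lsn) {
            this.type = type;
            this.tx = tx;
            this.lsn = lsn;
        }
    }

    // Field names mirror the tables above; the behavior is a simplified assumption.
    private final long lastEventStoredLsn;
    private final Set<Long> lsnSeen = new HashSet<>();
    private boolean storeLsnAfterLastEventStoredLsn = false;
    private Long startStreamingLsn = null;
    private boolean passMessages = false; // filtering is active when streaming starts

    WalLocatorModel(long lastEventStoredLsn) {
        this.lastEventStoredLsn = lastEventStoredLsn;
    }

    /** Find phase: remember every LSN, stop at the first LSN that differs from the stored one. */
    Optional<Long> resumeFromLsn(long lsn) {
        lsnSeen.add(lsn);
        if (storeLsnAfterLastEventStoredLsn) {
            if (lsn == lastEventStoredLsn) {
                return Optional.empty(); // same LSN as the stored event, keep searching
            }
            startStreamingLsn = lsn;
            return Optional.of(lsn);
        }
        if (lsn == lastEventStoredLsn) {
            storeLsnAfterLastEventStoredLsn = true;
        }
        return Optional.empty();
    }

    /** Streaming phase: filter already-seen LSNs until startStreamingLsn arrives. */
    boolean skipMessage(Msg m) {
        boolean candidate;
        if (passMessages) {
            candidate = false;
        } else if (startStreamingLsn == null || startStreamingLsn.equals(m.lsn)) {
            passMessages = true; // switch-off: everything from here on passes
            candidate = false;
        } else {
            candidate = lsnSeen.contains(m.lsn);
        }
        // BEGIN and COMMIT are always passed through, as in the second table.
        boolean transactional = m.type.equals("BEGIN") || m.type.equals("COMMIT");
        return candidate && !transactional;
    }

    /** Replays both phases for T1 committing at y and T2 committing at z. */
    public static List<String> replay(long y, long z) {
        WalLocatorModel locator = new WalLocatorModel(y);
        List<Msg> findPhase = Arrays.asList(
                new Msg("COMMIT", "T1", y), new Msg("BEGIN", "T2", y),
                new Msg("INSERT", "T2", y), new Msg("COMMIT", "T2", z));
        for (Msg m : findPhase) {
            if (locator.resumeFromLsn(m.lsn).isPresent()) {
                break;
            }
        }
        List<Msg> streaming = Arrays.asList(
                new Msg("BEGIN", "T2", y), new Msg("INSERT", "T2", y), new Msg("COMMIT", "T2", z));
        List<String> delivered = new ArrayList<>();
        for (Msg m : streaming) {
            if (!locator.skipMessage(m)) {
                delivered.add(m.type + "(" + m.tx + ")");
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(replay(100L, 200L)); // prints [BEGIN(T2), COMMIT(T2)]
    }
}
```

In this model the INSERT of T2 never reaches the delivered list, because it shares LSN Y with the stored offset while filtering only switches off at Z.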
> h2. Reproduction Steps
> h3. Prerequisites
> * A running PostgreSQL instance
> * A Flink CDC PostgreSQL source using the pgoutput plugin
> h3. Setup
> {code:sql}
> CREATE SCHEMA IF NOT EXISTS cdc_test;
> CREATE TABLE cdc_test.repro_table (
>     id SERIAL PRIMARY KEY,
>     name VARCHAR(100)
> );
> {code}
> h3. Steps
> *Step 1: Establish anchor LSN*
> Commit two back-to-back transactions with no WAL gap.
> Save as repro.sql:
>
> {code:sql}
> -- tx1: anchor transaction (establishes lastEventStoredLsn = Y)
> UPDATE cdc_test.repro_table SET name = name WHERE false;
> -- capture Y immediately (this SELECT writes no WAL)
> SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0'::pg_lsn) AS anchor_lsn;
> -- tx2: target record; its BEGIN WAL record lands at exactly Y
> INSERT INTO cdc_test.repro_table (name) VALUES ('should_appear');
> {code}
>
> Run:
> {code:bash}
> psql -h <host> -p <port> -U <user> -d <db> -f repro.sql
> {code}
> Record the printed anchor_lsn (e.g. 39515048).
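The anchor is printed as a decimal byte position because the query subtracts '0/0', whereas pg_current_wal_lsn() and most PostgreSQL tooling show LSNs in the hexadecimal X/Y text form (high 32 bits, slash, low 32 bits). When cross-checking the recorded value against the server, a small converter like the following may help. `LsnFormat` and its methods are hypothetical helpers written for this note, not part of Flink CDC or Debezium.

```java
public class LsnFormat {
    /** Formats a numeric LSN in pg_lsn text form: high 32 bits "/" low 32 bits, both in hex. */
    public static String toPgLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    /** Parses pg_lsn text such as "0/25AF3A8" back into its numeric value. */
    public static long fromPgLsn(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected X/Y form, got: " + text);
        }
        return (Long.parseLong(parts[0], 16) << 32) | Long.parseLong(parts[1], 16);
    }

    public static void main(String[] args) {
        System.out.println(toPgLsn(39515048L)); // prints 0/25AF3A8
    }
}
```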
> *Step 2: Configure the Flink CDC offset explicitly, or recover from a checkpoint that holds the same offset*
> {code:java}
> Map<String, String> offset = new HashMap<>();
> offset.put("lsn", "39515048");
> offset.put("lsn_proc", "39515048"); // triggers the bug
> offset.put("lsn_commit", "39515048");
> offset.put("txId", "0");
> offset.put("ts_usec", "0");
> PostgreSQLSource.<String>builder()
> ...
> .startupOptions(StartupOptions.specificOffset(offset))
> .build();
> {code}
> *Step 3: Run job*
> Start the Flink job and observe that the inserted record 'should_appear' is
> missing from the output.