[
https://issues.apache.org/jira/browse/FLINK-39265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Di Wu updated FLINK-39265:
--------------------------
Description:
h1. Root Cause Analysis
h3. Background: pgoutput WAL message layout
In pgoutput non-streaming mode, the XLogData data_start field is:
* For BEGIN and all DML messages: the LSN of that transaction's BEGIN WAL record
* For COMMIT messages: the LSN of the COMMIT WAL record
This means when transaction T1 commits at LSN = Y, the next transaction
T2's BEGIN record starts at LSN = Y as well (immediately adjacent in WAL).
So *data_start(T1 COMMIT) = data_start(T2 BEGIN) = Y*
h3. The bug
When Flink CDC recovers from a checkpoint, the stored offset contains:
* lsn_proc = Y (last completely processed LSN)
* lsn_commit = Y (last commit LSN)
PostgresStreamingChangeEventSource constructs:
{code:java}
walPosition = new WalPositionLocator(
offsetContext.lastCommitLsn(), // = Y
offsetContext.lastCompletelyProcessedLsn() // = Y
); {code}
*Find phase behavior (read WAL from Y)*
|Message|data_start (lastReceiveLsn)|resumeFromLsn result|
|----------|---------------------------|-----------------------------------------|
|COMMIT(T1)|Y|storeLsnAfterLastEventStoredLsn = true|
|BEGIN(T2)|Y|Y == lastEventStoredLsn → return empty|
|INSERT(T2)|Y|Y == lastEventStoredLsn → return empty|
|COMMIT(T2)|Z|Z != Y → startStreamingLsn = Z, find ends|
Result: lsnSeen = \{Y, Z}, startStreamingLsn = Z
*Actual streaming phase (reconnect from Y)*
|Message|lastReceiveLsn|skipMessage() result|
|----------|--------------|-----------------------------------|
|BEGIN(T2)|Y|BEGIN → forced false (pass)|
|INSERT(T2)|Y|Y ∈ lsnSeen, Y ≠ Z → FILTERED ✗|
|COMMIT(T2)|Z|Z == startStreamingLsn → switch-off|
*Problem: The INSERT is permanently dropped before switch-off occurs.*
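The two tables above can be replayed with a small toy model. This is only a sketch of the behavior described in this report, not the Debezium WalPositionLocator source: the class name WalLocatorToyModel, the Msg type, and the find/skip/replay methods are hypothetical, and the logic is limited to the branches that the tables mention. It assumes Y != Z.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the find/streaming phases described in the tables above.
// Hypothetical names; NOT the actual Debezium implementation.
final class WalLocatorToyModel {
    enum Op { BEGIN, INSERT, COMMIT }

    static final class Msg {
        final Op op;
        final String tx;
        final long lsn; // data_start of the XLogData message
        Msg(Op op, String tx, long lsn) { this.op = op; this.tx = tx; this.lsn = lsn; }
        @Override public String toString() { return op + "(" + tx + ")"; }
    }

    private final long lastEventStoredLsn;
    private final Set<Long> lsnSeen = new HashSet<>();
    private boolean storeLsnAfterLastEventStoredLsn;
    private Long startStreamingLsn;
    private boolean passMessages;

    WalLocatorToyModel(long lastEventStoredLsn) {
        this.lastEventStoredLsn = lastEventStoredLsn;
    }

    // Find phase: returns true once startStreamingLsn has been chosen.
    boolean find(Msg m) {
        lsnSeen.add(m.lsn);
        if (storeLsnAfterLastEventStoredLsn) {
            if (m.lsn == lastEventStoredLsn) {
                return false; // same LSN as the stored event: keep searching
            }
            startStreamingLsn = m.lsn;
            return true;
        }
        if (m.lsn == lastEventStoredLsn) {
            storeLsnAfterLastEventStoredLsn = true;
        }
        return false;
    }

    // Streaming phase: returns true when the message is filtered out.
    boolean skip(Msg m) {
        if (passMessages) {
            return false;
        }
        if (startStreamingLsn == null || startStreamingLsn.longValue() == m.lsn) {
            passMessages = true; // switch-off: everything passes from here on
            return false;
        }
        if (m.op == Op.BEGIN) {
            return false; // BEGIN is always let through, as in the table
        }
        return lsnSeen.contains(m.lsn);
    }

    // Replays COMMIT(T1)@Y, BEGIN(T2)@Y, INSERT(T2)@Y, COMMIT(T2)@Z.
    static List<String> replay(long y, long z) {
        List<Msg> wal = Arrays.asList(
                new Msg(Op.COMMIT, "T1", y),
                new Msg(Op.BEGIN, "T2", y),
                new Msg(Op.INSERT, "T2", y),
                new Msg(Op.COMMIT, "T2", z));
        WalLocatorToyModel locator = new WalLocatorToyModel(y);
        for (Msg m : wal) {
            if (locator.find(m)) {
                break;
            }
        }
        List<String> delivered = new ArrayList<>();
        // The streaming table starts at BEGIN(T2), so COMMIT(T1) is left out here.
        for (Msg m : wal.subList(1, wal.size())) {
            if (!locator.skip(m)) {
                delivered.add(m.toString());
            }
        }
        return delivered;
    }
}
```

Calling WalLocatorToyModel.replay(100L, 200L) in this model delivers BEGIN(T2) and COMMIT(T2) only; INSERT(T2) is filtered because its LSN is in lsnSeen but differs from startStreamingLsn.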
h2. Reproduction Steps
h3. Prerequisites
* A running PostgreSQL instance
* Flink CDC PostgreSQL source using the pgoutput plugin
h3. Setup
{code:sql}
CREATE TABLE cdc_test.repro_table (
id SERIAL PRIMARY KEY,
name VARCHAR(100)
);{code}
h3. Steps
*Step 1: Establish anchor LSN*
Commit two back-to-back transactions with no WAL gap.
Save as repro.sql:
{code:sql}
-- tx1: anchor transaction (establishes lastEventStoredLsn = Y)
UPDATE cdc_test.repro_table SET name = name WHERE false;
-- capture Y immediately (this SELECT writes no WAL)
SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0'::pg_lsn) AS anchor_lsn;
-- tx2: target record; its BEGIN WAL record lands at exactly Y
INSERT INTO cdc_test.repro_table (name) VALUES ('should_appear');
{code}
Run:
{code:bash}
psql -h <host> -p <port> -U <user> -d <db> -f repro.sql{code}
Record the printed anchor_lsn (e.g. 39515048).
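The anchor is printed as a plain byte offset, while PostgreSQL tools usually display LSNs as two hexadecimal halves separated by a slash. A minimal sketch for converting between the two forms, which may help when comparing the anchor with slot positions; the class and method names here are hypothetical helpers, not part of Flink CDC or Debezium:

```java
// Hypothetical helper: converts between a numeric LSN and PostgreSQL's
// textual pg_lsn form (high 32 bits / low 32 bits, both in hex).
final class LsnFormat {
    static String toPgLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    static long parsePgLsn(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a pg_lsn value: " + text);
        }
        long high = Long.parseUnsignedLong(parts[0], 16);
        long low = Long.parseUnsignedLong(parts[1], 16);
        return (high << 32) | low;
    }
}
```

For the example value above, LsnFormat.toPgLsn(39515048L) yields 0/25AF3A8.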
*Step 2: Start from a specific offset, or recover from a checkpoint that contains this offset*
{code:java}
Map<String, String> offset = new HashMap<>();
offset.put("lsn", "39515048");
offset.put("lsn_proc", "39515048"); // triggers the bug
offset.put("lsn_commit", "39515048");
offset.put("txId", "0");
offset.put("ts_usec", "0");
PostgreSQLSource.<String>builder()
...
.startupOptions(StartupOptions.specificOffset(offset))
.build();
{code}
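To check whether a restored offset has the same shape as the one used in this reproduction, a small diagnostic along these lines could be used. It is a sketch under the assumption that the offset is available as a Map<String, String> with the keys shown above; OffsetShapeCheck and matchesReproShape are hypothetical names, and a match does not by itself prove that a record will be dropped.

```java
import java.util.Map;

// Hypothetical diagnostic: flags offsets where lsn_proc equals lsn_commit,
// which is the shape used in this reproduction.
final class OffsetShapeCheck {
    static boolean matchesReproShape(Map<String, String> offset) {
        String proc = offset.get("lsn_proc");
        String commit = offset.get("lsn_commit");
        return proc != null && proc.equals(commit);
    }
}
```

Logging the result of this check at restore time would make it easier to correlate missing records with the offset that the job recovered from.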
*Step 3: Run job*
Start the Flink job and observe that the inserted record 'should_appear' is missing from the output.
> [postgres-cdc] PostgreSQL CDC intermittently drops INSERT records after
> checkpoint recovery due to WalPositionLocator filtering
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39265
> URL: https://issues.apache.org/jira/browse/FLINK-39265
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Environment: - Flink CDC version: 3.5
> - PostgreSQL version: 14.3
> Reporter: Di Wu
> Priority: Major
>