This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ec12ab67779 [fix](streamingjob) fix postgres DML silently dropped on task restart (#61481)
ec12ab67779 is described below

commit ec12ab67779dd4739670d971fd794a58837021e7
Author: wudi <[email protected]>
AuthorDate: Fri Mar 20 16:52:41 2026 +0800

    [fix](streamingjob) fix postgres DML silently dropped on task restart (#61481)
    
    ### What problem does this PR solve?
    
    #### Problem
    When a streaming job restarts a task, the first DML of the new
    transaction is occasionally silently dropped (a 10-20% failure rate).
    The affected record never appears in the Doris target table, and no
    error is logged; cdc-client.log only reports "identified as already
    processed".
    
    #### Root Cause
    Debezium 1.9.x hardcodes `proto_version=1` (non-streaming pgoutput) for
    all PG versions. In non-streaming mode, the walsender batches all changes
    of a transaction and sends them after COMMIT, and all messages (BEGIN +
    DML) share the same `XLogData.data_start`, which equals the transaction's
    `begin_lsn`.
    
    When this `begin_lsn` equals the previous transaction's `commit_lsn`
    (i.e. the two transactions are adjacent in WAL with no other writes
    between them), `WalPositionLocator` behaves incorrectly:
    
    1. **Find phase**: `COMMIT(T1)` at `lsn=Y` sets
       `storeLsnAfterLastEventStoredLsn=true`. `BEGIN(T2)` and `INSERT(T2)`
       both have `lsn=Y`, so they keep returning `Optional.empty()`. Only
       `COMMIT(T2)` at `lsn=Z` sets `startStreamingLsn=Z`, leaving
       `lsnSeen={Y, Z}`.
    2. **Actual streaming**: `INSERT(T2)` arrives with `lastReceiveLsn=Y`.
       `skipMessage(Y)` finds `Y ∈ lsnSeen` and `Y ≠ startStreamingLsn (Z)`,
       so the DML is filtered out as already processed.
    
    The bug is intermittent because it only triggers when no other WAL
    activity (autovacuum, other connections) occurs between the two
    transactions.
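    
    The two phases above can be reproduced with a small, self-contained
    model. This is a hedged sketch, not debezium's `WalPositionLocator`
    source: the class `WalLocatorModel`, its `Msg` record, the `replay`
    helper and the LSN values (`Y=200`, `Z=300`, and `100` as a stand-in
    stored commit LSN) are hypothetical. Only the field names follow the
    description above, and treating an LSN never seen in the find phase as
    a switch-off point is a modelling assumption.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    
    // Simplified model of the find/skip behaviour described above (not debezium's source).
    class WalLocatorModel {
        // One pgoutput message and the XLogData.data_start LSN the locator sees for it.
        record Msg(String name, long lsn) {}
    
        static final class Locator {
            private final Long lastCommitStoredLsn; // only null vs non-null matters in this model
            private final long lastEventStoredLsn;  // lsn=Y restored from the offset
            private Long firstLsnReceived;
            private boolean storeLsnAfterLastEventStoredLsn;
            private Long startStreamingLsn;
            private final Set<Long> lsnSeen = new HashSet<>();
            private boolean passMessages = true;
    
            Locator(Long lastCommitStoredLsn, long lastEventStoredLsn) {
                this.lastCommitStoredLsn = lastCommitStoredLsn;
                this.lastEventStoredLsn = lastEventStoredLsn;
            }
    
            // Find phase: returns the LSN at which filtering will switch off, once it is known.
            Optional<Long> resumeFromLsn(Msg m) {
                long lsn = m.lsn();
                if (firstLsnReceived == null) {
                    firstLsnReceived = lsn;
                }
                lsnSeen.add(lsn);
                if (storeLsnAfterLastEventStoredLsn) {
                    if (lsn == lastEventStoredLsn) {
                        return Optional.empty(); // BEGIN(T2)/INSERT(T2) still carry lsn=Y
                    }
                    startStreamingLsn = lsn; // first LSN after Y, i.e. COMMIT(T2) at Z
                    return Optional.of(startStreamingLsn);
                }
                if (lsn == lastEventStoredLsn) {
                    storeLsnAfterLastEventStoredLsn = true; // COMMIT(T1) at Y
                }
                if (lastCommitStoredLsn == null) {
                    startStreamingLsn = firstLsnReceived; // exit at the first received message
                    return Optional.of(startStreamingLsn);
                }
                return Optional.empty();
            }
    
            void enableFiltering() {
                passMessages = false;
            }
    
            // Actual streaming: returns true when the message is dropped as already processed.
            boolean skipMessage(long lsn) {
                if (passMessages) {
                    return false;
                }
                // Modelling assumption: an LSN never seen in the find phase also switches filtering off.
                if (startStreamingLsn == null || startStreamingLsn == lsn || !lsnSeen.contains(lsn)) {
                    passMessages = true; // switch-off: this and all later messages pass
                    return false;
                }
                return true;
            }
        }
    
        // T1 and T2 adjacent in WAL: COMMIT(T1), BEGIN(T2) and INSERT(T2) all share lsn=y.
        static List<Msg> adjacentTransactions(long y, long z) {
            return List.of(new Msg("COMMIT(T1)", y), new Msg("BEGIN(T2)", y),
                    new Msg("INSERT(T2)", y), new Msg("COMMIT(T2)", z));
        }
    
        // Runs the find phase, then re-reads the same messages with filtering enabled.
        static List<String> replay(Long lastCommitStoredLsn, long lastEventStoredLsn, List<Msg> wal) {
            Locator locator = new Locator(lastCommitStoredLsn, lastEventStoredLsn);
            for (Msg m : wal) {
                if (locator.resumeFromLsn(m).isPresent()) {
                    break;
                }
            }
            locator.enableFiltering();
            List<String> delivered = new ArrayList<>();
            for (Msg m : wal) {
                if (!locator.skipMessage(m.lsn())) {
                    delivered.add(m.name());
                }
            }
            return delivered;
        }
    
        public static void main(String[] args) {
            List<Msg> wal = adjacentTransactions(200L, 300L);
            System.out.println(replay(100L, 200L, wal)); // [COMMIT(T2)]
            System.out.println(replay(null, 200L, wal)); // [COMMIT(T1), BEGIN(T2), INSERT(T2), COMMIT(T2)]
        }
    }
    ```
    
    With a non-null stored commit LSN only `COMMIT(T2)` survives, so
    `INSERT(T2)` is lost; passing `null` for `lastCommitStoredLsn` makes the
    find phase stop at `COMMIT(T1)` and every message of T2 is delivered.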
    
    #### Fix
    Override `extractBinlogStateOffset()` in `PostgresSourceReader` to strip
    `lsn_proc` and `lsn_commit` from the offset before it is passed to
    Debezium. Debezium then constructs
    `WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)`, which makes the
    find phase exit immediately at the first received message
    (`startStreamingLsn=Y`). During actual streaming, `COMMIT(T1)` triggers
    the switch-off (`lastReceiveLsn=Y = startStreamingLsn`), and all
    subsequent messages, including `INSERT(T2)`, pass through.
    
    See https://issues.apache.org/jira/browse/FLINK-39265.
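    
    The effect of stripping the two keys on the locator's inputs can be
    sketched as follows. It assumes the offset keys are the strings `lsn`,
    `lsn_proc` and `lsn_commit`, and that the last-event LSN falls back from
    `lsn_proc` to `lsn` when the former is absent; `OffsetStripSketch`,
    `LocatorArgs` and `toLocatorArgs` are hypothetical names, not debezium or
    Doris APIs. Unlike the real override, `strip` works on a copy instead of
    mutating the map returned by `super`.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical sketch: how dropping lsn_proc/lsn_commit changes the locator's inputs.
    class OffsetStripSketch {
        // Stand-in for the two WalPositionLocator arguments discussed above.
        record LocatorArgs(Long lastCommitStoredLsn, Long lastEventStoredLsn) {}
    
        // Same key removal as the PostgresSourceReader override, but applied to a copy.
        static Map<String, String> strip(Map<String, String> offset) {
            Map<String, String> copy = new HashMap<>(offset);
            copy.remove("lsn_proc");
            copy.remove("lsn_commit");
            return copy;
        }
    
        // Returns null when the key is absent, mirroring an unset LSN.
        private static Long readLsn(Map<String, String> offset, String key) {
            String value = offset.get(key);
            return value == null ? null : Long.valueOf(value);
        }
    
        // Assumption: the last-event LSN prefers lsn_proc and falls back to lsn.
        static LocatorArgs toLocatorArgs(Map<String, String> offset) {
            Long processed = readLsn(offset, "lsn_proc");
            Long event = processed != null ? processed : readLsn(offset, "lsn");
            return new LocatorArgs(readLsn(offset, "lsn_commit"), event);
        }
    
        public static void main(String[] args) {
            Map<String, String> offset = new HashMap<>();
            offset.put("lsn", "200");
            offset.put("lsn_proc", "200");
            offset.put("lsn_commit", "200");
            System.out.println(toLocatorArgs(offset));
            // LocatorArgs[lastCommitStoredLsn=200, lastEventStoredLsn=200]
            System.out.println(toLocatorArgs(strip(offset)));
            // LocatorArgs[lastCommitStoredLsn=null, lastEventStoredLsn=200]
        }
    }
    ```
    
    With the illustrative value `200` for every key, the stripped offset
    yields `lastCommitStoredLsn=null` while keeping the event LSN at `200`,
    matching the `WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)` case
    described in the Fix section.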
    
    #### Test
    Run `test_streaming_postgres_job` multiple times. Before this fix, the
    'Apache' assertion fails ~10-20% of the time; after this fix, it passes
    consistently.
---
 .../insert/streaming/StreamingMultiTblTask.java      |  2 +-
 .../source/reader/postgres/PostgresSourceReader.java | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 45d2cf2ffbb..32526f9c513 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -357,7 +357,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.warn("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 737e36045d9..6a5670ad6de 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -64,6 +64,7 @@ import java.util.Properties;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
 import io.debezium.connector.postgresql.SourceInfo;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@@ -430,6 +431,25 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         }
     }
 
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.
+     * See https://issues.apache.org/jira/browse/FLINK-39265.
+     */
+    @Override
+    public Map<String, String> extractBinlogStateOffset(Object splitState) {
+        Map<String, String> offset = super.extractBinlogStateOffset(splitState);
+        offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+        offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+        return offset;
+    }
+
     @Override
     public void close(JobBaseConfig jobConfig) {
         super.close(jobConfig);

