This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ec12ab67779 [fix](streamingjob) fix postgres DML silently dropped on task restart (#61481)
ec12ab67779 is described below
commit ec12ab67779dd4739670d971fd794a58837021e7
Author: wudi <[email protected]>
AuthorDate: Fri Mar 20 16:52:41 2026 +0800
[fix](streamingjob) fix postgres DML silently dropped on task restart (#61481)
### What problem does this PR solve?
#### Problem
When a streaming job restarts a task, the first DML of the new
transaction is occasionally dropped without any error (10-20% failure
rate). The affected record never appears in the Doris target table and
no error is logged; the only trace is an "identified as already
processed" message in cdc-client.log.
#### Root Cause
Debezium 1.9.x hardcodes `proto_version=1` (non-streaming pgoutput) for
all PG versions. In non-streaming mode, the walsender batches all
changes of a transaction and sends them after COMMIT, so every message
of that transaction (BEGIN + DML) carries the same
`XLogData.data_start`, which is the transaction's `begin_lsn`.

When this `begin_lsn` equals the previous transaction's `commit_lsn`
(i.e. the two transactions are adjacent in WAL with no other writes
between them), `WalPositionLocator` behaves incorrectly:
1. **Find phase**: `COMMIT(T1)` at `lsn=Y` sets
   `storeLsnAfterLastEventStoredLsn=true`. `BEGIN(T2)` and `INSERT(T2)`
   both have `lsn=Y`, so the locator keeps returning `Optional.empty()`
   for them. Only `COMMIT(T2)` at `lsn=Z` sets `startStreamingLsn=Z`,
   leaving `lsnSeen={Y, Z}`.
2. **Actual streaming**: `INSERT(T2)` arrives with `lastReceiveLsn=Y`.
   `skipMessage(Y)` finds `Y ∈ lsnSeen` and `Y ≠ startStreamingLsn`
   (which is `Z`), so the message is filtered.
The bug is intermittent because it only triggers when no other WAL
activity (autovacuum, other connections) occurs between the two
transactions.
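
The two phases above can be replayed with a small model. This is a
simplified sketch built only from the description in this message; it is
not Debezium's `WalPositionLocator`. The class `WalLocatorModel`, its
`replay` helper, the use of plain `long` values for LSNs, and the
concrete values `Y=100`, `Z=200` are all assumptions made for
illustration, and operation-specific handling (BEGIN/COMMIT tracking) is
omitted. Passing `null` for `lastCommitStoredLsn` corresponds to the
situation after `lsn_commit` has been removed from the offset.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the behaviour described in this commit
// message. It is NOT Debezium's WalPositionLocator; LSNs are plain longs.
class WalLocatorModel {
    private final Long lastCommitStoredLsn; // null models a stripped lsn_commit
    private final long lastEventStoredLsn;
    private final Set<Long> lsnSeen = new HashSet<>();
    private Long firstLsnReceived;
    private Long startStreamingLsn;
    private boolean storeLsnAfterLastEventStoredLsn;
    private boolean passMessages; // false: filtering is active

    WalLocatorModel(Long lastCommitStoredLsn, long lastEventStoredLsn) {
        this.lastCommitStoredLsn = lastCommitStoredLsn;
        this.lastEventStoredLsn = lastEventStoredLsn;
    }

    // Find phase: returns the LSN to resume from once it has been determined.
    Optional<Long> resumeFromLsn(long lsn) {
        lsnSeen.add(lsn);
        if (firstLsnReceived == null) {
            firstLsnReceived = lsn;
        }
        if (storeLsnAfterLastEventStoredLsn) {
            if (lsn == lastEventStoredLsn) {
                return Optional.empty(); // same LSN as the stored event: keep looking
            }
            startStreamingLsn = lsn;
            return Optional.of(startStreamingLsn);
        }
        if (lsn == lastEventStoredLsn) {
            storeLsnAfterLastEventStoredLsn = true;
        }
        if (lastCommitStoredLsn == null) {
            startStreamingLsn = firstLsnReceived; // exit at the first message
            return Optional.of(startStreamingLsn);
        }
        return Optional.empty();
    }

    // Streaming phase: true means the message is filtered as already processed.
    boolean skipMessage(long lsn) {
        if (passMessages) {
            return false;
        }
        if (startStreamingLsn == null || startStreamingLsn == lsn) {
            passMessages = true; // switch filtering off from here on
            return false;
        }
        return lsnSeen.contains(lsn);
    }

    // Runs the find phase, then the streaming phase, over the same messages
    // and returns the names of the messages that were delivered.
    static List<String> replay(Long lastCommitStoredLsn, long lastEventStoredLsn,
                               String[] names, long[] lsns) {
        WalLocatorModel model = new WalLocatorModel(lastCommitStoredLsn, lastEventStoredLsn);
        for (long lsn : lsns) {
            if (model.resumeFromLsn(lsn).isPresent()) {
                break;
            }
        }
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < lsns.length; i++) {
            if (!model.skipMessage(lsns[i])) {
                delivered.add(names[i]);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] names = {"COMMIT(T1)", "BEGIN(T2)", "INSERT(T2)", "COMMIT(T2)"};
        long[] lsns = {100L, 100L, 100L, 200L}; // Y=100, Z=200 (illustrative values)
        // lsn_commit present: prints [COMMIT(T2)], so INSERT(T2) is lost
        System.out.println(replay(100L, 100L, names, lsns));
        // lsn_commit absent: prints all four messages
        System.out.println(replay(null, 100L, names, lsns));
    }
}
```

In this model the first call loses `INSERT(T2)` because the only LSN that
switches filtering off is `Z`, while the second call switches filtering
off at the very first message.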
#### Fix
Override `extractBinlogStateOffset()` in `PostgresSourceReader` to strip
`lsn_proc` and `lsn_commit` from the offset before it is passed to
Debezium. This constructs
`WalPositionLocator(lastCommitStoredLsn=null, lsn=Y)`, which causes the
find phase to exit immediately at the first received message
(`startStreamingLsn=Y`). In actual streaming, `COMMIT(T1)` switches
filtering off (`lastReceiveLsn=Y = startStreamingLsn`), and all
subsequent messages, including `INSERT(T2)`, pass through.
See https://issues.apache.org/jira/browse/FLINK-39265.
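
As a rough illustration of the offset change, the sketch below applies
the same key removal to a plain map. The class `OffsetStripSketch`, the
method `strip`, and the sample LSN values are hypothetical; the key names
`lsn_proc` and `lsn_commit` are taken from the text above, and the `lsn`
key in the sample data is an assumption about what the remaining offset
looks like.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; the actual change overrides
// extractBinlogStateOffset() and uses PostgresOffsetContext constants.
class OffsetStripSketch {
    // Returns a copy of the offset without the two keys named in this commit.
    static Map<String, String> strip(Map<String, String> offset) {
        Map<String, String> copy = new HashMap<>(offset);
        copy.remove("lsn_proc");
        copy.remove("lsn_commit");
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> offset = new HashMap<>();
        offset.put("lsn", "100");        // illustrative values only
        offset.put("lsn_proc", "100");
        offset.put("lsn_commit", "100");
        System.out.println(strip(offset)); // prints {lsn=100}
    }
}
```

Copying the map is a choice made for this sketch; the override in this
commit removes the keys from the map returned by the superclass.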
#### Test
Run `test_streaming_postgres_job` multiple times. Before this fix the
'Apache' assertion fails ~10-20% of the time; after this fix it passes
consistently.
---
.../insert/streaming/StreamingMultiTblTask.java | 2 +-
.../source/reader/postgres/PostgresSourceReader.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 45d2cf2ffbb..32526f9c513 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -357,7 +357,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.warn("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 737e36045d9..6a5670ad6de 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -64,6 +64,7 @@ import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@@ -430,6 +431,25 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
         }
     }
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.
+     * See https://issues.apache.org/jira/browse/FLINK-39265.
+     */
+    @Override
+    public Map<String, String> extractBinlogStateOffset(Object splitState) {
+        Map<String, String> offset = super.extractBinlogStateOffset(splitState);
+        offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+        offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+        return offset;
+    }
+
     @Override
     public void close(JobBaseConfig jobConfig) {
         super.close(jobConfig);