This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 004d7408119 branch-4.1: [fix](streamingjob) fix postgres DML silently
dropped on task restart #61481 (#61564)
004d7408119 is described below
commit 004d74081199b55ade0ade9b42913d6f290741e8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 21 10:46:48 2026 +0800
branch-4.1: [fix](streamingjob) fix postgres DML silently dropped on task
restart #61481 (#61564)
Cherry-picked from #61481
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingMultiTblTask.java | 2 +-
.../source/reader/postgres/PostgresSourceReader.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 45d2cf2ffbb..32526f9c513 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -357,7 +357,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
log.warn("Failed to get task timeout reason, response: {}",
response);
}
} catch (ExecutionException | InterruptedException ex) {
- log.error("Send get task fail reason request failed: ", ex);
+ log.warn("Send get task fail reason request failed: ", ex);
}
return "";
}
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 737e36045d9..6a5670ad6de 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -64,6 +64,7 @@ import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@@ -430,6 +431,25 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
}
}
+ /**
+ * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+ * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+ * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+ * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+ * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+ * during the find phase and then incorrectly filters the DML as already-processed during actual
+ * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+ * immediately at the first received message and switch-off happens before any DML is filtered.
+ * See https://issues.apache.org/jira/browse/FLINK-39265.
+ */
+ @Override
+ public Map<String, String> extractBinlogStateOffset(Object splitState) {
+ Map<String, String> offset =
super.extractBinlogStateOffset(splitState);
+ offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+ offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+ return offset;
+ }
+
@Override
public void close(JobBaseConfig jobConfig) {
super.close(jobConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]