Copilot commented on code in PR #61481:
URL: https://github.com/apache/doris/pull/61481#discussion_r2957572162
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) {
}
}
+ /**
+ * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+ * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+ * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+ * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+ * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+ * during the find phase and then incorrectly filters the DML as already-processed during actual
+ * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+ * immediately at the first received message and switch-off happens before any DML is filtered.
+ * See https://issues.apache.org/jira/browse/FLINK-39265.
+ */
+ @Override
+ public Map<String, String> extractBinlogStateOffset(Object splitState) {
+ Map<String, String> offset = super.extractBinlogStateOffset(splitState);
+ offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+ offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+ return offset;
Review Comment:
`super.extractBinlogStateOffset(splitState)` may return an unmodifiable map
(common for “offset/state” snapshots). Calling `remove()` would then throw
`UnsupportedOperationException` during task initialization/restart. Make a
defensive mutable copy before removing keys (e.g., construct a new
`HashMap<>(...)` and return that), so this override is safe regardless of the
parent implementation.
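A minimal sketch of the suggested fix, assuming Java 9+ for `Map.of`. `DefensiveOffsetCopy`, `parentExtract()` and the sample LSN values are hypothetical: `parentExtract()` stands in for `super.extractBinlogStateOffset(splitState)` and deliberately returns an unmodifiable map to model the worst case. The key strings are assumed to match Debezium's `PostgresOffsetContext` constants.

```java
import java.util.HashMap;
import java.util.Map;

class DefensiveOffsetCopy {
    // Assumed values of PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY and
    // LAST_COMMIT_LSN_KEY; the real reader should reference the Debezium constants.
    static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
    static final String LAST_COMMIT_LSN_KEY = "lsn_commit";

    // Hypothetical stand-in for super.extractBinlogStateOffset(splitState).
    // Map.of returns an unmodifiable map, so remove() on it throws.
    static Map<String, String> parentExtract() {
        return Map.of(
                "lsn", "0/16B3748",
                LAST_COMPLETELY_PROCESSED_LSN_KEY, "0/16B3748",
                LAST_COMMIT_LSN_KEY, "0/16B3748");
    }

    // Copy first, then strip: safe whether the parent map is mutable or not.
    static Map<String, String> extractBinlogStateOffset() {
        Map<String, String> offset = new HashMap<>(parentExtract());
        offset.remove(LAST_COMPLETELY_PROCESSED_LSN_KEY);
        offset.remove(LAST_COMMIT_LSN_KEY);
        return offset;
    }

    public static void main(String[] args) {
        try {
            // Removing in place, as the current diff does, fails on an unmodifiable map.
            parentExtract().remove(LAST_COMMIT_LSN_KEY);
        } catch (UnsupportedOperationException e) {
            System.out.println("in-place remove failed: " + e.getClass().getSimpleName());
        }
        System.out.println(extractBinlogStateOffset());
    }
}
```

Copying costs one small map allocation per call and makes the override independent of whether the parent returns a live, copied, or immutable map.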
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) {
}
}
+ /**
+ * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+ * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+ * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+ * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+ * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+ * during the find phase and then incorrectly filters the DML as already-processed during actual
+ * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+ * immediately at the first received message and switch-off happens before any DML is filtered.
Review Comment:
The comment says “Strip lsn_proc and lsn_commit”, but the code removes
Debezium’s `LAST_COMPLETELY_PROCESSED_LSN_KEY` and `LAST_COMMIT_LSN_KEY`. To
avoid confusion for future maintainers, align terminology in the comment with
the actual offset keys removed (e.g., explicitly mention the Debezium offset
field names/keys being removed and how they map to `lsn_proc`/`lsn_commit` if
applicable).
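One possible shape for the aligned comment, sketched as a standalone class: the Javadoc names the Debezium constants together with the raw key strings they are assumed to hold (`lsn_proc`, `lsn_commit`). `OffsetKeyDocs`, `keysToStrip()` and `docMatchesConstants()` are hypothetical; in the real module, a unit test would compare against `PostgresOffsetContext` directly rather than against redeclared copies.

```java
import java.util.List;

class OffsetKeyDocs {
    // Assumed values of Debezium's PostgresOffsetContext constants, redeclared here
    // only so the sketch compiles without the Debezium jar.
    static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
    static final String LAST_COMMIT_LSN_KEY = "lsn_commit";

    /**
     * Offset keys removed from the binlog state offset before it reaches Debezium's
     * WalPositionLocator: {@code PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY}
     * (raw key "lsn_proc") and {@code PostgresOffsetContext.LAST_COMMIT_LSN_KEY}
     * (raw key "lsn_commit").
     */
    static List<String> keysToStrip() {
        return List.of(LAST_COMPLETELY_PROCESSED_LSN_KEY, LAST_COMMIT_LSN_KEY);
    }

    // Guard: returns false if the raw names quoted in the Javadoc drift from the constants.
    static boolean docMatchesConstants() {
        return keysToStrip().equals(List.of("lsn_proc", "lsn_commit"));
    }

    public static void main(String[] args) {
        System.out.println("doc matches constants: " + docMatchesConstants());
    }
}
```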
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -357,7 +357,7 @@ public String getTimeoutReason() {
log.warn("Failed to get task timeout reason, response: {}", response);
}
} catch (ExecutionException | InterruptedException ex) {
- log.error("Send get task fail reason request failed: ", ex);
+ log.error("Send get task timeout reason request failed: ", ex);
Review Comment:
This log line has a trailing colon and space but no placeholder; it reads a
bit odd in logs. Consider removing the colon or changing to a consistent
pattern used elsewhere (e.g., `"Send get task timeout reason request failed"`),
relying on the throwable to provide details.
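A small, dependency-free sketch of the suggested pattern. It uses `java.util.logging` only so it runs without extra jars; the reviewed code calls `log.error(...)`, presumably on a Log4j2 or SLF4J logger, where `error(String, Throwable)` likewise attaches the stack trace after the message. `TimeoutReasonLogging` and the failing `CompletableFuture` are hypothetical stand-ins for the real RPC call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

class TimeoutReasonLogging {
    // Runs one failing request and returns the log records it produced.
    static List<LogRecord> run() {
        List<LogRecord> records = new ArrayList<>();
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // capture records instead of printing to stderr
        log.addHandler(new Handler() {
            @Override public void publish(LogRecord record) { records.add(record); }
            @Override public void flush() { }
            @Override public void close() { }
        });

        // Hypothetical stand-in for the RPC future awaited in getTimeoutReason().
        CompletableFuture<String> response = new CompletableFuture<>();
        response.completeExceptionally(new RuntimeException("backend unreachable"));
        try {
            response.get();
        } catch (ExecutionException | InterruptedException ex) {
            // Plain message with no trailing ": "; the throwable carries the details.
            log.log(Level.SEVERE, "Send get task timeout reason request failed", ex);
        }
        return records;
    }

    public static void main(String[] args) {
        LogRecord r = run().get(0);
        System.out.println(r.getMessage() + " (cause: " + r.getThrown().getCause().getMessage() + ")");
    }
}
```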
--
This is an automated message from the Apache Git Service.