This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 416ad95f80b [fix](streaming-job) start counting task max interval
after the first record is received (#63141)
416ad95f80b is described below
commit 416ad95f80bd8be7af0fcac6c61b78a6023c403f
Author: wudi <[email protected]>
AuthorDate: Tue May 12 14:28:28 2026 +0800
[fix](streaming-job) start counting task max interval after the first
record is received (#63141)
### What problem does this PR solve?
Problem Summary:
For PG CDC streaming jobs, when a task creates a fresh logical replication
connection, the walsender must re-decode the WAL region from
`slot.restart_lsn` up to the client-supplied `startLsn` before any event can
be emitted. On high-RTT networks (e.g. cross-region Aurora) this catch-up
alone can take several seconds.
Under the previous behavior the task's `maxInterval` window started the moment
the task entered `writeRecords`, so the time spent on WAL position lookup +
walsender catch-up was charged against the interval. With `max_interval=10s`
this consistently caused tasks to finish with `0 records, 0 heartbeats`, the
slot's `confirmed_flush_lsn` never advanced, and every following task repeated
the same catch-up — the slot was effectively stuck and replication lag grew
indefinitely.
This PR delays the start of the `maxInterval` countdown until the first record
(or heartbeat) is actually received from the source reader, so the per-task
interval governs the real streaming window rather than being consumed by
setup. The FE-side `streaming_task_timeout_multiplier * maxIntervalSec` still
acts as the hard ceiling.
---
.../cdcclient/service/PipelineCoordinator.java | 30 +++++++++++++++++-----
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 46fabe2d418..9b2dd5d357f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -454,15 +454,17 @@ public class PipelineCoordinator {
isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
+ long streamingStartTime = -1;
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
LOG.info(
- "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}, maxIntervalMillis={}",
writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId(),
- isSnapshotSplit);
+ isSnapshotSplit,
+ maxIntervalMillis);
// 2. poll record
while (!shouldStop) {
@@ -472,9 +474,14 @@ public class PipelineCoordinator {
Thread.sleep(100);
// Check if should stop
- long elapsedTime = System.currentTimeMillis() - startTime;
+ long elapsedTime =
+ streamingStartTime > 0
+ ? System.currentTimeMillis() -
streamingStartTime
+ : 0;
boolean timeoutReached =
- maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+ streamingStartTime > 0
+ && maxIntervalMillis > 0
+ && elapsedTime >= maxIntervalMillis;
if (shouldStop(
isSnapshotSplit,
@@ -488,6 +495,15 @@ public class PipelineCoordinator {
continue;
}
+ if (streamingStartTime < 0) {
+ streamingStartTime = System.currentTimeMillis();
+ LOG.info(
+ "Streaming phase started after {} ms setup for
jobId={} taskId={}",
+ streamingStartTime - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+ }
+
while (recordIterator.hasNext()) {
SourceRecord element = recordIterator.next();
@@ -501,9 +517,11 @@ public class PipelineCoordinator {
}
// If already timeout, stop immediately when heartbeat
received
- long elapsedTime = System.currentTimeMillis() -
startTime;
+ long elapsedTime = System.currentTimeMillis() -
streamingStartTime;
boolean timeoutReached =
- maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+ streamingStartTime > 0
+ && maxIntervalMillis > 0
+ && elapsedTime >= maxIntervalMillis;
if (!isSnapshotSplit && timeoutReached) {
LOG.info(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]