This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 416ad95f80b [fix](streaming-job) start counting task max interval after the first record is received (#63141)
416ad95f80b is described below

commit 416ad95f80bd8be7af0fcac6c61b78a6023c403f
Author: wudi <[email protected]>
AuthorDate: Tue May 12 14:28:28 2026 +0800

    [fix](streaming-job) start counting task max interval after the first record is received (#63141)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    For PG CDC streaming jobs, when a task creates a fresh logical replication
    connection, the walsender must re-decode the WAL region from
    `slot.restart_lsn` up to the client-supplied `startLsn` before any event
    can be emitted. On high-RTT networks (e.g. cross-region Aurora) this
    catch-up alone can take several seconds.
    
    Under the previous behavior the task's `maxInterval` window started the
    moment the task entered `writeRecords`, so the time spent on WAL position
    lookup + walsender catch-up was charged against the interval. With
    `max_interval=10s` this consistently caused tasks to finish with
    `0 records, 0 heartbeats`, the slot's `confirmed_flush_lsn` never advanced,
    and every following task repeated the same catch-up — the slot was
    effectively stuck and replication lag grew indefinitely.
    
    This PR delays the start of the `maxInterval` countdown until the first
    record (or heartbeat) is actually received from the source reader, so the
    per-task interval governs the real streaming window rather than being
    consumed by setup. The FE-side
    `streaming_task_timeout_multiplier * maxIntervalSec` still acts as the
    hard ceiling.
---
 .../cdcclient/service/PipelineCoordinator.java     | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 46fabe2d418..9b2dd5d357f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -454,15 +454,17 @@ public class PipelineCoordinator {
 
             isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
+            long streamingStartTime = -1;
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
             boolean shouldStop = false;
             boolean lastMessageIsHeartbeat = false;
 
             LOG.info(
-                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}, maxIntervalMillis={}",
                     writeRecordRequest.getJobId(),
                     writeRecordRequest.getTaskId(),
-                    isSnapshotSplit);
+                    isSnapshotSplit,
+                    maxIntervalMillis);
 
             // 2. poll record
             while (!shouldStop) {
@@ -472,9 +474,14 @@ public class PipelineCoordinator {
                     Thread.sleep(100);
 
                     // Check if should stop
-                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    long elapsedTime =
+                            streamingStartTime > 0
+                                    ? System.currentTimeMillis() - 
streamingStartTime
+                                    : 0;
                     boolean timeoutReached =
-                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+                            streamingStartTime > 0
+                                    && maxIntervalMillis > 0
+                                    && elapsedTime >= maxIntervalMillis;
 
                     if (shouldStop(
                             isSnapshotSplit,
@@ -488,6 +495,15 @@ public class PipelineCoordinator {
                     continue;
                 }
 
+                if (streamingStartTime < 0) {
+                    streamingStartTime = System.currentTimeMillis();
+                    LOG.info(
+                            "Streaming phase started after {} ms setup for 
jobId={} taskId={}",
+                            streamingStartTime - startTime,
+                            writeRecordRequest.getJobId(),
+                            writeRecordRequest.getTaskId());
+                }
+
                 while (recordIterator.hasNext()) {
                     SourceRecord element = recordIterator.next();
 
@@ -501,9 +517,11 @@ public class PipelineCoordinator {
                         }
 
                         // If already timeout, stop immediately when heartbeat 
received
-                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        long elapsedTime = System.currentTimeMillis() - 
streamingStartTime;
                         boolean timeoutReached =
-                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+                                streamingStartTime > 0
+                                        && maxIntervalMillis > 0
+                                        && elapsedTime >= maxIntervalMillis;
 
                         if (!isSnapshotSplit && timeoutReached) {
                             LOG.info(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to