This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d9d22ed81b3 [improve](streaming-job) make from-to streaming task 
timeout progress-aware (#64301)
d9d22ed81b3 is described below

commit d9d22ed81b3ce154f602ff10d598c54285e4e9f6
Author: wudi <[email protected]>
AuthorDate: Tue Jun 23 10:04:27 2026 +0800

    [improve](streaming-job) make from-to streaming task timeout progress-aware 
(#64301)
    
    ## Problem
    
    During a from-to streaming job's full (snapshot) sync, a data-skewed
    table can produce a single split of 100GB+ that keeps one task busy for
    well over an hour. The task timeout is a static
    `streaming_task_timeout_multiplier * maxInterval` measured from task
    start, so such a task is killed and retried in a loop even though it
    keeps making steady progress. The only workaround today is to manually
    raise `streaming_task_timeout_multiplier`.
    
    ## Solution
    
    Make the timeout progress-aware. cdc_client publishes the read-end
    progress (`scannedRows`) per running task and exposes it via
    `/api/getTaskStatus/{taskId}`. On a scheduling tick where the local
    no-progress budget is already exceeded, the FE pulls it as a heartbeat:
    any advance renews the deadline, so a large split that keeps scanning
    is never killed. When there is no progress the check falls back to the
    original budget, preserving the safety net against an unresponsive
    task.
    
    - The window is floored by a new `streaming_task_min_timeout_sec`
    (default 300s), mirroring `routine_load_task_min_timeout_sec`, so a
    small `maxInterval` cannot shrink it below the time a task can
    legitimately make no progress (e.g. a snapshot split's write tail after
    the scan finishes).
    - Write-side hard failures surface within seconds via the
    `/api/streaming/report_task_failure` push from cdc_client, independent
    of the timeout path.
    - The heartbeat state on the FE task is transient and never persisted,
    so FE replay is unaffected.
---
 .../main/java/org/apache/doris/common/Config.java  |  6 ++
 .../apache/doris/job/cdc/StreamingTaskStatus.java  | 50 ++++++++++++
 .../doris/job/cdc/request/TaskFailureRequest.java  | 46 +++++++++++
 .../doris/httpv2/rest/StreamingJobAction.java      | 40 ++++++++--
 .../insert/streaming/StreamingInsertJob.java       | 45 +++++++++--
 .../insert/streaming/StreamingMultiTblTask.java    | 85 +++++++++++----------
 .../StreamingMultiTblTaskTimeoutTest.java          | 89 ++++++++++++++++++++++
 .../cdcclient/controller/ClientController.java     |  5 ++
 .../cdcclient/service/PipelineCoordinator.java     | 37 ++++++++-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java | 41 ++++++++++
 .../org/apache/doris/cdcclient/utils/HttpUtil.java |  6 +-
 11 files changed, 390 insertions(+), 60 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 33432425a53..ff1b7502f23 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1193,6 +1193,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int streaming_task_timeout_multiplier = 10;
 
+    /**
+     * Minimum streaming task timeout in seconds; floors streaming_task_timeout_multiplier * maxInterval.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int streaming_task_min_timeout_sec = 300;
+
     @ConfField(mutable = true, masterOnly = true)
     public static int streaming_cdc_light_rpc_timeout_sec = 90;
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java
new file mode 100644
index 00000000000..16c339dc24c
--- /dev/null
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/StreamingTaskStatus.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.cdc;
+
+// cdc_client -> FE running task status, pulled by the FE only when the local 
timeout
+// budget is already exceeded. scannedRows is the read-end heartbeat used to 
renew the
+// deadline (scannedRows < 0 means no progress info: not scanning, or scan 
finished);
+// failReason carries the recorded write error so a kill can report the real 
cause.
+public class StreamingTaskStatus {
+    private long scannedRows = -1;
+    private String failReason = "";
+
+    public StreamingTaskStatus() {}
+
+    public StreamingTaskStatus(long scannedRows, String failReason) {
+        this.scannedRows = scannedRows;
+        this.failReason = failReason;
+    }
+
+    public long getScannedRows() {
+        return scannedRows;
+    }
+
+    public void setScannedRows(long scannedRows) {
+        this.scannedRows = scannedRows;
+    }
+
+    public String getFailReason() {
+        return failReason;
+    }
+
+    public void setFailReason(String failReason) {
+        this.failReason = failReason;
+    }
+}
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
new file mode 100644
index 00000000000..487b891c889
--- /dev/null
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/TaskFailureRequest.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.cdc.request;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+// cdc_client -> FE push: report a hard write failure of a running task 
immediately,
+// so it is failed within seconds instead of waiting out the timeout budget.
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class TaskFailureRequest {
+    public long jobId;
+    public long taskId;
+    public String reason;
+
+    @Override
+    public String toString() {
+        return "TaskFailureRequest{"
+                + "jobId=" + jobId
+                + ", taskId=" + taskId
+                + ", reason='" + reason + "'"
+                + "}";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 34d7abe3133..a27973d5350 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -22,6 +22,7 @@ import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 
 import com.google.common.base.Strings;
@@ -39,17 +40,40 @@ public class StreamingJobAction extends RestBaseController {
 
     @RequestMapping(path = "/api/streaming/commit_offset", method = 
RequestMethod.PUT)
     public Object commitOffset(@RequestBody CommitOffsetRequest offsetRequest, 
HttpServletRequest request) {
+        checkAuth(request);
+        return updateOffset(offsetRequest);
+    }
+
+    @RequestMapping(path = "/api/streaming/report_task_failure", method = 
RequestMethod.PUT)
+    public Object reportTaskFailure(@RequestBody TaskFailureRequest 
failureRequest, HttpServletRequest request) {
+        checkAuth(request);
+        return failTask(failureRequest);
+    }
+
+    private void checkAuth(HttpServletRequest request) {
         String authToken = request.getHeader("token");
-        // if auth token is not null, check it first
-        if (!Strings.isNullOrEmpty(authToken)) {
-            if (!checkClusterToken(authToken)) {
-                throw new UnauthorizedException("Invalid token: " + authToken);
-            }
-            return updateOffset(offsetRequest);
-        } else {
-            // only use for token
+        if (Strings.isNullOrEmpty(authToken)) {
             throw new UnauthorizedException("Miss token");
         }
+        if (!checkClusterToken(authToken)) {
+            throw new UnauthorizedException("Invalid token: " + authToken);
+        }
+    }
+
+    private Object failTask(TaskFailureRequest failureRequest) {
+        AbstractJob job = 
Env.getCurrentEnv().getJobManager().getJob(failureRequest.getJobId());
+        if (!(job instanceof StreamingInsertJob)) {
+            return ResponseEntityBuilder
+                    .okWithCommonError("Job " + failureRequest.getJobId() + " 
is not a streaming job");
+        }
+        try {
+            LOG.info("Reporting task failure with {}", 
failureRequest.toString());
+            ((StreamingInsertJob) job).reportTaskFailure(failureRequest);
+            return ResponseEntityBuilder.ok("Task failure reported 
successfully");
+        } catch (Exception e) {
+            LOG.warn("Failed to report task failure for job {}: {}", 
failureRequest.getJobId(), e.getMessage());
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
     }
 
     private Object updateOffset(CommitOffsetRequest offsetRequest) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index be14f79fa34..be9e6d22f22 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -39,7 +39,9 @@ import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.base.TimerDefinition;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
 import org.apache.doris.job.common.DataSourceType;
 import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.IntervalUnit;
@@ -139,7 +141,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     private String tvfType;
     private Map<String, String> originTvfProps;
     @Getter
-    AbstractStreamingTask runningStreamTask;
+    volatile AbstractStreamingTask runningStreamTask;
     SourceOffsetProvider offsetProvider;
     @Getter
     @Setter
@@ -1404,20 +1406,51 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    /**
+     * Push from cdc_client: fail the running task immediately on a hard write 
failure.
+     * Reports whose taskId no longer matches the current running task are 
dropped.
+     */
+    public void reportTaskFailure(TaskFailureRequest request) throws 
JobException {
+        AbstractStreamingTask task = this.runningStreamTask;
+        if (!(task instanceof StreamingMultiTblTask)) {
+            return;
+        }
+        StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+        if (runningMultiTask.getTaskId() != request.getTaskId()) {
+            return;
+        }
+        writeLock();
+        try {
+            if (this.runningStreamTask == runningMultiTask
+                    && runningMultiTask.getTaskId() == request.getTaskId()
+                    && 
TaskStatus.RUNNING.equals(runningMultiTask.getStatus())) {
+                runningMultiTask.onFail(request.getReason());
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     /**
      * The current streamingTask times out; create a new streamingTask.
      * Only applies to StreamingMultiTask.
      */
     public void processTimeoutTasks() throws JobException {
-        if (!(runningStreamTask instanceof StreamingMultiTblTask)) {
+        AbstractStreamingTask task = this.runningStreamTask;
+        if (!(task instanceof StreamingMultiTblTask)) {
+            return;
+        }
+        StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+        if (!runningMultiTask.isLocalTimeout()) {
             return;
         }
+        StreamingTaskStatus status = runningMultiTask.fetchTaskStatus();
         writeLock();
         try {
-            StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) 
this.runningStreamTask;
-            if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
-                    && runningMultiTask.isTimeout()) {
-                String timeoutReason = runningMultiTask.getTimeoutReason();
+            if (this.runningStreamTask == runningMultiTask
+                    && TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
+                    && runningMultiTask.isTimeout(status)) {
+                String timeoutReason = status == null ? "" : 
status.getFailReason();
                 if (StringUtils.isEmpty(timeoutReason)) {
                     timeoutReason = "task failed cause timeout";
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 9077ad01a82..85525beb1f8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.base.Job;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
@@ -83,7 +84,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private long loadBytes = 0L;
     private long filteredRows = 0L;
     private long loadedRows = 0L;
-    private long runningBackendId;
+    private volatile long runningBackendId;
+    long lastScannedRows = -1;
+    long lastProgressMs = 0;
 
     public StreamingMultiTblTask(Long jobId,
             long taskId,
@@ -111,8 +114,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
             log.info("streaming multi task has been canceled, task id is {}", 
getTaskId());
             return;
         }
-        this.status = TaskStatus.RUNNING;
         this.startTimeMs = System.currentTimeMillis();
+        this.lastProgressMs = this.startTimeMs;
+        this.status = TaskStatus.RUNNING;
         this.runningOffset = offsetProvider.getNextOffset(null, 
sourceProperties);
         log.info("streaming multi task {} get running offset: {}", taskId, 
runningOffset.toString());
     }
@@ -361,15 +365,29 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         return Env.getCurrentEnv().getMasterHost() + ":" + 
Env.getCurrentEnv().getMasterHttpPort();
     }
 
-    public boolean isTimeout() {
+    // Local pre-check, no RPC: gates whether to pull real progress this tick.
+    boolean isLocalTimeout() {
+        if (startTimeMs == null) {
+            return false;
+        }
+        return System.currentTimeMillis() - lastProgressMs > 
getTaskTimeoutMs();
+    }
+
+    boolean isTimeout(StreamingTaskStatus status) {
         if (startTimeMs == null) {
             // It's still pending, waiting for scheduling.
             return false;
         }
+        long now = System.currentTimeMillis();
+        if (status != null && status.getScannedRows() > lastScannedRows) {
+            lastScannedRows = status.getScannedRows();
+            lastProgressMs = now;
+        }
         long timeoutMs = getTaskTimeoutMs();
-        long elapsed = System.currentTimeMillis() - startTimeMs;
+        long elapsed = now - lastProgressMs;
         if (elapsed > timeoutMs) {
-            log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", 
taskId, elapsed, timeoutMs);
+            log.info("Task {} timeout detected: no progress for {}ms, 
timeoutMs={}ms",
+                    taskId, elapsed, timeoutMs);
             return true;
         }
         return false;
@@ -377,56 +395,39 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
 
     // Read multiplier live so config changes affect already-running tasks.
     private long getTaskTimeoutMs() {
-        return Config.streaming_task_timeout_multiplier * 
jobProperties.getMaxIntervalSecond() * 1000L;
+        return Math.max(
+                Config.streaming_task_timeout_multiplier * 
jobProperties.getMaxIntervalSecond() * 1000L,
+                Config.streaming_task_min_timeout_sec * 1000L);
     }
 
-    /**
-     * When a task encounters a write error, it will time out.
-     * The job needs to obtain the reason for the timeout,
-     * such as a data quality error, and needs to expose it to the user.
-     */
-    public String getTimeoutReason() {
+    StreamingTaskStatus fetchTaskStatus() {
         if (runningBackendId <= 0) {
-            log.info("No running backend for task {}", runningBackendId);
-            return "";
+            return null;
         }
         Backend backend = 
Env.getCurrentSystemInfo().getBackend(runningBackendId);
+        if (backend == null) {
+            return null;
+        }
         try {
             InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
-                    .setApi("/api/getFailReason/" + getTaskId())
+                    .setApi("/api/getTaskStatus/" + getTaskId())
                     .build();
             TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-            InternalService.PRequestCdcClientResult result = null;
             Future<PRequestCdcClientResult> future = 
BackendServiceProxy.getInstance()
                     .requestCdcClient(address, request, 
Config.streaming_cdc_light_rpc_timeout_sec);
-            result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, 
TimeUnit.SECONDS);
-            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                log.warn("Failed to get task timeout reason, {}", 
result.getStatus().getErrorMsgs(0));
-                return "";
+            PRequestCdcClientResult result = 
future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
+            if (TStatusCode.findByValue(result.getStatus().getStatusCode()) != 
TStatusCode.OK) {
+                return null;
             }
-            String response = result.getResponse();
-            try {
-                ResponseBody<String> responseObj = objectMapper.readValue(
-                        response,
-                        new TypeReference<ResponseBody<String>>() {
-                        }
-                );
-                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
-                    return responseObj.getData();
-                }
-            } catch (JsonProcessingException e) {
-                log.warn("Failed to get task timeout reason, response: {}", 
response);
-            }
-        } catch (TimeoutException te) {
-            log.warn("cdc_client RPC timeout api=/api/getFailReason jobId={} 
taskId={} backend={}:{} "
-                            + "timeout_sec={}",
-                    getJobId(), getTaskId(), backend.getHost(), 
backend.getBrpcPort(),
-                    Config.streaming_cdc_light_rpc_timeout_sec);
-        } catch (ExecutionException | InterruptedException ex) {
-            log.warn("Send get task fail reason request failed: ", ex);
+            ResponseBody<StreamingTaskStatus> body = objectMapper.readValue(
+                    result.getResponse(),
+                    new TypeReference<ResponseBody<StreamingTaskStatus>>() {
+                    });
+            return body.getCode() == RestApiStatusCode.OK.code ? 
body.getData() : null;
+        } catch (Exception e) {
+            log.warn("fetch task status failed, job {} task {}", getJobId(), 
getTaskId(), e);
+            return null;
         }
-        return "";
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
new file mode 100644
index 00000000000..453c4ef202e
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTaskTimeoutTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StreamingMultiTblTaskTimeoutTest {
+
+    @Before
+    public void setup() {
+        Config.streaming_task_timeout_multiplier = 10;
+        Config.streaming_task_min_timeout_sec = 300;
+    }
+
+    private StreamingMultiTblTask newTask(long lastProgressMsAgo, long 
intervalSec) {
+        StreamingJobProperties props = 
Mockito.mock(StreamingJobProperties.class);
+        Mockito.when(props.getMaxIntervalSecond()).thenReturn(intervalSec);
+        StreamingMultiTblTask t = new StreamingMultiTblTask(
+                1L, 1L, null, null, null, "db", null, props, null, null);
+        long now = System.currentTimeMillis();
+        t.startTimeMs = now - lastProgressMsAgo;
+        t.lastProgressMs = t.startTimeMs;
+        t.lastScannedRows = 1000;
+        return t;
+    }
+
+    private StreamingTaskStatus status(long scanned) {
+        StreamingTaskStatus s = new StreamingTaskStatus();
+        s.setScannedRows(scanned);
+        return s;
+    }
+
+    @Test
+    public void readAdvancingRenewsDeadline() {
+        StreamingMultiTblTask t = newTask(10 * 3600_000L, 60L);
+        Assert.assertFalse(t.isTimeout(status(2000)));
+    }
+
+    @Test
+    public void noProgressWithinBudgetNotTimeout() {
+        StreamingMultiTblTask t = newTask(5 * 60_000L, 60L);
+        Assert.assertFalse(t.isTimeout(status(1000)));
+    }
+
+    @Test
+    public void noProgressOverBudgetTimeout() {
+        StreamingMultiTblTask t = newTask(11 * 60_000L, 60L);
+        Assert.assertTrue(t.isTimeout(status(1000)));
+    }
+
+    @Test
+    public void smallIntervalFlooredByMinTimeout() {
+        StreamingMultiTblTask t = newTask(4 * 60_000L, 1L);
+        Assert.assertFalse(t.isTimeout(status(1000)));
+    }
+
+    @Test
+    public void nullProgressBehavesLikeOldTimeout() {
+        StreamingMultiTblTask t = newTask(11 * 60_000L, 60L);
+        Assert.assertTrue(t.isTimeout(null));
+    }
+
+    @Test
+    public void localTimeoutGatesProgressPull() {
+        Assert.assertFalse(newTask(5 * 60_000L, 60L).isLocalTimeout());
+        Assert.assertTrue(newTask(11 * 60_000L, 60L).isLocalTimeout());
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 22509a55e98..1ac874ceeaa 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -165,6 +165,11 @@ public class ClientController {
         return 
RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
     }
 
+    @RequestMapping(path = "/api/getTaskStatus/{taskId}", method = 
RequestMethod.POST)
+    public Object getTaskStatus(@PathVariable("taskId") String taskId) {
+        return RestResponse.success(pipelineCoordinator.getTaskStatus(taskId));
+    }
+
     @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = 
RequestMethod.POST)
     public Object getTaskIdOffset(@PathVariable("taskId") String taskId) {
         return 
RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index ebbd9c4acf1..51871db92b1 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
 import org.apache.doris.cdcclient.utils.SchemaChangeManager;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.StreamingTaskStatus;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
@@ -54,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
 
@@ -61,6 +63,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import io.debezium.data.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,8 +82,11 @@ public class PipelineCoordinator {
     // taskId -> list of split offsets (accumulates all splits processed in 
one task)
     private final Map<String, List<Map<String, String>>> taskOffsetCache =
             new ConcurrentHashMap<>();
-    // taskId -> writeFailReason
-    private final Map<String, String> taskErrorMaps = new 
ConcurrentHashMap<>();
+    // taskId -> writeFailReason, bounded so old entries are evicted instead 
of accumulating
+    // unbounded
+    private final Cache<String, String> taskErrorMaps =
+            CacheBuilder.newBuilder().maximumSize(1000).build();
+    private final Map<String, AtomicLong> taskProgressMap = new 
ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
     private static final int QUEUE_CAPACITY = 128;
     private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -406,6 +413,13 @@ public class PipelineCoordinator {
                         closeJobStreamLoad(writeRecordRequest.getJobId());
                         String rootCauseMessage = 
ExceptionUtils.getRootCauseMessage(ex);
                         taskErrorMaps.put(writeRecordRequest.getTaskId(), 
rootCauseMessage);
+                        taskProgressMap.remove(writeRecordRequest.getTaskId());
+                        DorisBatchStreamLoad.reportTaskFailure(
+                                writeRecordRequest.getFrontendAddress(),
+                                writeRecordRequest.getToken(),
+                                writeRecordRequest.getJobId(),
+                                writeRecordRequest.getTaskId(),
+                                rootCauseMessage);
                         LOG.error(
                                 "Failed to process async write record, 
jobId={} taskId={}",
                                 writeRecordRequest.getJobId(),
@@ -579,6 +593,10 @@ public class PipelineCoordinator {
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
+                        taskProgressMap
+                                .computeIfAbsent(
+                                        writeRecordRequest.getTaskId(), k -> 
new AtomicLong())
+                                .set(scannedRows);
                     }
                 }
             }
@@ -620,6 +638,7 @@ public class PipelineCoordinator {
                 scannedRows,
                 batchStreamLoad.getLoadStatistic(),
                 tableSchemas);
+        taskProgressMap.remove(currentTaskId);
     }
 
     public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -729,10 +748,22 @@ public class PipelineCoordinator {
     }
 
     public String getTaskFailReason(String taskId) {
-        String taskReason = taskErrorMaps.remove(taskId);
+        String taskReason = taskErrorMaps.getIfPresent(taskId);
+        taskErrorMaps.invalidate(taskId);
         return taskReason == null ? "" : taskReason;
     }
 
+    public StreamingTaskStatus getTaskStatus(String taskId) {
+        // On failure, drop progress so FE won't renew the deadline on a failed task.
+        String reason = taskErrorMaps.getIfPresent(taskId);
+        taskErrorMaps.invalidate(taskId);
+        if (StringUtils.isNotEmpty(reason)) {
+            return new StreamingTaskStatus(-1, reason);
+        }
+        AtomicLong scannedRows = taskProgressMap.get(taskId);
+        return new StreamingTaskStatus(scannedRows == null ? -1 : 
scannedRows.get(), "");
+    }
+
     /**
      * Clean up reader resources: commit source offset and finish split 
records.
      *
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index b1d1cd8ba03..207813a523c 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -21,6 +21,7 @@ import org.apache.doris.cdcclient.common.Env;
 import org.apache.doris.cdcclient.exception.StreamLoadException;
 import org.apache.doris.cdcclient.utils.HttpUtil;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
+import org.apache.doris.job.cdc.request.TaskFailureRequest;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -76,6 +77,10 @@ public class DorisBatchStreamLoad implements Serializable {
     private final byte[] lineDelimiter = "\n".getBytes();
     private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
     private static final String COMMIT_URL_PATTERN = "http://%s/api/streaming/commit_offset";
+    private static final String REPORT_FAILURE_URL_PATTERN =
+            "http://%s/api/streaming/report_task_failure";
+    // best-effort notification: short timeout so an unreachable FE can't pin the data-write thread
+    private static final int REPORT_FAILURE_TIMEOUT_MS = 60 * 1000;
     private String hostPort;
     @Setter private String frontendAddress;
     private Map<String, BatchRecordBuffer> bufferMap = new 
ConcurrentHashMap<>();
@@ -600,4 +605,40 @@ public class DorisBatchStreamLoad implements Serializable {
             throw new StreamLoadException("Failed to commit offset", ex);
         }
     }
+
+    /**
+     * Best-effort push: tell the FE a running task hit a hard write failure so it is failed within
+     * seconds instead of waiting out the timeout budget. Never throws; if the push is lost the
+     * progress-aware timeout on the FE is the backstop.
+     */
+    public static void reportTaskFailure(
+            String frontendAddress, String token, String jobId, String taskId, 
String reason) {
+        try {
+            String url = String.format(REPORT_FAILURE_URL_PATTERN, 
frontendAddress);
+            String param =
+                    OBJECT_MAPPER.writeValueAsString(
+                            TaskFailureRequest.builder()
+                                    .jobId(Long.parseLong(jobId))
+                                    .taskId(Long.parseLong(taskId))
+                                    .reason(reason)
+                                    .build());
+            HttpPutBuilder builder =
+                    new HttpPutBuilder()
+                            .addCommonHeader()
+                            .addBodyContentType()
+                            .addTokenAuth(token)
+                            .setUrl(url)
+                            .setEntity(new StringEntity(param));
+            try (CloseableHttpClient client = 
HttpUtil.getHttpClient(REPORT_FAILURE_TIMEOUT_MS);
+                    CloseableHttpResponse resp = 
client.execute(builder.build())) {
+                LOG.info(
+                        "report task failure jobId={} taskId={} status={}",
+                        jobId,
+                        taskId,
+                        resp.getStatusLine().getStatusCode());
+            }
+        } catch (Exception ex) {
+            LOG.warn("report task failure failed, jobId={} taskId={}", jobId, 
taskId, ex);
+        }
+    }
 }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
index 05407b2c89d..88c8accf65f 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java
@@ -31,6 +31,10 @@ public class HttpUtil {
     private static int socketTimeout = 10 * 60 * 1000; // stream load timeout 
10 min
 
     public static CloseableHttpClient getHttpClient() {
+        return getHttpClient(socketTimeout);
+    }
+
+    public static CloseableHttpClient getHttpClient(int socketTimeoutMs) {
         return HttpClients.custom()
                 // default timeout 3s, maybe report 307 error when fe busy
                 .setRequestExecutor(new 
HttpRequestExecutor(waitForContinueTimeout))
@@ -47,7 +51,7 @@ public class HttpUtil {
                         RequestConfig.custom()
                                 .setConnectTimeout(connectTimeout)
                                 .setConnectionRequestTimeout(connectTimeout)
-                                .setSocketTimeout(socketTimeout)
+                                .setSocketTimeout(socketTimeoutMs)
                                 .build())
                 .addInterceptorLast(new RequestContent(true))
                 .build();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to