This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b7c22a229 [Fix](Streamingjob) fix show task error info when task 
timeout (#59784)
95b7c22a229 is described below

commit 95b7c22a2297159ce86a5da6d08c22870f16f215
Author: wudi <[email protected]>
AuthorDate: Tue Jan 13 11:03:43 2026 +0800

    [Fix](Streamingjob) fix show task error info when task timeout (#59784)
    
    ### What problem does this PR solve?
    
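    Show the actual failure reason (for example a stream load error) in the
    task error info when a streaming task times out, instead of only the
    generic timeout message.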
    
    Related PR: #58898
---
 .../insert/streaming/StreamingInsertJob.java       |   6 +-
 .../insert/streaming/StreamingMultiTblTask.java    |  46 +++++++++
 .../cdcclient/controller/ClientController.java     |   7 ++
 .../cdcclient/service/PipelineCoordinator.java     |  10 ++
 .../cdc/test_streaming_mysql_job_errormsg.groovy   | 111 +++++++++++++++++++++
 5 files changed, 179 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 011894f6287..d75427d7554 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1076,7 +1076,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) 
this.runningStreamTask;
             if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
                     && runningMultiTask.isTimeout()) {
-                runningMultiTask.onFail("task failed cause timeout");
+                String timeoutReason = runningMultiTask.getTimeoutReason();
+                if (StringUtils.isEmpty(timeoutReason)) {
+                    timeoutReason = "task failed cause timeout";
+                }
+                runningMultiTask.onFail(timeoutReason);
                 // renew streaming task by auto resume
             }
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 07d9acf9d3f..08f1bb5ccaf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -69,6 +69,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private long scannedRows = 0L;
     private long scannedBytes = 0L;
     private long timeoutMs;
+    private long runningBackendId;
 
     public StreamingMultiTblTask(Long jobId,
             long taskId,
@@ -113,6 +114,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
 
     private void sendWriteRequest() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
+        this.runningBackendId = backend.getId();
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/writeRecords")
@@ -264,6 +266,50 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         return (System.currentTimeMillis() - startTimeMs) > timeoutMs;
     }
 
+    /**
+     * Asks the cdc client on the backend that ran this task why the write failed.
+     *
+     * <p>When a task encounters a write error, it will time out. The job needs to obtain the
+     * reason for the timeout, such as a data quality error, and needs to expose it to the user.
+     *
+     * <p>This lookup is best effort: every failure to fetch the reason is logged and reported
+     * as an empty string, so the caller can fall back to its own default message.
+     *
+     * @return the fail reason returned by the cdc client, or an empty string if none is available
+     */
+    public String getTimeoutReason() {
+        try {
+            if (runningBackendId <= 0) {
+                log.info("No running backend for task {}", getTaskId());
+                return "";
+            }
+            Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+            if (backend == null) {
+                // The backend may no longer be registered once the task has timed out.
+                log.warn("Backend {} of task {} not found, skip getting timeout reason",
+                        runningBackendId, getTaskId());
+                return "";
+            }
+            InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                    .setApi("/api/getFailReason/" + getTaskId())
+                    .build();
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+            Future<PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            InternalService.PRequestCdcClientResult result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                // Log the whole list: it may be empty, so reading element 0 is not safe.
+                log.warn("Failed to get task timeout reason, {}", result.getStatus().getErrorMsgsList());
+                return "";
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody<String> responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody<String>>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    String reason = responseObj.getData();
+                    return reason == null ? "" : reason;
+                }
+                log.warn("Get task timeout reason returned non-OK code, response: {}", response);
+            } catch (JsonProcessingException e) {
+                log.warn("Failed to get task timeout reason, response: {}", response, e);
+            }
+        } catch (ExecutionException ex) {
+            log.error("Send get fail reason request failed: ", ex);
+        } catch (InterruptedException ex) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            log.error("Send get fail reason request failed: ", ex);
+        }
+        return "";
+    }
+
     @Override
     public TRow getTvfInfo(String jobName) {
         TRow trow = super.getTvfInfo(jobName);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2f444260559..2ca45ad2474 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -32,6 +32,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -105,4 +106,10 @@ public class ClientController {
         pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
         return RestResponse.success(true);
     }
+
+    /**
+     * Returns the write failure reason recorded for a task.
+     *
+     * @param taskId id of the task, taken from the request path
+     * @return a success response wrapping the reason reported by the pipeline coordinator
+     */
+    @RequestMapping(path = "/api/getFailReason/{taskId}", method = RequestMethod.POST)
+    public Object getFailReason(@PathVariable("taskId") String taskId) {
+        String failReason = pipelineCoordinator.getTaskFailReason(taskId);
+        return RestResponse.success(failReason);
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 187003ad0e6..73be6f16828 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -55,6 +56,8 @@ public class PipelineCoordinator {
     private static final String SPLIT_ID = "splitId";
     // jobId
     private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
+    // taskId -> writeFailReason
+    private final Map<String, String> taskErrorMaps = new 
ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
     private static final int MAX_CONCURRENT_TASKS = 10;
     private static final int QUEUE_CAPACITY = 128;
@@ -159,6 +162,8 @@ public class PipelineCoordinator {
                                 writeRecordRequest.getTaskId());
                     } catch (Exception ex) {
                         closeJobStreamLoad(writeRecordRequest.getJobId());
+                        String rootCauseMessage = 
ExceptionUtils.getRootCauseMessage(ex);
+                        taskErrorMaps.put(writeRecordRequest.getTaskId(), 
rootCauseMessage);
                         LOG.error(
                                 "Failed to process async write record, 
jobId={} taskId={}",
                                 writeRecordRequest.getJobId(),
@@ -276,4 +281,9 @@ public class PipelineCoordinator {
         Struct value = (Struct) record.value();
         return value.getStruct(Envelope.FieldName.SOURCE).getString("table");
     }
+
+    /**
+     * Takes the write failure reason recorded for a task.
+     *
+     * <p>The entry is removed from {@code taskErrorMaps} when it is read, so a later call for
+     * the same task returns an empty string unless a new reason was recorded in between.
+     *
+     * @param taskId id of the task
+     * @return the recorded reason, or an empty string if nothing is recorded for the task
+     */
+    public String getTaskFailReason(String taskId) {
+        String recordedReason = taskErrorMaps.remove(taskId);
+        if (recordedReason == null) {
+            return "";
+        }
+        return recordedReason;
+    }
 }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
new file mode 100644
index 00000000000..f063b629f90
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_errormsg", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def jobName = "test_streaming_mysql_job_errormsg"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_errormsg"
+    def mysqlDb = "test_cdc_db"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    // Pre-create the Doris target table (name varchar(2), age int) so the MySQL row inserted below cannot be loaded
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
+            `name` varchar(2) NOT NULL,
+            `age` int NOT NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        // create test
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `name` varchar(200) NOT NULL,
+                  `age` varchar(8) NOT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES 
('ABCDEFG', 'abc');"""
+        }
+
+        sql """CREATE JOB ${jobName}
+                ON STREAMING
+                FROM MYSQL (
+                    "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                    "driver_url" = "${driver_url}",
+                    "driver_class" = "com.mysql.cj.jdbc.Driver",
+                    "user" = "root",
+                    "password" = "123456",
+                    "database" = "${mysqlDb}",
+                    "include_tables" = "${table1}", 
+                    "offset" = "initial"
+                )
+                TO DATABASE ${currentDb} (
+                  "table.create.properties.replication_num" = "1"
+                )
+            """
+
+        // wait until the job reports at least one failed or canceled task
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobFailCount = sql """ select FailedTaskCount, 
CanceledTaskCount, SucceedTaskCount from jobs("type"="insert") where Name = 
'${jobName}' and ExecuteType='STREAMING' """
+                        log.info("jobFailCount: " + jobFailCount)
+                        // check that the job exists and its failed or canceled task count is at least 1
+                        jobFailCount.size() == 1 && ('1' <= 
jobFailCount.get(0).get(0) || '1' <= jobFailCount.get(0).get(1))
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobFailMsg = sql """select errorMsg  from jobs("type"="insert") 
where Name = '${jobName}' and ExecuteType='STREAMING'"""
+        log.info("jobFailMsg: " + jobFailMsg)
+        assert jobFailMsg.get(0).get(0).contains("stream load error")
+
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to