This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 95b7c22a229 [Fix](Streamingjob) fix show task error info when task timeout (#59784)
95b7c22a229 is described below
commit 95b7c22a2297159ce86a5da6d08c22870f16f215
Author: wudi <[email protected]>
AuthorDate: Tue Jan 13 11:03:43 2026 +0800
[Fix](Streamingjob) fix show task error info when task timeout (#59784)
### What problem does this PR solve?
Fix the task error info shown when a task times out.
Related PR: #58898
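In short, the FE previously failed a timed-out streaming task with the fixed message "task failed cause timeout"; it now first asks the CDC client for the root cause recorded when the write failed, and only falls back to the generic message when no reason is available. A minimal, self-contained sketch of that record-then-consume fallback pattern (class name and error message are illustrative, not the Doris types touched below):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TimeoutReasonSketch {
        // stands in for PipelineCoordinator's taskId -> writeFailReason map
        static final Map<String, String> taskErrors = new ConcurrentHashMap<>();

        // stands in for the FE-side timeout handling in StreamingInsertJob
        static String timeoutMessage(String taskId) {
            String reason = taskErrors.remove(taskId); // consumed once, like getTaskFailReason
            return (reason == null || reason.isEmpty()) ? "task failed cause timeout" : reason;
        }

        public static void main(String[] args) {
            taskErrors.put("t1", "stream load error: value too long"); // illustrative reason
            System.out.println(timeoutMessage("t1")); // recorded root cause
            System.out.println(timeoutMessage("t2")); // generic fallback
        }
    }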
---
.../insert/streaming/StreamingInsertJob.java | 6 +-
.../insert/streaming/StreamingMultiTblTask.java | 46 +++++++++
.../cdcclient/controller/ClientController.java | 7 ++
.../cdcclient/service/PipelineCoordinator.java | 10 ++
.../cdc/test_streaming_mysql_job_errormsg.groovy | 111 +++++++++++++++++++++
5 files changed, 179 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 011894f6287..d75427d7554 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -1076,7 +1076,11 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) this.runningStreamTask;
if (TaskStatus.RUNNING.equals(runningMultiTask.getStatus())
&& runningMultiTask.isTimeout()) {
- runningMultiTask.onFail("task failed cause timeout");
+ String timeoutReason = runningMultiTask.getTimeoutReason();
+ if (StringUtils.isEmpty(timeoutReason)) {
+ timeoutReason = "task failed cause timeout";
+ }
+ runningMultiTask.onFail(timeoutReason);
// renew streaming task by auto resume
}
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 07d9acf9d3f..08f1bb5ccaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -69,6 +69,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private long scannedRows = 0L;
private long scannedBytes = 0L;
private long timeoutMs;
+ private long runningBackendId;
public StreamingMultiTblTask(Long jobId,
long taskId,
@@ -113,6 +114,7 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
+ this.runningBackendId = backend.getId();
WriteRecordRequest params = buildRequestParams();
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/writeRecords")
@@ -264,6 +266,50 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
return (System.currentTimeMillis() - startTimeMs) > timeoutMs;
}
+ /**
+ * When a task encounters a write error, it eventually times out.
+ * The job then needs to obtain the reason for the timeout
+ * (for example, a data quality error) and expose it to the user.
+ */
+ public String getTimeoutReason() {
+ try {
+ if (runningBackendId <= 0) {
+ log.info("No running backend for task {}", runningBackendId);
+ return "";
+ }
+ Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+ InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/getFailReason/" + getTaskId())
+ .build();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ InternalService.PRequestCdcClientResult result = null;
+ Future<PRequestCdcClientResult> future =
+
BackendServiceProxy.getInstance().requestCdcClient(address, request);
+ result = future.get();
+ TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ log.warn("Failed to get task timeout reason, {}",
result.getStatus().getErrorMsgs(0));
+ return "";
+ }
+ String response = result.getResponse();
+ try {
+ ResponseBody<String> responseObj = objectMapper.readValue(
+ response,
+ new TypeReference<ResponseBody<String>>() {
+ }
+ );
+ if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+ return responseObj.getData();
+ }
+ } catch (JsonProcessingException e) {
+ log.warn("Failed to get task timeout reason, response: {}",
response);
+ }
+ } catch (ExecutionException | InterruptedException ex) {
+ log.error("Send get fail reason request failed: ", ex);
+ }
+ return "";
+ }
+
@Override
public TRow getTvfInfo(String jobName) {
TRow trow = super.getTvfInfo(jobName);
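Note that getTimeoutReason() returns "" on every failure path above (no backend recorded, non-OK RPC status, unparsable response), so the caller in StreamingInsertJob always has the generic message to fall back on. For reference, a minimal Jackson sketch of unwrapping a {code, data} envelope of the shape parsed above (the payload here is hypothetical):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class EnvelopeSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // hypothetical body in the shape the CDC client returns
            String json = "{\"code\":0,\"data\":\"write failed: bad row\"}";
            Map<String, Object> envelope =
                    mapper.readValue(json, new TypeReference<Map<String, Object>>() {});
            // code 0 corresponds to RestApiStatusCode.OK in the diff
            if (Integer.valueOf(0).equals(envelope.get("code"))) {
                System.out.println(envelope.get("data"));
            }
        }
    }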
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2f444260559..2ca45ad2474 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -32,6 +32,7 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -105,4 +106,10 @@ public class ClientController {
pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
return RestResponse.success(true);
}
+
+ /** get task fail reason */
+ @RequestMapping(path = "/api/getFailReason/{taskId}", method = RequestMethod.POST)
+ public Object getFailReason(@PathVariable("taskId") String taskId) {
+ return RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
+ }
}
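The new endpoint takes the task id as a path variable and expects a POST with no body; in production the FE reaches it through the BE's requestCdcClient proxy rather than directly. A quick manual check could look like this (host, port, and task id are placeholders):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class GetFailReasonCall {
        public static void main(String[] args) throws Exception {
            // placeholder address for a locally running cdc_client
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:8080/api/getFailReason/12345"))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body()); // RestResponse envelope with the recorded reason
        }
    }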
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 187003ad0e6..73be6f16828 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -55,6 +56,8 @@ public class PipelineCoordinator {
private static final String SPLIT_ID = "splitId";
// jobId
private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
+ // taskId -> writeFailReason
+ private final Map<String, String> taskErrorMaps = new
ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
private static final int MAX_CONCURRENT_TASKS = 10;
private static final int QUEUE_CAPACITY = 128;
@@ -159,6 +162,8 @@ public class PipelineCoordinator {
writeRecordRequest.getTaskId());
} catch (Exception ex) {
closeJobStreamLoad(writeRecordRequest.getJobId());
+ String rootCauseMessage = ExceptionUtils.getRootCauseMessage(ex);
+ taskErrorMaps.put(writeRecordRequest.getTaskId(), rootCauseMessage);
LOG.error(
"Failed to process async write record,
jobId={} taskId={}",
writeRecordRequest.getJobId(),
@@ -276,4 +281,9 @@ public class PipelineCoordinator {
Struct value = (Struct) record.value();
return value.getStruct(Envelope.FieldName.SOURCE).getString("table");
}
+
+ public String getTaskFailReason(String taskId) {
+ String taskReason = taskErrorMaps.remove(taskId);
+ return taskReason == null ? "" : taskReason;
+ }
}
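Two details worth noting here: the stored value is the root-cause message, so the user sees the underlying failure rather than the outermost wrapper exception, and getTaskFailReason() uses remove(), so each reason is handed out exactly once. A small sketch of what commons-lang3's getRootCauseMessage yields on a nested exception (the messages are illustrative):

    import org.apache.commons.lang3.exception.ExceptionUtils;

    public class RootCauseSketch {
        public static void main(String[] args) {
            Exception root = new IllegalStateException("stream load error: too many filtered rows");
            Exception wrapped = new RuntimeException("async write failed", root);
            // prints "IllegalStateException: stream load error: too many filtered rows"
            System.out.println(ExceptionUtils.getRootCauseMessage(wrapped));
        }
    }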
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
new file mode 100644
index 00000000000..f063b629f90
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_errormsg", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def jobName = "test_streaming_mysql_job_errormsg"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_errormsg"
+ def mysqlDb = "test_cdc_db"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ // Pre-create table1 with a narrow schema so the streaming write fails
+ sql """
+ CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
+ `name` varchar(2) NOT NULL,
+ `age` int NOT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`name`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+
+ // create test
+ connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` varchar(8) NOT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES
('ABCDEFG', 'abc');"""
+ }
+
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM MYSQL (
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // check job running
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobFailCount = sql """ select FailedTaskCount, CanceledTaskCount, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobFailCount: " + jobFailCount)
+ // check job status and that the failed or canceled task count is at least 1
+ jobFailCount.size() == 1 && ('1' <= jobFailCount.get(0).get(0) || '1' <= jobFailCount.get(0).get(1))
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobFailMsg = sql """select errorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING'"""
+ log.info("jobFailMsg: " + jobFailMsg)
+ assert jobFailMsg.get(0).get(0).contains("stream load error")
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}