This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 01646a91745 branch-4.0: [fix](job) fix routine load task schedule stuck after create task fail #60143 (#60401)
01646a91745 is described below
commit 01646a91745c94cb273dbf76037da2fa5a27ebe4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 2 11:14:48 2026 +0800
    branch-4.0: [fix](job) fix routine load task schedule stuck after create task fail #60143 (#60401)
Cherry-picked from #60143
Co-authored-by: hui lai <[email protected]>
---
.../load/routineload/RoutineLoadTaskScheduler.java | 7 +-
...est_routine_load_task_exception_recovery.groovy | 101 +++++++++++++++++++++
2 files changed, 106 insertions(+), 2 deletions(-)
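Note: the hunks below broaden the failure handling in RoutineLoadTaskScheduler. Before this change only UserException was caught when createRoutineLoadTask failed, so an unchecked exception thrown during task creation bypassed the handler that resets the BE id and pauses the job, which appears to be how scheduling got stuck while still holding the BE slot. The fix catches Exception, reports the failure as InternalErrorCode.CREATE_TASKS_ERR, adds an FE debug point to inject such a failure, and adds a regression test covering the pause-and-recover path; a simplified sketch of the control flow follows the Java hunk below.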
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 8be721fba58..4156410ef86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -195,6 +195,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         try {
             long startTime = System.currentTimeMillis();
             tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
+            if (DebugPointUtil.isEnable("FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception")) {
+                throw new RuntimeException("debug point: createRoutineLoadTask.exception");
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("create routine load task cost(ms): {}, job id: {}",
                         (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
@@ -208,12 +211,12 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                             new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e.getMessage()),
                             false);
             throw e;
-        } catch (UserException e) {
+        } catch (Exception e) {
             // set BE id to -1 to release the BE slot
             routineLoadTaskInfo.setBeId(-1);
             routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
                     .updateState(JobState.PAUSED,
-                            new ErrorReason(e.getErrorCode(),
+                            new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
                                     "failed to create task: " + e.getMessage()), false);
             throw e;
         }
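To make the behavioral change above concrete, here is a minimal, self-contained sketch (not Doris code; TaskCreationSketch, Job, SlotPool and scheduleOneTask are hypothetical stand-ins) showing why the catch has to cover unchecked exceptions: any failure while building the task must release the slot and pause the job before rethrowing, otherwise the slot stays occupied and scheduling stalls.

// Minimal sketch, assuming simplified stand-in types; only the control flow
// mirrors the fix above: an exception thrown while creating a task releases
// the slot and pauses the job before it is rethrown.
public final class TaskCreationSketch {

    enum JobState { RUNNING, PAUSED }

    static final class Job {
        JobState state = JobState.RUNNING;
        String pauseReason;

        void pause(String reason) {
            state = JobState.PAUSED;
            pauseReason = reason;
        }
    }

    static final class SlotPool {
        private int usedSlots = 0;

        void acquire() { usedSlots++; }                        // analogous to taking a BE slot
        void release() { usedSlots = Math.max(0, usedSlots - 1); }  // analogous to setBeId(-1)
        int used() { return usedSlots; }
    }

    // With the narrow pre-fix catch (only a checked exception type), a
    // RuntimeException from createTask would skip both release() and pause().
    static void scheduleOneTask(Job job, SlotPool slots, Runnable createTask) throws Exception {
        slots.acquire();
        try {
            createTask.run();                                  // may throw anything, checked or not
        } catch (Exception e) {                                // broadened catch, as in the fix
            slots.release();
            job.pause("failed to create task: " + e.getMessage());
            throw e;
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        SlotPool slots = new SlotPool();
        try {
            // Simulate the debug point: an unchecked exception during task creation.
            scheduleOneTask(job, slots, () -> { throw new RuntimeException("boom"); });
        } catch (Exception expected) {
            // Expected; the interesting part is the state left behind.
        }
        // Prints: state=PAUSED, usedSlots=0 -- the slot is free and the job is
        // paused instead of the scheduler silently holding the slot forever.
        System.out.println("state=" + job.state + ", usedSlots=" + slots.used());
    }
}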
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
new file mode 100644
index 00000000000..a9cd82fd702
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.junit.Assert
+
+suite("test_routine_load_task_exception_recovery", "nonConcurrent") {
+ def kafkaCsvTpoics = [
+ "test_routine_load_task_exception_recovery",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_task_exception_recovery"
+ def job = "test_task_exception_recovery"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+            // Enable debug point to simulate exception during createRoutineLoadTask
+            def injection = "FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"
+ try {
+                logger.info("---test task exception recovery: enable debug point to simulate exception---")
+ GetDebugPoint().enableDebugPointForAllFEs(injection)
+
+                RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+
+ def maxWaitCount = 0
+ while (true) {
+ def res = runSql("show routine load for ${job}")
+ def routineLoadState = res[0][8].toString()
+ def otherMsg = res[0][19].toString()
+                    logger.info("Routine load state: ${routineLoadState}, error message: ${otherMsg}")
+                    if (routineLoadState == "PAUSED" && otherMsg.contains("failed to create task")) {
+ break
+ }
+ Thread.sleep(1000)
+ if (maxWaitCount++ > 60) {
+                        Assert.fail("Routine load job did not pause as expected within timeout")
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(injection)
+ }
+
+            // After disabling the debug point, verify that the routine load can recover
+ // and successfully load data
+            logger.info("---test task exception recovery: verify data loading after recovery---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+
+ // Verify data was loaded
+ def rowCount = sql "select count(*) from ${tableName}"
+ logger.info("Row count: ${rowCount[0][0]}")
+            Assert.assertTrue("Expected at least 2 rows in table", rowCount[0][0] >= 2)
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
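In short, the regression test above forces the new failure path through the FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception debug point, waits up to about a minute for the job to reach PAUSED with a "failed to create task" message, then disables the debug point and checks that the job recovers without manual intervention and loads at least two rows from Kafka.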
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]