This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 650932cc49a [fix](job) fix routine load task schedule stuck after
create task fail (#60143)
650932cc49a is described below
commit 650932cc49a33be2dc5965eedc1a4d7b628b837e
Author: hui lai <[email protected]>
AuthorDate: Sat Jan 31 05:58:24 2026 +0800
[fix](job) fix routine load task schedule stuck after create task fail
(#60143)
### What problem does this PR solve?
fix routine load task schedule stuck after create task fail:
```
2026-01-21 18:46:11,938 WARN (Routine load task scheduler|52)
[RoutineLoadTaskScheduler.process():117] Taking routine load task from queue
has been interrupted
java.lang.IllegalStateException
at
com.google.common.base.Preconditions.checkState(Preconditions.java:499)
at org.apache.doris.analysis.SlotRef.getTableName(SlotRef.java:356)
at
org.apache.doris.rewrite.ExtractCommonFactorsRule.rewriteOrToIn(ExtractCommonFactorsRule.java:536)
at
org.apache.doris.rewrite.ExtractCommonFactorsRule.makeCompoundRemaining(ExtractCommonFactorsRule.java:459)
at
org.apache.doris.rewrite.ExtractCommonFactorsRule.extractCommonFactors(ExtractCommonFactorsRule.java:205)
at
org.apache.doris.rewrite.ExtractCommonFactorsRule.apply(ExtractCommonFactorsRule.java:80)
at
org.apache.doris.rewrite.ExprRewriter.applyRuleOnce(ExprRewriter.java:178)
at
org.apache.doris.rewrite.ExprRewriter.rewrite(ExprRewriter.java:171)
at
org.apache.doris.planner.FileLoadScanNode.initWhereExpr(FileLoadScanNode.java:171)
at
org.apache.doris.planner.FileLoadScanNode.initAndSetPrecedingFilter(FileLoadScanNode.java:144)
at
org.apache.doris.planner.FileLoadScanNode.initParamCreateContexts(FileLoadScanNode.java:134)
at
org.apache.doris.planner.FileLoadScanNode.init(FileLoadScanNode.java:125)
at
org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:307)
at
org.apache.doris.planner.StreamLoadPlanner.plan(StreamLoadPlanner.java:116)
at
org.apache.doris.load.routineload.RoutineLoadJob.plan(RoutineLoadJob.java:1032)
at
org.apache.doris.load.routineload.KafkaTaskInfo.rePlan(KafkaTaskInfo.java:136)
at
org.apache.doris.load.routineload.KafkaTaskInfo.createRoutineLoadTask(KafkaTaskInfo.java:99)
at
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.scheduleOneTask(RoutineLoadTaskScheduler.java:193)
at
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.process(RoutineLoadTaskScheduler.java:115)
at
org.apache.doris.load.routineload.RoutineLoadTaskScheduler.runAfterCatalogReady(RoutineLoadTaskScheduler.java:84)
at
org.apache.doris.common.util.MasterDaemon.runOneCycle(MasterDaemon.java:58)
at org.apache.doris.common.util.Daemon.run(Daemon.java:119)
```
---
.../load/routineload/RoutineLoadTaskScheduler.java | 7 +-
...est_routine_load_task_exception_recovery.groovy | 101 +++++++++++++++++++++
2 files changed, 106 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 8be721fba58..4156410ef86 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -195,6 +195,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
try {
long startTime = System.currentTimeMillis();
tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
+ if
(DebugPointUtil.isEnable("FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"))
{
+ throw new RuntimeException("debug point:
createRoutineLoadTask.exception");
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("create routine load task cost(ms): {}, job id: {}",
(System.currentTimeMillis() - startTime),
routineLoadTaskInfo.getJobId());
@@ -208,12 +211,12 @@ public class RoutineLoadTaskScheduler extends
MasterDaemon {
new
ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " +
e.getMessage()),
false);
throw e;
- } catch (UserException e) {
+ } catch (Exception e) {
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
.updateState(JobState.PAUSED,
- new ErrorReason(e.getErrorCode(),
+ new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
"failed to create task: " +
e.getMessage()), false);
throw e;
}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
new file mode 100644
index 00000000000..a9cd82fd702
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_task_exception_recovery.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.junit.Assert
+
+suite("test_routine_load_task_exception_recovery", "nonConcurrent") {
+ def kafkaCsvTpoics = [
+ "test_routine_load_task_exception_recovery",
+ ]
+
+ if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+ def runSql = { String q -> sql q }
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+ def tableName = "test_routine_load_task_exception_recovery"
+ def job = "test_task_exception_recovery"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${job} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // Enable debug point to simulate exception during
createRoutineLoadTask
+ def injection =
"FE.RoutineLoadTaskScheduler.createRoutineLoadTask.exception"
+ try {
+ logger.info("---test task exception recovery: enable debug
point to simulate exception---")
+ GetDebugPoint().enableDebugPointForAllFEs(injection)
+
+ RoutineLoadTestUtils.sendTestDataToKafka(producer,
kafkaCsvTpoics)
+
+ def maxWaitCount = 0
+ while (true) {
+ def res = runSql("show routine load for ${job}")
+ def routineLoadState = res[0][8].toString()
+ def otherMsg = res[0][19].toString()
+ logger.info("Routine load state: ${routineLoadState},
error message: ${otherMsg}")
+ if (routineLoadState == "PAUSED" &&
otherMsg.contains("failed to create task")) {
+ break
+ }
+ Thread.sleep(1000)
+ if (maxWaitCount++ > 60) {
+ Assert.fail("Routine load job did not pause as
expected within timeout")
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(injection)
+ }
+
+ // After disabling the debug point, verify that the routine load
can recover
+ // and successfully load data
+ logger.info("---test task exception recovery: verify data loading
after recovery---")
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 0)
+
+ // Verify data was loaded
+ def rowCount = sql "select count(*) from ${tableName}"
+ logger.info("Row count: ${rowCount[0][0]}")
+ Assert.assertTrue("Expected at least 2 rows in table",
rowCount[0][0] >= 2)
+ } finally {
+ sql "stop routine load for ${job}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]