This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5d370901fe0 branch-3.0: [Bug](routine load) Fix routine load task
failed with MEM_LIMIT_EXCEED never be scheduled again #55481 (#55614)
5d370901fe0 is described below
commit 5d370901fe04ff2bb8c81383228a4eeb1ee6db2c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Sep 5 09:37:22 2025 +0800
branch-3.0: [Bug](routine load) Fix routine load task failed with
MEM_LIMIT_EXCEED never be scheduled again #55481 (#55614)
Cherry-picked from #55481
Co-authored-by: xy720 <[email protected]>
---
.../load/routineload/RoutineLoadTaskScheduler.java | 13 +++++
.../test_routine_load_job_schedule.groovy | 57 +++++++++++++++++++++-
2 files changed, 69 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 28e03567765..d1b5a6f73e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -247,6 +247,13 @@ public class RoutineLoadTaskScheduler extends MasterDaemon
{
// Check if this is a resource pressure error that should not be immediately rescheduled
if (errorMsg.contains("TOO_MANY_TASKS") ||
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+ // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error),
+ // but txn has already begun. Here we will still set the ExecuteStartTime of
+ // this task, which means we "assume" that this task has been successfully submitted.
+ // And this task will then be aborted because of a timeout.
+ // In this way, we can prevent the entire job from being paused due to submit errors,
+ // and we can also relieve the pressure on BE by waiting for the timeout period.
+
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
return;
}
@@ -302,6 +309,12 @@ public class RoutineLoadTaskScheduler extends MasterDaemon
{
throw new LoadException("debug point
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED");
}
+ if
(DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED"))
{
+ LOG.warn("debug point
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED,"
+ + "routine load task submit failed");
+ throw new LoadException("MEM_LIMIT_EXCEEDED");
+ }
+
if (tStatus.getStatusCode() != TStatusCode.OK) {
throw new LoadException("failed to submit task. error code: "
+ tStatus.getStatusCode()
+ ", msg: " + (tStatus.getErrorMsgsSize() > 0 ?
tStatus.getErrorMsgs().get(0) : "NaN"));
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
index ed52e60a13f..5cd433283e0 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
@@ -111,5 +111,60 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
logger.warn("Failed to stop routine load job: ${e.message}")
}
}
+
+ sql "truncate table ${tableName}"
+ def memJob = "test_routine_load_job_schedule_mem_limit"
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+ testData.each { line->
+ logger.info("Sending data to kafka: ${line}")
+ def record = new ProducerRecord<>(kafkaCsvTpoics[0], null,
line)
+ producer.send(record)
+ }
+
+ sql """
+ CREATE ROUTINE LOAD ${memJob} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${newTopic.name()}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING",
+ "max_batch_interval" = "6"
+ );
+ """
+
+ sleep(5000)
+
+
GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+
+ def count = 0
+ def maxWaitCount = 120 // > 60 = maxBatchIntervalS *
Config.routine_load_task_timeout_multiplier
+ while (true) {
+ def state = sql "show routine load for ${memJob}"
+ def routineLoadState = state[0][8].toString()
+ def statistic = state[0][14].toString()
+ logger.info("Routine load state: ${routineLoadState}")
+ logger.info("Routine load statistic: ${statistic}")
+ def rowCount = sql "select count(*) from ${memTableName}"
+ if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
+ break
+ }
+ if (count > maxWaitCount) {
+ assertEquals(1, 2)
+ }
+ sleep(1000)
+ count++
+ }
+ } catch (Exception e) {
+ logger.error("MEM_LIMIT_EXCEEDED test failed with exception:
${e.message}")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+ try {
+ sql "stop routine load for
test_routine_load_job_schedule_mem_limit"
+ } catch (Exception e) {
+ logger.warn("Failed to stop routine load job: ${e.message}")
+ }
+ }
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]