This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5d370901fe0 branch-3.0: [Bug](routine load) Fix routine load task failed with MEM_LIMIT_EXCEED never be scheduled again #55481 (#55614)
5d370901fe0 is described below

commit 5d370901fe04ff2bb8c81383228a4eeb1ee6db2c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Sep 5 09:37:22 2025 +0800

    branch-3.0: [Bug](routine load) Fix routine load task failed with MEM_LIMIT_EXCEED never be scheduled again #55481 (#55614)
    
    Cherry-picked from #55481
    
    Co-authored-by: xy720 <[email protected]>
---
 .../load/routineload/RoutineLoadTaskScheduler.java | 13 +++++
 .../test_routine_load_job_schedule.groovy          | 57 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 28e03567765..d1b5a6f73e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -247,6 +247,13 @@ public class RoutineLoadTaskScheduler extends MasterDaemon 
{
 
         // Check if this is a resource pressure error that should not be 
immediately rescheduled
         if (errorMsg.contains("TOO_MANY_TASKS") || 
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+            // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED 
error),
+            // but txn has already begun. Here we will still set the 
ExecuteStartTime of
+            // this task, which means we "assume" that this task has been 
successfully submitted.
+            // And this task will then be aborted because of a timeout.
+            // In this way, we can prevent the entire job from being paused 
due to submit errors,
+            // and we can also relieve the pressure on BE by waiting for the 
timeout period.
+            
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
             return;
         }
 
@@ -302,6 +309,12 @@ public class RoutineLoadTaskScheduler extends MasterDaemon 
{
                 throw new LoadException("debug point 
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED");
             }
 
+            if 
(DebugPointUtil.isEnable("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED"))
 {
+                LOG.warn("debug point 
FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED,"
+                        + "routine load task submit failed");
+                throw new LoadException("MEM_LIMIT_EXCEEDED");
+            }
+
             if (tStatus.getStatusCode() != TStatusCode.OK) {
                 throw new LoadException("failed to submit task. error code: " 
+ tStatus.getStatusCode()
                         + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? 
tStatus.getErrorMsgs().get(0) : "NaN"));
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
index ed52e60a13f..5cd433283e0 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_job_schedule.groovy
@@ -111,5 +111,60 @@ suite("test_routine_load_job_schedule","nonConcurrent") {
                 logger.warn("Failed to stop routine load job: ${e.message}")
             }
         }
+
+        sql "truncate table ${tableName}"
+        def memJob = "test_routine_load_job_schedule_mem_limit"
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+            testData.each { line->
+                logger.info("Sending data to kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTpoics[0], null, 
line)
+                producer.send(record)
+            }
+
+            sql """
+                CREATE ROUTINE LOAD ${memJob} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${newTopic.name()}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
+                    "max_batch_interval" = "6"
+                );
+            """
+
+            sleep(5000)
+
+            
GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+
+            def count = 0
+            def maxWaitCount = 120 // > 60 = maxBatchIntervalS * 
Config.routine_load_task_timeout_multiplier
+            while (true) {
+                def state = sql "show routine load for ${memJob}"
+                def routineLoadState = state[0][8].toString()
+                def statistic = state[0][14].toString()
+                logger.info("Routine load state: ${routineLoadState}")
+                logger.info("Routine load statistic: ${statistic}")
+                def rowCount = sql "select count(*) from ${memTableName}"
+                if (routineLoadState == "RUNNING" && rowCount[0][0] == 5) {
+                    break
+                }
+                if (count > maxWaitCount) {
+                    assertEquals(1, 2)
+                }
+                sleep(1000)
+                count++
+            }
+        } catch (Exception e) {
+            logger.error("MEM_LIMIT_EXCEEDED test failed with exception: 
${e.message}")
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("FE.ROUTINE_LOAD_TASK_SUBMIT_FAILED.MEM_LIMIT_EXCEEDED")
+            try {
+                sql "stop routine load for 
test_routine_load_job_schedule_mem_limit"
+            } catch (Exception e) {
+                logger.warn("Failed to stop routine load job: ${e.message}")
+            }
+        }
     }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to