This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f3badd010 [fix](job) lock routine load task renew on submit failure 
(#64731)
e5f3badd010 is described below

commit e5f3badd0109e312167f242df5aa53adb86806d8
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 24 10:58:39 2026 +0800

    [fix](job) lock routine load task renew on submit failure (#64731)
    
    ### What problem does this PR solve?
    
    Routine load submit failures can renew a task directly from the
    scheduler after the task has begun a transaction. That path mutates the
    job's `routineLoadTaskInfoList` without holding the job write lock,
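    racing with scheduler idle-slot counting that reads the same list. This
    PR protects the submit-failure renew path with the job write lock,
    matching the existing timeout and transaction-status renew paths. Under
    the lock, it also skips the renew when the job is no longer RUNNING or
    the task has already been removed from the job's task list. Unit tests
    cover the locking behavior and both skip cases.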
---
 .../load/routineload/RoutineLoadTaskScheduler.java |  42 ++++++---
 .../routineload/RoutineLoadTaskSchedulerTest.java  | 103 +++++++++++++++++++++
 2 files changed, 130 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 4de5f22c026..c901cd41610 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -250,22 +250,34 @@ public class RoutineLoadTaskScheduler extends 
MasterDaemon {
                 routineLoadTaskInfo.getBeId(), errorMsg);
         routineLoadTaskInfo.setBeId(-1);
         RoutineLoadJob routineLoadJob = 
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
-        routineLoadJob.setOtherMsg(errorMsg);
-
-        // Check if this is a resource pressure error that should not be 
immediately rescheduled
-        if (errorMsg.contains("TOO_MANY_TASKS") || 
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
-            // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED 
error),
-            // but txn has already begun. Here we will still set the 
ExecuteStartTime of
-            // this task, which means we "assume" that this task has been 
successfully submitted.
-            // And this task will then be aborted because of a timeout.
-            // In this way, we can prevent the entire job from being paused 
due to submit errors,
-            // and we can also relieve the pressure on BE by waiting for the 
timeout period.
-            
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
-            return;
-        }
+        RoutineLoadTaskInfo newTask;
+
+        routineLoadJob.writeLock();
+        try {
+            routineLoadJob.setOtherMsg(errorMsg);
+
+            // Check if this is a resource pressure error that should not be 
immediately rescheduled
+            if (errorMsg.contains("TOO_MANY_TASKS") || 
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+                // submit task failed (such as 
TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error),
+                // but txn has already begun. Here we will still set the 
ExecuteStartTime of
+                // this task, which means we "assume" that this task has been 
successfully submitted.
+                // And this task will then be aborted because of a timeout.
+                // In this way, we can prevent the entire job from being 
paused due to submit errors,
+                // and we can also relieve the pressure on BE by waiting for 
the timeout period.
+                
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+                return;
+            }
 
-        // for other errors (network issues, BE restart, etc.), reschedule 
immediately
-        RoutineLoadTaskInfo newTask = 
routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
+            if (routineLoadJob.getState() != JobState.RUNNING
+                    || 
!routineLoadJob.containsTask(routineLoadTaskInfo.getId())) {
+                return;
+            }
+
+            // for other errors (network issues, BE restart, etc.), reschedule 
immediately
+            newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, 
false);
+        } finally {
+            routineLoadJob.writeUnlock();
+        }
         addTaskInQueue(newTask);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c3370221334..c99877c2cc4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -33,13 +33,16 @@ import org.apache.doris.thrift.BackendService;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -108,4 +111,104 @@ public class RoutineLoadTaskSchedulerTest {
             routineLoadTaskScheduler.runAfterCatalogReady();
         }
     }
+
+    @Test
+    public void testSubmitTaskFailureRenewsTaskWithJobWriteLock() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+                Lists.newArrayList(routineLoadTaskInfo));
+        Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertTrue(routineLoadJob.isRenewCalledWithWriteLock());
+        List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
+                Deencapsulation.getField(routineLoadJob, 
"routineLoadTaskInfoList");
+        Assert.assertEquals(1, routineLoadTaskInfoList.size());
+        Assert.assertNotSame(routineLoadTaskInfo, 
routineLoadTaskInfoList.get(0));
+
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertSame(routineLoadTaskInfoList.get(0), 
needScheduleTasksQueue.peek());
+    }
+
+    @Test
+    public void testSubmitTaskFailureSkipsRenewWhenTaskRemoved() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", 
Lists.newArrayList());
+        Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertFalse(routineLoadJob.isRenewCalled());
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+    }
+
+    @Test
+    public void testSubmitTaskFailureSkipsRenewWhenJobPaused() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.PAUSED);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+                Lists.newArrayList(routineLoadTaskInfo));
+        Mockito.when(routineLoadManager.getJob(1L)).thenReturn(routineLoadJob);
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertFalse(routineLoadJob.isRenewCalled());
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+    }
+
+    private static class LockCheckingKafkaRoutineLoadJob extends 
KafkaRoutineLoadJob {
+        private boolean renewCalled;
+        private boolean renewCalledWithWriteLock;
+
+        @Override
+        protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo,
+                boolean delaySchedule) {
+            renewCalled = true;
+            renewCalledWithWriteLock = lock.isWriteLockedByCurrentThread();
+            return super.unprotectRenewTask(routineLoadTaskInfo, 
delaySchedule);
+        }
+
+        private boolean isRenewCalled() {
+            return renewCalled;
+        }
+
+        private boolean isRenewCalledWithWriteLock() {
+            return renewCalledWithWriteLock;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to