This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 3edfeb78815 branch-4.1: [fix](job) lock routine load task renew on submit failure (#65007)
3edfeb78815 is described below

commit 3edfeb78815c266cfd9635e7c4a1ac025204b915
Author: hui lai <[email protected]>
AuthorDate: Tue Jun 30 19:38:47 2026 +0800

    branch-4.1: [fix](job) lock routine load task renew on submit failure (#65007)
    
    pick https://github.com/apache/doris/pull/64731
---
 .../load/routineload/RoutineLoadTaskScheduler.java |  42 +++++---
 .../routineload/RoutineLoadTaskSchedulerTest.java  | 118 +++++++++++++++++++++
 2 files changed, 145 insertions(+), 15 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 4156410ef86..e1c4ce2b3dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -250,22 +250,34 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 routineLoadTaskInfo.getBeId(), errorMsg);
         routineLoadTaskInfo.setBeId(-1);
         RoutineLoadJob routineLoadJob = 
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
-        routineLoadJob.setOtherMsg(errorMsg);
-
-        // Check if this is a resource pressure error that should not be 
immediately rescheduled
-        if (errorMsg.contains("TOO_MANY_TASKS") || 
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
-            // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED 
error),
-            // but txn has already begun. Here we will still set the 
ExecuteStartTime of
-            // this task, which means we "assume" that this task has been 
successfully submitted.
-            // And this task will then be aborted because of a timeout.
-            // In this way, we can prevent the entire job from being paused 
due to submit errors,
-            // and we can also relieve the pressure on BE by waiting for the 
timeout period.
-            
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
-            return;
-        }
+        RoutineLoadTaskInfo newTask;
+
+        routineLoadJob.writeLock();
+        try {
+            routineLoadJob.setOtherMsg(errorMsg);
+
+            // Check if this is a resource pressure error that should not be 
immediately rescheduled
+            if (errorMsg.contains("TOO_MANY_TASKS") || 
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+                // submit task failed (such as 
TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error),
+                // but txn has already begun. Here we will still set the 
ExecuteStartTime of
+                // this task, which means we "assume" that this task has been 
successfully submitted.
+                // And this task will then be aborted because of a timeout.
+                // In this way, we can prevent the entire job from being 
paused due to submit errors,
+                // and we can also relieve the pressure on BE by waiting for 
the timeout period.
+                
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+                return;
+            }
 
-        // for other errors (network issues, BE restart, etc.), reschedule 
immediately
-        RoutineLoadTaskInfo newTask = 
routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
+            if (routineLoadJob.getState() != JobState.RUNNING
+                    || 
!routineLoadJob.containsTask(routineLoadTaskInfo.getId())) {
+                return;
+            }
+
+            // for other errors (network issues, BE restart, etc.), reschedule 
immediately
+            newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, 
false);
+        } finally {
+            routineLoadJob.writeUnlock();
+        }
         addTaskInQueue(newTask);
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 6e11fc5f71a..4a302b68da9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -31,12 +31,15 @@ import org.apache.doris.thrift.BackendService;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -119,4 +122,119 @@ public class RoutineLoadTaskSchedulerTest {
         Deencapsulation.setField(routineLoadTaskScheduler, 
"needScheduleTasksQueue", routineLoadTaskInfoQueue);
         routineLoadTaskScheduler.runAfterCatalogReady();
     }
+
+    @Test
+    public void testSubmitTaskFailureRenewsTaskWithJobWriteLock() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+                Lists.newArrayList(routineLoadTaskInfo));
+        new Expectations() {
+            {
+                routineLoadManager.getJob(1L);
+                result = routineLoadJob;
+            }
+        };
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertTrue(routineLoadJob.isRenewCalledWithWriteLock());
+        List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
+                Deencapsulation.getField(routineLoadJob, 
"routineLoadTaskInfoList");
+        Assert.assertEquals(1, routineLoadTaskInfoList.size());
+        Assert.assertNotSame(routineLoadTaskInfo, 
routineLoadTaskInfoList.get(0));
+
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertSame(routineLoadTaskInfoList.get(0), 
needScheduleTasksQueue.peek());
+    }
+
+    @Test
+    public void testSubmitTaskFailureSkipsRenewWhenTaskRemoved() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", 
Lists.newArrayList());
+        new Expectations() {
+            {
+                routineLoadManager.getJob(1L);
+                result = routineLoadJob;
+            }
+        };
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertFalse(routineLoadJob.isRenewCalled());
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+    }
+
+    @Test
+    public void testSubmitTaskFailureSkipsRenewWhenJobPaused() {
+        ConcurrentMap<Integer, Long> partitionIdToOffset = 
Maps.newConcurrentMap();
+        partitionIdToOffset.put(1, 100L);
+        KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
+                partitionIdToOffset, false, -1, false);
+        routineLoadTaskInfo.setBeId(100L);
+
+        LockCheckingKafkaRoutineLoadJob routineLoadJob = new 
LockCheckingKafkaRoutineLoadJob();
+        Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.PAUSED);
+        Deencapsulation.setField(routineLoadJob, "progress", new 
KafkaProgress(partitionIdToOffset));
+        Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+                Lists.newArrayList(routineLoadTaskInfo));
+        new Expectations() {
+            {
+                routineLoadManager.getJob(1L);
+                result = routineLoadJob;
+            }
+        };
+
+        RoutineLoadTaskScheduler routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
+        Deencapsulation.invoke(routineLoadTaskScheduler, 
"handleSubmitTaskFailure",
+                routineLoadTaskInfo, "network error");
+
+        Assert.assertFalse(routineLoadJob.isRenewCalled());
+        LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+                Deencapsulation.getField(routineLoadTaskScheduler, 
"needScheduleTasksQueue");
+        Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+    }
+
+    private static class LockCheckingKafkaRoutineLoadJob extends 
KafkaRoutineLoadJob {
+        private boolean renewCalled;
+        private boolean renewCalledWithWriteLock;
+
+        @Override
+        protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo,
+                boolean delaySchedule) {
+            renewCalled = true;
+            renewCalledWithWriteLock = lock.isWriteLockedByCurrentThread();
+            return super.unprotectRenewTask(routineLoadTaskInfo, 
delaySchedule);
+        }
+
+        private boolean isRenewCalled() {
+            return renewCalled;
+        }
+
+        private boolean isRenewCalledWithWriteLock() {
+            return renewCalledWithWriteLock;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to