This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit df4b289825907882033d71279260229df17335b8
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Feb 29 14:57:52 2024 +0800

    Fix total task exec time being far more than the actual time (#31273)
---
 .../doris/load/routineload/RoutineLoadTaskScheduler.java    | 13 ++++++-------
 .../load/routineload/RoutineLoadTaskSchedulerTest.java      |  7 +++----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index b4661ba32b4..a3e20b46bdc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
  * Routine load task scheduler is a function which allocate task to be.
@@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
 
     private RoutineLoadManager routineLoadManager;
-    private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue = 
Queues.newLinkedBlockingQueue();
+    private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = 
new LinkedBlockingDeque<>();
 
     private long lastBackendSlotUpdateTime = -1;
 
@@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             if (System.currentTimeMillis() - 
routineLoadTaskInfo.getLastScheduledTime()
                     < routineLoadTaskInfo.getTimeoutMs()) {
                 // try to delay scheduling this task for 'timeout', to void 
too many failure
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
             scheduleOneTask(routineLoadTaskInfo);
@@ -133,7 +132,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         try {
             // check if topic has more data to consume
             if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
 
@@ -141,7 +140,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             // this should be done before txn begin, or the txn may be begun 
successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next 
scheduling
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (UserException e) {
@@ -164,7 +163,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 // begin txn failed. push it back to the queue to wait next 
scheduling
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index bdbe603babd..c1f5731329f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -32,15 +32,14 @@ import 
org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
 import org.junit.Test;
 
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class RoutineLoadTaskSchedulerTest {
 
@@ -68,10 +67,10 @@ public class RoutineLoadTaskSchedulerTest {
         KafkaProgress kafkaProgress = new KafkaProgress();
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", 
partitionIdToOffset);
 
-        Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = 
Queues.newLinkedBlockingQueue();
+        LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = 
new LinkedBlockingDeque<>();
         KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 
1L, 20000,
                 partitionIdToOffset, false);
-        routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
+        routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
         idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to