This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit df4b289825907882033d71279260229df17335b8
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Feb 29 14:57:52 2024 +0800

    fix total task exec time is far more than actual (#31273)
---
 .../doris/load/routineload/RoutineLoadTaskScheduler.java | 13 ++++++-------
 .../load/routineload/RoutineLoadTaskSchedulerTest.java | 7 +++----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index b4661ba32b4..a3e20b46bdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 /**
  * Routine load task scheduler is a function which allocate task to be.
@@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
 
     private RoutineLoadManager routineLoadManager;
-    private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue = Queues.newLinkedBlockingQueue();
+    private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = new LinkedBlockingDeque<>();
 
     private long lastBackendSlotUpdateTime = -1;
 
@@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                     < routineLoadTaskInfo.getTimeoutMs()) {
                 // try to delay scheduling this task for 'timeout', to void too many failure
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
             scheduleOneTask(routineLoadTaskInfo);
@@ -133,7 +132,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         try {
             // check if topic has more data to consume
             if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                 return;
             }
 
@@ -141,7 +140,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
             if (!allocateTaskToBe(routineLoadTaskInfo)) {
                 // allocate failed, push it back to the queue to wait next scheduling
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (UserException e) {
@@ -164,7 +163,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
                 // begin txn failed. push it back to the queue to wait next scheduling
                 // set BE id to -1 to release the BE slot
                 routineLoadTaskInfo.setBeId(-1);
-                needScheduleTasksQueue.put(routineLoadTaskInfo);
+                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                 return;
             }
         } catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index bdbe603babd..c1f5731329f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -32,15 +32,14 @@ import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
 import org.junit.Test;
 
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class RoutineLoadTaskSchedulerTest {
 
@@ -68,10 +67,10 @@ public class RoutineLoadTaskSchedulerTest {
         KafkaProgress kafkaProgress = new KafkaProgress();
         Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);
 
-        Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
+        LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = new LinkedBlockingDeque<>();
         KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
                 partitionIdToOffset, false);
-        routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
+        routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
 
         Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
         idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
