This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new bc6759fc8d9 [branch-2.0](routine-load) optimize routine load task
schedule to make consume real-time (#31273) (#37431)
bc6759fc8d9 is described below
commit bc6759fc8d9768238117f05942609ea83c86a720
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:16:39 2024 +0800
[branch-2.0](routine-load) optimize routine load task schedule to make
consume real-time (#31273) (#37431)
pick from: #31273
---
.../doris/load/routineload/RoutineLoadTaskScheduler.java | 13 ++++++-------
.../load/routineload/RoutineLoadTaskSchedulerTest.java | 7 +++----
2 files changed, 9 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 10eec92b5e3..e310116fbd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
/**
* Routine load task scheduler is a function which allocate task to be.
@@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
private RoutineLoadManager routineLoadManager;
- private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue =
Queues.newLinkedBlockingQueue();
+ private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
new LinkedBlockingDeque<>();
private long lastBackendSlotUpdateTime = -1;
@@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
if (System.currentTimeMillis() -
routineLoadTaskInfo.getLastScheduledTime()
< routineLoadTaskInfo.getTimeoutMs()) {
// try to delay scheduling this task for 'timeout', to void
too many failure
- needScheduleTasksQueue.put(routineLoadTaskInfo);
+ needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
scheduleOneTask(routineLoadTaskInfo);
@@ -131,7 +130,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
try {
// check if topic has more data to consume
if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
- needScheduleTasksQueue.put(routineLoadTaskInfo);
+ needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
@@ -139,7 +138,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// this should be done before txn begin, or the txn may be begun
successfully but failed to be allocated.
if (!allocateTaskToBe(routineLoadTaskInfo)) {
// allocate failed, push it back to the queue to wait next
scheduling
- needScheduleTasksQueue.put(routineLoadTaskInfo);
+ needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
return;
}
} catch (UserException e) {
@@ -162,7 +161,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// begin txn failed. push it back to the queue to wait next
scheduling
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
- needScheduleTasksQueue.put(routineLoadTaskInfo);
+ needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
return;
}
} catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 02db47538fb..0ce694bfb11 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -32,15 +32,14 @@ import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Test;
import java.util.Map;
-import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.LinkedBlockingDeque;
public class RoutineLoadTaskSchedulerTest {
@@ -68,10 +67,10 @@ public class RoutineLoadTaskSchedulerTest {
KafkaProgress kafkaProgress = new KafkaProgress();
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset",
partitionIdToOffset);
- Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue =
Queues.newLinkedBlockingQueue();
+ LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue =
new LinkedBlockingDeque<>();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, "default_cluster", 20000, 0,
partitionIdToOffset, false);
- routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
+ routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]