This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4c6ae54c4c [opt](routine-load) optimize routine load task allocation
algorithm (#34778)
d4c6ae54c4c is described below
commit d4c6ae54c4c77b8db08281b02924b1314011b6d7
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Fri May 17 11:49:22 2024 +0800
[opt](routine-load) optimize routine load task allocation algorithm (#34778)
---
.../doris/load/routineload/RoutineLoadManager.java | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index ba54914d672..871781737d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -489,6 +489,7 @@ public class RoutineLoadManager implements Writable {
readLock();
try {
Map<Long, Integer> beIdToConcurrentTasks =
getBeCurrentTasksNumMap();
+ int previousBeIdleTaskNum = 0;
// 1. Find if the given BE id has more than half of available slots
if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
@@ -496,22 +497,22 @@ public class RoutineLoadManager implements Writable {
Backend previousBackend =
Env.getCurrentSystemInfo().getBackend(previousBeId);
// check previousBackend is not null && load available
if (previousBackend != null &&
previousBackend.isLoadAvailable()) {
- int idleTaskNum = 0;
if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
- idleTaskNum = 0;
+ previousBeIdleTaskNum = 0;
} else if
(beIdToConcurrentTasks.containsKey(previousBeId)) {
- idleTaskNum =
beIdToMaxConcurrentTasks.get(previousBeId)
+ previousBeIdleTaskNum =
beIdToMaxConcurrentTasks.get(previousBeId)
- beIdToConcurrentTasks.get(previousBeId);
} else {
- idleTaskNum =
beIdToMaxConcurrentTasks.get(previousBeId);
+ previousBeIdleTaskNum =
beIdToMaxConcurrentTasks.get(previousBeId);
}
- if (idleTaskNum > (Config.max_routine_load_task_num_per_be
>> 1)) {
+ if (previousBeIdleTaskNum ==
Config.max_routine_load_task_num_per_be) {
return previousBeId;
}
}
}
- // 2. The given BE id does not have available slots, find a BE
with min tasks
+ // 2. we believe that the benefits of load balance outweigh the
benefits of object pool cache,
+ // so we try to find the one with the most idle slots as much
as possible
// 3. The previous BE is not in cluster && is not load available,
find a new BE with min tasks
int idleTaskNum = 0;
long resultBeId = -1L;
@@ -531,6 +532,11 @@ public class RoutineLoadManager implements Writable {
maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
}
}
+ // 4. on the basis of selecting the maximum idle slot be,
+ // try to reuse the object cache as much as possible
+ if (previousBeIdleTaskNum == maxIdleSlotNum) {
+ return previousBeId;
+ }
return resultBeId;
} finally {
readUnlock();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]