This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4304a947e [Fix] [ResourceManager] Fix ResourceManager Assign Task Not
Random (#4078)
4304a947e is described below
commit 4304a947e956d2894ebe8abdaf3adf009826945b
Author: Hisoka <[email protected]>
AuthorDate: Wed Feb 8 09:59:54 2023 +0800
[Fix] [ResourceManager] Fix ResourceManager Assign Task Not Random (#4078)
---
.../engine/server/resourcemanager/ResourceRequestHandler.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index b45306645..77f4f871c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -33,6 +33,7 @@ import
com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -143,14 +144,16 @@ public class ResourceRequestHandler {
}
private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+ // Shuffle the order to ensure random selection of workers
+ List<WorkerProfile> workerProfiles =
Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0]));
+ Collections.shuffle(workerProfiles);
// Check if there are still unassigned slots
Optional<WorkerProfile> workerProfile =
- registerWorker.values().stream().filter(worker ->
Arrays.stream(worker.getUnassignedSlots()).anyMatch(slot ->
slot.getResourceProfile().enoughThan(r))).findAny();
+ workerProfiles.stream().filter(worker ->
Arrays.stream(worker.getUnassignedSlots()).anyMatch(slot ->
slot.getResourceProfile().enoughThan(r))).findAny();
if (!workerProfile.isPresent()) {
// Check if there are still unassigned resources
- workerProfile =
- registerWorker.values().stream().filter(worker ->
worker.getUnassignedResource().enoughThan(r)).findAny();
+ workerProfile = workerProfiles.stream().filter(worker ->
worker.getUnassignedResource().enoughThan(r)).findAny();
}
return workerProfile;