This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4304a947e [Fix] [ResourceManager] Fix ResourceManager Assign Task Not Random (#4078)
4304a947e is described below

commit 4304a947e956d2894ebe8abdaf3adf009826945b
Author: Hisoka <[email protected]>
AuthorDate: Wed Feb 8 09:59:54 2023 +0800

    [Fix] [ResourceManager] Fix ResourceManager Assign Task Not Random (#4078)
---
 .../engine/server/resourcemanager/ResourceRequestHandler.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index b45306645..77f4f871c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -33,6 +33,7 @@ import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -143,14 +144,16 @@ public class ResourceRequestHandler {
     }
 
     private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
+        // Shuffle the order to ensure random selection of workers
+        List<WorkerProfile> workerProfiles = 
Arrays.asList(registerWorker.values().toArray(new WorkerProfile[0]));
+        Collections.shuffle(workerProfiles);
         // Check if there are still unassigned slots
         Optional<WorkerProfile> workerProfile =
-            registerWorker.values().stream().filter(worker -> 
Arrays.stream(worker.getUnassignedSlots()).anyMatch(slot -> 
slot.getResourceProfile().enoughThan(r))).findAny();
+            workerProfiles.stream().filter(worker -> 
Arrays.stream(worker.getUnassignedSlots()).anyMatch(slot -> 
slot.getResourceProfile().enoughThan(r))).findAny();
 
         if (!workerProfile.isPresent()) {
             // Check if there are still unassigned resources
-            workerProfile =
-                registerWorker.values().stream().filter(worker -> 
worker.getUnassignedResource().enoughThan(r)).findAny();
+            workerProfile = workerProfiles.stream().filter(worker -> 
worker.getUnassignedResource().enoughThan(r)).findAny();
         }
 
         return workerProfile;

Reply via email to