This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a49c881d6 [CI] Fix FixSlotResourceTest testNotEnoughResource test
error (#6820)
9a49c881d6 is described below
commit 9a49c881d69394ad5ed8a306d671cd754a5f2b31
Author: Eric <[email protected]>
AuthorDate: Thu May 9 21:01:47 2024 +0800
[CI] Fix FixSlotResourceTest testNotEnoughResource test error (#6820)
---
.../server/resourcemanager/AbstractResourceManager.java | 7 +++++++
.../engine/server/resourcemanager/ResourceManager.java | 2 ++
.../engine/server/resourcemanager/FixSlotResourceTest.java | 14 ++++++++++++++
3 files changed, 23 insertions(+)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 8b7e0b1864..2caa6e6816 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -216,4 +216,11 @@ public abstract class AbstractResourceManager implements
ResourceManager {
}
registerWorker.put(workerProfile.getAddress(), workerProfile);
}
+
+ @Override
+ public List<SlotProfile> getUnassignedSlots() {
+ return registerWorker.values().stream()
+ .flatMap(workerProfile ->
Arrays.stream(workerProfile.getUnassignedSlots()))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index 4a47517bfd..ca668482aa 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -58,4 +58,6 @@ public interface ResourceManager {
void memberRemoved(MembershipServiceEvent event);
void close();
+
+ List<SlotProfile> getUnassignedSlots();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
index 6c26f2398c..cf67ec8de0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java
@@ -29,6 +29,9 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
public class FixSlotResourceTest extends
AbstractSeaTunnelServerTest<FixSlotResourceTest> {
@@ -75,6 +78,17 @@ public class FixSlotResourceTest extends
AbstractSeaTunnelServerTest<FixSlotReso
} catch (ExecutionException e) {
Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException"));
}
+ // wait for release resource complete
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertEquals(
+ 3,
+ server.getCoordinatorService()
+ .getResourceManager()
+ .getUnassignedSlots()
+ .size());
+ });
resourceProfiles.remove(0);
List<SlotProfile> slotProfiles =
server.getCoordinatorService()