This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new a5e89247f [Engine] [ResourceManager] Add slot active check in
ResourceManager (#2740)
a5e89247f is described below
commit a5e89247f0cf02677e01b6a37bc5a36bb2ee450a
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 15 18:40:20 2022 +0800
[Engine] [ResourceManager] Add slot active check in ResourceManager (#2740)
---
.../engine/server/resourcemanager/AbstractResourceManager.java | 7 +++++++
.../seatunnel/engine/server/resourcemanager/ResourceManager.java | 7 +++++++
2 files changed, 14 insertions(+)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 2572c3c77..c10d928db 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -36,6 +36,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -158,6 +159,12 @@ public abstract class AbstractResourceManager implements
ResourceManager {
return sendToMember(new ReleaseSlotOperation(jobId, profile),
profile.getWorker());
}
+ @Override
+ public boolean slotActiveCheck(SlotProfile profile) {
+ return registerWorker.values().stream().flatMap(workerProfile ->
Arrays.stream(workerProfile.getAssignedSlots()))
+ .anyMatch(s -> s.getSlotID() == profile.getSlotID());
+ }
+
@Override
public void heartbeat(WorkerProfile workerProfile) {
if (!registerWorker.containsKey(workerProfile.getAddress())) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
index c8fa75c84..3e2a9b558 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java
@@ -37,6 +37,13 @@ public interface ResourceManager {
CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile);
+ /**
+ * Check {@link SlotProfile} is active or not. Not active meaning can't
use this slot to deploy task.
+ *
+ * @return active or not
+ */
+ boolean slotActiveCheck(SlotProfile profile);
+
/**
* Every time ResourceManager and Worker communicate, heartbeat method
should be called to
* record the latest Worker status