This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7823afad5 [Improve][Zeta] Add Slot Sequence To Avoid Active Check Error (#4097)
7823afad5 is described below

commit 7823afad50dd53650d127fcfafca90f2971ff466
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 10 14:01:41 2023 +0800

    [Improve][Zeta] Add Slot Sequence To Avoid Active Check Error (#4097)
    
    * [Improve] [Zeta] Add Slot Sequence To Avoid Active Check Error
    
    * [Improve] [Zeta] Add Slot Sequence To Avoid Active Check Error
---
 .../resourcemanager/AbstractResourceManager.java   |  2 +-
 .../resourcemanager/resource/SlotProfile.java      | 30 +++++++++++++++++++++-
 .../server/service/slot/DefaultSlotService.java    | 11 ++++++--
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index f91b4897e..b77eef86e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -179,7 +179,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
         boolean active = false;
         if (registerWorker.containsKey(profile.getWorker())) {
             active = Arrays.stream(registerWorker.get(profile.getWorker()).getAssignedSlots())
-                .allMatch(s -> s.getSlotID() == profile.getSlotID());
+                .allMatch(s -> s.getSlotID() == profile.getSlotID() && s.getSequence().equals(profile.getSequence()));
         }
 
         if (!active) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index b7a53de85..2af9cdb41 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -25,6 +25,7 @@ import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * Used to describe the status of the current slot, including resource size and assign status
@@ -41,14 +42,17 @@ public class SlotProfile implements IdentifiedDataSerializable {
 
     private ResourceProfile resourceProfile;
 
+    private String sequence;
+
     public SlotProfile() {
         worker = new Address();
     }
 
-    public SlotProfile(Address worker, int slotID, ResourceProfile resourceProfile) {
+    public SlotProfile(Address worker, int slotID, ResourceProfile resourceProfile, String sequence) {
         this.worker = worker;
         this.slotID = slotID;
         this.resourceProfile = resourceProfile;
+        this.sequence = sequence;
     }
 
     public Address getWorker() {
@@ -76,6 +80,27 @@ public class SlotProfile implements IdentifiedDataSerializable {
         }
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SlotProfile that = (SlotProfile) o;
+        return slotID == that.slotID && worker.equals(that.worker) && sequence.equals(that.sequence);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(worker, slotID, sequence);
+    }
+
+    public String getSequence() {
+        return sequence;
+    }
+
     public void unassigned() {
         assigned = false;
     }
@@ -88,6 +113,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
             ", ownerJobID=" + ownerJobID +
             ", assigned=" + assigned +
             ", resourceProfile=" + resourceProfile +
+            ", sequence='" + sequence + '\'' +
             '}';
     }
 
@@ -108,6 +134,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
         out.writeLong(ownerJobID);
         out.writeBoolean(assigned);
         out.writeObject(resourceProfile);
+        out.writeString(sequence);
     }
 
     @Override
@@ -117,5 +144,6 @@ public class SlotProfile implements IdentifiedDataSerializable {
         ownerJobID = in.readLong();
         assigned = in.readBoolean();
         resourceProfile = in.readObject();
+        sequence = in.readString();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index c6ab9eaa9..a54f5501b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -35,6 +35,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -64,6 +65,7 @@ public class DefaultSlotService implements SlotService {
     private final IdGenerator idGenerator;
     private final TaskExecutionService taskExecutionService;
     private ConcurrentMap<Integer, SlotContext> contexts;
+    private String slotServiceSequence;
 
     public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService,
                               SlotServiceConfig config) {
@@ -76,6 +78,7 @@ public class DefaultSlotService implements SlotService {
     @Override
     public void init() {
         initStatus = true;
+        slotServiceSequence = UUID.randomUUID().toString();
         contexts = new ConcurrentHashMap<>();
         assignedSlots = new ConcurrentHashMap<>();
         unassignedSlots = new ConcurrentHashMap<>();
@@ -142,6 +145,10 @@ public class DefaultSlotService implements SlotService {
             throw new WrongTargetSlotException("Not exist this slot in slot service, slot profile: " + profile);
         }
 
+        if (!assignedSlots.get(profile.getSlotID()).getSequence().equals(profile.getSequence())) {
+            throw new WrongTargetSlotException("Wrong slot sequence in profile, slot profile: " + profile);
+        }
+
         if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) {
             throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d",
                 assignedSlots.get(profile.getSlotID()), jobId));
@@ -170,7 +177,7 @@ public class DefaultSlotService implements SlotService {
         }
         if (config.isDynamicSlot()) {
             if (unassignedResource.get().enoughThan(profile)) {
-                return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile);
+                return new SlotProfile(nodeEngine.getThisAddress(), (int) idGenerator.getNextId(), profile, slotServiceSequence);
             }
         } else {
             Optional<SlotProfile> result = unassignedSlots.values().stream()
@@ -194,7 +201,7 @@ public class DefaultSlotService implements SlotService {
         long maxMemory = Runtime.getRuntime().maxMemory();
         for (int i = 0; i < config.getSlotNum(); i++) {
             unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
-                    new ResourceProfile(CPU.of(0), Memory.of(maxMemory / config.getSlotNum()))));
+                new ResourceProfile(CPU.of(0), Memory.of(maxMemory / config.getSlotNum())), slotServiceSequence));
         }
     }
 

Reply via email to