This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7823afad5 [Improve][Zeta] Add Slot Sequence To Avoid Active Check
Error (#4097)
7823afad5 is described below
commit 7823afad50dd53650d127fcfafca90f2971ff466
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 10 14:01:41 2023 +0800
[Improve][Zeta] Add Slot Sequence To Avoid Active Check Error (#4097)
* [Improve] [Zeta] Add Slot Sequence To Avoid Active Check Error
* [Improve] [Zeta] Add Slot Sequence To Avoid Active Check Error
---
.../resourcemanager/AbstractResourceManager.java | 2 +-
.../resourcemanager/resource/SlotProfile.java | 30 +++++++++++++++++++++-
.../server/service/slot/DefaultSlotService.java | 11 ++++++--
3 files changed, 39 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index f91b4897e..b77eef86e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -179,7 +179,7 @@ public abstract class AbstractResourceManager implements
ResourceManager {
boolean active = false;
if (registerWorker.containsKey(profile.getWorker())) {
active =
Arrays.stream(registerWorker.get(profile.getWorker()).getAssignedSlots())
- .allMatch(s -> s.getSlotID() == profile.getSlotID());
+ .allMatch(s -> s.getSlotID() == profile.getSlotID() &&
s.getSequence().equals(profile.getSequence()));
}
if (!active) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index b7a53de85..2af9cdb41 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -25,6 +25,7 @@ import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import java.io.IOException;
+import java.util.Objects;
/**
* Used to describe the status of the current slot, including resource size
and assign status
@@ -41,14 +42,17 @@ public class SlotProfile implements
IdentifiedDataSerializable {
private ResourceProfile resourceProfile;
+ private String sequence;
+
public SlotProfile() {
worker = new Address();
}
- public SlotProfile(Address worker, int slotID, ResourceProfile
resourceProfile) {
+ public SlotProfile(Address worker, int slotID, ResourceProfile
resourceProfile, String sequence) {
this.worker = worker;
this.slotID = slotID;
this.resourceProfile = resourceProfile;
+ this.sequence = sequence;
}
public Address getWorker() {
@@ -76,6 +80,27 @@ public class SlotProfile implements
IdentifiedDataSerializable {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SlotProfile that = (SlotProfile) o;
+ return slotID == that.slotID && worker.equals(that.worker) &&
sequence.equals(that.sequence);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(worker, slotID, sequence);
+ }
+
+ public String getSequence() {
+ return sequence;
+ }
+
public void unassigned() {
assigned = false;
}
@@ -88,6 +113,7 @@ public class SlotProfile implements
IdentifiedDataSerializable {
", ownerJobID=" + ownerJobID +
", assigned=" + assigned +
", resourceProfile=" + resourceProfile +
+ ", sequence='" + sequence + '\'' +
'}';
}
@@ -108,6 +134,7 @@ public class SlotProfile implements
IdentifiedDataSerializable {
out.writeLong(ownerJobID);
out.writeBoolean(assigned);
out.writeObject(resourceProfile);
+ out.writeString(sequence);
}
@Override
@@ -117,5 +144,6 @@ public class SlotProfile implements
IdentifiedDataSerializable {
ownerJobID = in.readLong();
assigned = in.readBoolean();
resourceProfile = in.readObject();
+ sequence = in.readString();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index c6ab9eaa9..a54f5501b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -35,6 +35,7 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
@@ -64,6 +65,7 @@ public class DefaultSlotService implements SlotService {
private final IdGenerator idGenerator;
private final TaskExecutionService taskExecutionService;
private ConcurrentMap<Integer, SlotContext> contexts;
+ private String slotServiceSequence;
public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService
taskExecutionService,
SlotServiceConfig config) {
@@ -76,6 +78,7 @@ public class DefaultSlotService implements SlotService {
@Override
public void init() {
initStatus = true;
+ slotServiceSequence = UUID.randomUUID().toString();
contexts = new ConcurrentHashMap<>();
assignedSlots = new ConcurrentHashMap<>();
unassignedSlots = new ConcurrentHashMap<>();
@@ -142,6 +145,10 @@ public class DefaultSlotService implements SlotService {
throw new WrongTargetSlotException("Not exist this slot in slot
service, slot profile: " + profile);
}
+ if
(!assignedSlots.get(profile.getSlotID()).getSequence().equals(profile.getSequence()))
{
+ throw new WrongTargetSlotException("Wrong slot sequence in
profile, slot profile: " + profile);
+ }
+
if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) {
throw new WrongTargetSlotException(String.format("The profile %s
not belong with job %d",
assignedSlots.get(profile.getSlotID()), jobId));
@@ -170,7 +177,7 @@ public class DefaultSlotService implements SlotService {
}
if (config.isDynamicSlot()) {
if (unassignedResource.get().enoughThan(profile)) {
- return new SlotProfile(nodeEngine.getThisAddress(), (int)
idGenerator.getNextId(), profile);
+ return new SlotProfile(nodeEngine.getThisAddress(), (int)
idGenerator.getNextId(), profile, slotServiceSequence);
}
} else {
Optional<SlotProfile> result = unassignedSlots.values().stream()
@@ -194,7 +201,7 @@ public class DefaultSlotService implements SlotService {
long maxMemory = Runtime.getRuntime().maxMemory();
for (int i = 0; i < config.getSlotNum(); i++) {
unassignedSlots.put(i, new
SlotProfile(nodeEngine.getThisAddress(), i,
- new ResourceProfile(CPU.of(0), Memory.of(maxMemory /
config.getSlotNum()))));
+ new ResourceProfile(CPU.of(0), Memory.of(maxMemory /
config.getSlotNum())), slotServiceSequence));
}
}