This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new f67bc864c [Engine][ResourceManager] Use hazelcast IMap to save
resource info (#2650)
f67bc864c is described below
commit f67bc864c21e60f07804c18d1fab0ae5ff0554b1
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 8 14:47:46 2022 +0800
[Engine][ResourceManager] Use hazelcast IMap to save resource info (#2650)
* [Engine][ResourceManager] Use hazelcast IMap to save resource info
* [Engine][ResourceManager] Use hazelcast IMap to save resource info
* [Engine][ResourceManager] Use hazelcast IMap to save resource info
---
.../seatunnel/engine/server/master/JobMaster.java | 12 ++--
.../resourcemanager/AbstractResourceManager.java | 25 ++++++---
.../resourcemanager/ResourceRequestHandler.java | 5 +-
.../resourcemanager/resource/SlotProfile.java | 45 +++++++++++++--
.../resourcemanager/worker/WorkerProfile.java | 64 +++++++++++++++++++---
.../serializable/ResourceDataSerializerHook.java | 10 ++++
.../server/service/slot/DefaultSlotService.java | 4 +-
7 files changed, 135 insertions(+), 30 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 4b21001a6..ca91a15f1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ExecutorService;
public class JobMaster implements Runnable {
private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
- private LogicalDag logicalDag;
private PhysicalPlan physicalPlan;
private final Data jobImmutableInformationData;
@@ -64,13 +63,13 @@ public class JobMaster implements Runnable {
private final ExecutorService executorService;
- private FlakeIdGenerator flakeIdGenerator;
+ private final FlakeIdGenerator flakeIdGenerator;
- private ResourceManager resourceManager;
+ private final ResourceManager resourceManager;
private CheckpointManager checkpointManager;
- private CompletableFuture<JobStatus> jobMasterCompleteFuture = new
CompletableFuture<>();
+ private final CompletableFuture<JobStatus> jobMasterCompleteFuture = new
CompletableFuture<>();
private JobImmutableInformation jobImmutableInformation;
@@ -95,13 +94,14 @@ public class JobMaster implements Runnable {
LOGGER.info(
"Job [" + jobImmutableInformation.getJobId() + "] jar urls " +
jobImmutableInformation.getPluginJarsUrls());
+ LogicalDag logicalDag;
if
(!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
- this.logicalDag =
+ logicalDag =
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
new
SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()),
jobImmutableInformation.getLogicalDag());
} else {
- this.logicalDag =
nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
+ logicalDag =
nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
}
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index a77068747..2572c3c77 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -38,27 +39,36 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
public abstract class AbstractResourceManager implements ResourceManager {
private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
private static final ILogger LOGGER =
Logger.getLogger(AbstractResourceManager.class);
- protected final ConcurrentMap<String, WorkerProfile> registerWorker;
+ protected final ConcurrentMap<Address, WorkerProfile> registerWorker;
private final NodeEngine nodeEngine;
private final ExecutionMode mode = ExecutionMode.LOCAL;
public AbstractResourceManager(NodeEngine nodeEngine) {
- this.registerWorker = new ConcurrentHashMap<>();
+ this.registerWorker =
nodeEngine.getHazelcastInstance().getMap("ResourceManager_RegisterWorker");
this.nodeEngine = nodeEngine;
}
@Override
public void init() {
+ checkRegisterWorkerStillAlive();
+ }
+
+ private void checkRegisterWorkerStillAlive() {
+ if (!registerWorker.isEmpty()) {
+ List<Address> aliveWorker =
nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress).collect(Collectors.toList());
+ List<Address> dead = registerWorker.keySet().stream().filter(r ->
!aliveWorker.contains(r)).collect(Collectors.toList());
+ dead.forEach(registerWorker::remove);
+ }
}
@Override
@@ -90,10 +100,9 @@ public abstract class AbstractResourceManager implements
ResourceManager {
@Override
public void memberRemoved(MembershipServiceEvent event) {
- String nodeID = event.getMember().getAddress().toString();
LOGGER.severe("Node heartbeat timeout, disconnected for resource
manager. " +
- "NodeID: " + nodeID);
- registerWorker.remove(nodeID);
+ "Node Address: " + event.getMember().getAddress());
+ registerWorker.remove(event.getMember().getAddress());
}
@Override
@@ -151,12 +160,12 @@ public abstract class AbstractResourceManager implements
ResourceManager {
@Override
public void heartbeat(WorkerProfile workerProfile) {
- if (!registerWorker.containsKey(workerProfile.getWorkerID())) {
+ if (!registerWorker.containsKey(workerProfile.getAddress())) {
LOGGER.info("received new worker register: " +
workerProfile.getAddress());
sendToMember(new ResetResourceOperation(),
workerProfile.getAddress()).join();
} else {
LOGGER.fine("received worker heartbeat from: " +
workerProfile.getAddress());
}
- registerWorker.put(workerProfile.getWorkerID(), workerProfile);
+ registerWorker.put(workerProfile.getAddress(), workerProfile);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index a8fb6aeb1..6b6b21ff4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
+import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -50,7 +51,7 @@ public class ResourceRequestHandler {
* resourceProfile with same index in resourceProfile haven't requested
successes yet.
*/
private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
- private final ConcurrentMap<String, WorkerProfile> registerWorker;
+ private final ConcurrentMap<Address, WorkerProfile> registerWorker;
private final long jobId;
@@ -60,7 +61,7 @@ public class ResourceRequestHandler {
public ResourceRequestHandler(long jobId,
List<ResourceProfile> resourceProfile,
- ConcurrentMap<String, WorkerProfile>
registerWorker,
+ ConcurrentMap<Address, WorkerProfile>
registerWorker,
AbstractResourceManager resourceManager) {
this.completableFuture = new CompletableFuture<>();
this.resultSlotProfiles = new ConcurrentHashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index d79fe7151..9e4794dbf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -17,24 +17,33 @@
package org.apache.seatunnel.engine.server.resourcemanager.resource;
+import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-import java.io.Serializable;
+import java.io.IOException;
/**
* Used to describe the status of the current slot, including resource size
and assign status
*/
-public class SlotProfile implements Serializable {
+public class SlotProfile implements IdentifiedDataSerializable {
private final Address worker;
- private final int slotID;
+ private int slotID;
private long ownerJobID;
private volatile boolean assigned;
- private final ResourceProfile resourceProfile;
+ private ResourceProfile resourceProfile;
+
+ public SlotProfile() {
+ worker = new Address();
+ }
public SlotProfile(Address worker, int slotID, ResourceProfile
resourceProfile) {
this.worker = worker;
@@ -81,4 +90,32 @@ public class SlotProfile implements Serializable {
", resourceProfile=" + resourceProfile +
'}';
}
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.SLOT_PROFILE_TYPE;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ worker.writeData(out);
+ out.writeInt(slotID);
+ out.writeLong(ownerJobID);
+ out.writeBoolean(assigned);
+ out.writeObject(resourceProfile);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ worker.readData(in);
+ slotID = in.readInt();
+ ownerJobID = in.readLong();
+ assigned = in.readBoolean();
+ resourceProfile = in.readObject();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 329fd9dd1..fb83a7dfd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -19,21 +19,23 @@ package
org.apache.seatunnel.engine.server.resourcemanager.worker;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import lombok.Data;
-import java.io.Serializable;
+import java.io.IOException;
/**
* Used to describe the status of the current Worker, including address and
resource assign status
*/
@Data
-public class WorkerProfile implements Serializable {
+public class WorkerProfile implements IdentifiedDataSerializable {
- private final String workerID;
-
- private final Address address;
+ private Address address;
private ResourceProfile profile;
@@ -43,9 +45,57 @@ public class WorkerProfile implements Serializable {
private SlotProfile[] unassignedSlots;
- public WorkerProfile(String workerID, Address address) {
- this.workerID = workerID;
+ public WorkerProfile(Address address) {
this.address = address;
this.unassignedResource = new ResourceProfile();
}
+
+ public WorkerProfile() {
+ address = new Address();
+ }
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.WORKER_PROFILE_TYPE;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ address.writeData(out);
+ out.writeObject(profile);
+ out.writeObject(unassignedResource);
+ out.writeInt(assignedSlots.length);
+ for (SlotProfile assignedSlot : assignedSlots) {
+ assignedSlot.writeData(out);
+ }
+ out.writeInt(unassignedSlots.length);
+ for (SlotProfile unassignedSlot : unassignedSlots) {
+ unassignedSlot.writeData(out);
+ }
+ out.writeObject(unassignedSlots);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ address.readData(in);
+ profile = in.readObject();
+ unassignedResource = in.readObject();
+ int assignedSlotsLength = in.readInt();
+ assignedSlots = new SlotProfile[assignedSlotsLength];
+ for (int i = 0; i < assignedSlots.length; i++) {
+ assignedSlots[i] = new SlotProfile();
+ assignedSlots[i].readData(in);
+ }
+ int unassignedSlotsLength = in.readInt();
+ unassignedSlots = new SlotProfile[unassignedSlotsLength];
+ for (int i = 0; i < unassignedSlots.length; i++) {
+ unassignedSlots[i] = new SlotProfile();
+ unassignedSlots[i].readData(in);
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index dfa2d753b..4ec2e16a5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -22,6 +22,8 @@ import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotO
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -39,6 +41,10 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
public static final int RESET_RESOURCE_TYPE = 4;
+ public static final int WORKER_PROFILE_TYPE = 5;
+
+ public static final int SLOT_PROFILE_TYPE = 6;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
@@ -67,6 +73,10 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
return new ReleaseSlotOperation();
case RESET_RESOURCE_TYPE:
return new ResetResourceOperation();
+ case WORKER_PROFILE_TYPE:
+ return new WorkerProfile();
+ case SLOT_PROFILE_TYPE:
+ return new SlotProfile();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 691cbb815..b2791ddf0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -61,7 +61,6 @@ public class DefaultSlotService implements SlotService {
private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
private ScheduledExecutorService scheduledExecutorService;
- private final String serviceID;
private final boolean dynamicSlot;
private final int slotNumber;
private volatile boolean initStatus;
@@ -74,7 +73,6 @@ public class DefaultSlotService implements SlotService {
this.dynamicSlot = dynamicSlot;
this.taskExecutionService = taskExecutionService;
this.slotNumber = slotNumber;
- this.serviceID = nodeEngine.getThisAddress().toString();
this.idGenerator = new IdGenerator();
}
@@ -200,7 +198,7 @@ public class DefaultSlotService implements SlotService {
}
public WorkerProfile toWorkerProfile() {
- WorkerProfile workerProfile = new WorkerProfile(serviceID,
nodeEngine.getThisAddress());
+ WorkerProfile workerProfile = new
WorkerProfile(nodeEngine.getThisAddress());
workerProfile.setProfile(getNodeResource());
workerProfile.setAssignedSlots(assignedSlots.values().toArray(new
SlotProfile[0]));
workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new
SlotProfile[0]));