This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new f67bc864c [Engine][ResourceManager] Use hazelcast IMap to save resource info (#2650)
f67bc864c is described below

commit f67bc864c21e60f07804c18d1fab0ae5ff0554b1
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 8 14:47:46 2022 +0800

    [Engine][ResourceManager] Use hazelcast IMap to save resource info (#2650)
    
    * [Engine][ResourceManager] Use hazelcast IMap to save resource info
    
    * [Engine][ResourceManager] Use hazelcast IMap to save resource info
    
    * [Engine][ResourceManager] Use hazelcast IMap to save resource info
---
 .../seatunnel/engine/server/master/JobMaster.java  | 12 ++--
 .../resourcemanager/AbstractResourceManager.java   | 25 ++++++---
 .../resourcemanager/ResourceRequestHandler.java    |  5 +-
 .../resourcemanager/resource/SlotProfile.java      | 45 +++++++++++++--
 .../resourcemanager/worker/WorkerProfile.java      | 64 +++++++++++++++++++---
 .../serializable/ResourceDataSerializerHook.java   | 10 ++++
 .../server/service/slot/DefaultSlotService.java    |  4 +-
 7 files changed, 135 insertions(+), 30 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 4b21001a6..ca91a15f1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ExecutorService;
 public class JobMaster implements Runnable {
     private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
 
-    private LogicalDag logicalDag;
     private PhysicalPlan physicalPlan;
     private final Data jobImmutableInformationData;
 
@@ -64,13 +63,13 @@ public class JobMaster implements Runnable {
 
     private final ExecutorService executorService;
 
-    private FlakeIdGenerator flakeIdGenerator;
+    private final FlakeIdGenerator flakeIdGenerator;
 
-    private ResourceManager resourceManager;
+    private final ResourceManager resourceManager;
 
     private CheckpointManager checkpointManager;
 
-    private CompletableFuture<JobStatus> jobMasterCompleteFuture = new 
CompletableFuture<>();
+    private final CompletableFuture<JobStatus> jobMasterCompleteFuture = new 
CompletableFuture<>();
 
     private JobImmutableInformation jobImmutableInformation;
 
@@ -95,13 +94,14 @@ public class JobMaster implements Runnable {
         LOGGER.info(
             "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + 
jobImmutableInformation.getPluginJarsUrls());
 
+        LogicalDag logicalDag;
         if 
(!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
-            this.logicalDag =
+            logicalDag =
                 
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
                     new 
SeatunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()),
                     jobImmutableInformation.getLogicalDag());
         } else {
-            this.logicalDag = 
nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
+            logicalDag = 
nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
         }
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = 
PlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index a77068747..2572c3c77 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
 import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -38,27 +39,36 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public abstract class AbstractResourceManager implements ResourceManager {
 
     private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
     private static final ILogger LOGGER = 
Logger.getLogger(AbstractResourceManager.class);
 
-    protected final ConcurrentMap<String, WorkerProfile> registerWorker;
+    protected final ConcurrentMap<Address, WorkerProfile> registerWorker;
 
     private final NodeEngine nodeEngine;
 
     private final ExecutionMode mode = ExecutionMode.LOCAL;
 
     public AbstractResourceManager(NodeEngine nodeEngine) {
-        this.registerWorker = new ConcurrentHashMap<>();
+        this.registerWorker = 
nodeEngine.getHazelcastInstance().getMap("ResourceManager_RegisterWorker");
         this.nodeEngine = nodeEngine;
     }
 
     @Override
     public void init() {
+        checkRegisterWorkerStillAlive();
+    }
+
+    private void checkRegisterWorkerStillAlive() {
+        if (!registerWorker.isEmpty()) {
+            List<Address> aliveWorker = 
nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress).collect(Collectors.toList());
+            List<Address> dead = registerWorker.keySet().stream().filter(r -> 
!aliveWorker.contains(r)).collect(Collectors.toList());
+            dead.forEach(registerWorker::remove);
+        }
     }
 
     @Override
@@ -90,10 +100,9 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
 
     @Override
     public void memberRemoved(MembershipServiceEvent event) {
-        String nodeID = event.getMember().getAddress().toString();
         LOGGER.severe("Node heartbeat timeout, disconnected for resource 
manager. " +
-            "NodeID: " + nodeID);
-        registerWorker.remove(nodeID);
+            "Node Address: " + event.getMember().getAddress());
+        registerWorker.remove(event.getMember().getAddress());
     }
 
     @Override
@@ -151,12 +160,12 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
 
     @Override
     public void heartbeat(WorkerProfile workerProfile) {
-        if (!registerWorker.containsKey(workerProfile.getWorkerID())) {
+        if (!registerWorker.containsKey(workerProfile.getAddress())) {
             LOGGER.info("received new worker register: " + 
workerProfile.getAddress());
             sendToMember(new ResetResourceOperation(), 
workerProfile.getAddress()).join();
         } else {
             LOGGER.fine("received worker heartbeat from: " + 
workerProfile.getAddress());
         }
-        registerWorker.put(workerProfile.getWorkerID(), workerProfile);
+        registerWorker.put(workerProfile.getAddress(), workerProfile);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index a8fb6aeb1..6b6b21ff4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
 
+import com.hazelcast.cluster.Address;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -50,7 +51,7 @@ public class ResourceRequestHandler {
      * resourceProfile with same index in resourceProfile haven't requested 
successes yet.
      */
     private final ConcurrentMap<Integer, SlotProfile> resultSlotProfiles;
-    private final ConcurrentMap<String, WorkerProfile> registerWorker;
+    private final ConcurrentMap<Address, WorkerProfile> registerWorker;
 
     private final long jobId;
 
@@ -60,7 +61,7 @@ public class ResourceRequestHandler {
 
     public ResourceRequestHandler(long jobId,
                                   List<ResourceProfile> resourceProfile,
-                                  ConcurrentMap<String, WorkerProfile> 
registerWorker,
+                                  ConcurrentMap<Address, WorkerProfile> 
registerWorker,
                                   AbstractResourceManager resourceManager) {
         this.completableFuture = new CompletableFuture<>();
         this.resultSlotProfiles = new ConcurrentHashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index d79fe7151..9e4794dbf 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -17,24 +17,33 @@
 
 package org.apache.seatunnel.engine.server.resourcemanager.resource;
 
+import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
 import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 
-import java.io.Serializable;
+import java.io.IOException;
 
 /**
  * Used to describe the status of the current slot, including resource size 
and assign status
  */
-public class SlotProfile implements Serializable {
+public class SlotProfile implements IdentifiedDataSerializable {
 
     private final Address worker;
 
-    private final int slotID;
+    private int slotID;
 
     private long ownerJobID;
 
     private volatile boolean assigned;
 
-    private final ResourceProfile resourceProfile;
+    private ResourceProfile resourceProfile;
+
+    public SlotProfile() {
+        worker = new Address();
+    }
 
     public SlotProfile(Address worker, int slotID, ResourceProfile 
resourceProfile) {
         this.worker = worker;
@@ -81,4 +90,32 @@ public class SlotProfile implements Serializable {
                 ", resourceProfile=" + resourceProfile +
                 '}';
     }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.SLOT_PROFILE_TYPE;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        worker.writeData(out);
+        out.writeInt(slotID);
+        out.writeLong(ownerJobID);
+        out.writeBoolean(assigned);
+        out.writeObject(resourceProfile);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        worker.readData(in);
+        slotID = in.readInt();
+        ownerJobID = in.readLong();
+        assigned = in.readBoolean();
+        resourceProfile = in.readObject();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index 329fd9dd1..fb83a7dfd 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -19,21 +19,23 @@ package 
org.apache.seatunnel.engine.server.resourcemanager.worker;
 
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
 
 import com.hazelcast.cluster.Address;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import lombok.Data;
 
-import java.io.Serializable;
+import java.io.IOException;
 
 /**
  * Used to describe the status of the current Worker, including address and 
resource assign status
  */
 @Data
-public class WorkerProfile implements Serializable {
+public class WorkerProfile implements IdentifiedDataSerializable {
 
-    private final String workerID;
-
-    private final Address address;
+    private Address address;
 
     private ResourceProfile profile;
 
@@ -43,9 +45,57 @@ public class WorkerProfile implements Serializable {
 
     private SlotProfile[] unassignedSlots;
 
-    public WorkerProfile(String workerID, Address address) {
-        this.workerID = workerID;
+    public WorkerProfile(Address address) {
         this.address = address;
         this.unassignedResource = new ResourceProfile();
     }
+
+    public WorkerProfile() {
+        address = new Address();
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.WORKER_PROFILE_TYPE;
+    }
+
+    @Override
+    public void writeData(ObjectDataOutput out) throws IOException {
+        address.writeData(out);
+        out.writeObject(profile);
+        out.writeObject(unassignedResource);
+        out.writeInt(assignedSlots.length);
+        for (SlotProfile assignedSlot : assignedSlots) {
+            assignedSlot.writeData(out);
+        }
+        out.writeInt(unassignedSlots.length);
+        for (SlotProfile unassignedSlot : unassignedSlots) {
+            unassignedSlot.writeData(out);
+        }
+        out.writeObject(unassignedSlots);
+    }
+
+    @Override
+    public void readData(ObjectDataInput in) throws IOException {
+        address.readData(in);
+        profile = in.readObject();
+        unassignedResource = in.readObject();
+        int assignedSlotsLength = in.readInt();
+        assignedSlots = new SlotProfile[assignedSlotsLength];
+        for (int i = 0; i < assignedSlots.length; i++) {
+            assignedSlots[i] = new SlotProfile();
+            assignedSlots[i].readData(in);
+        }
+        int unassignedSlotsLength = in.readInt();
+        unassignedSlots = new SlotProfile[unassignedSlotsLength];
+        for (int i = 0; i < unassignedSlots.length; i++) {
+            unassignedSlots[i] = new SlotProfile();
+            unassignedSlots[i].readData(in);
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index dfa2d753b..4ec2e16a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -22,6 +22,8 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotO
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
+import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -39,6 +41,10 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
 
     public static final int RESET_RESOURCE_TYPE = 4;
 
+    public static final int WORKER_PROFILE_TYPE = 5;
+
+    public static final int SLOT_PROFILE_TYPE = 6;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
             
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
             
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
@@ -67,6 +73,10 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
                     return new ReleaseSlotOperation();
                 case RESET_RESOURCE_TYPE:
                     return new ResetResourceOperation();
+                case WORKER_PROFILE_TYPE:
+                    return new WorkerProfile();
+                case  SLOT_PROFILE_TYPE:
+                    return new SlotProfile();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 691cbb815..b2791ddf0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -61,7 +61,6 @@ public class DefaultSlotService implements SlotService {
 
     private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
     private ScheduledExecutorService scheduledExecutorService;
-    private final String serviceID;
     private final boolean dynamicSlot;
     private final int slotNumber;
     private volatile boolean initStatus;
@@ -74,7 +73,6 @@ public class DefaultSlotService implements SlotService {
         this.dynamicSlot = dynamicSlot;
         this.taskExecutionService = taskExecutionService;
         this.slotNumber = slotNumber;
-        this.serviceID = nodeEngine.getThisAddress().toString();
         this.idGenerator = new IdGenerator();
     }
 
@@ -200,7 +198,7 @@ public class DefaultSlotService implements SlotService {
     }
 
     public WorkerProfile toWorkerProfile() {
-        WorkerProfile workerProfile = new WorkerProfile(serviceID, 
nodeEngine.getThisAddress());
+        WorkerProfile workerProfile = new 
WorkerProfile(nodeEngine.getThisAddress());
         workerProfile.setProfile(getNodeResource());
         workerProfile.setAssignedSlots(assignedSlots.values().toArray(new 
SlotProfile[0]));
         workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new 
SlotProfile[0]));

Reply via email to