This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1ffa17450 [Bugfix] [Zeta] Fix Slot Status Not Sync After Restart When 
All Node Down (#4047)
1ffa17450 is described below

commit 1ffa17450685c106dc37315049ad74ab77e1c7b6
Author: Hisoka <[email protected]>
AuthorDate: Thu Feb 9 16:43:35 2023 +0800

    [Bugfix] [Zeta] Fix Slot Status Not Sync After Restart When All Node Down 
(#4047)
    
    * [Improve] [Zeta] Fix Slot Status Not Sync After Restart When All Node Down
---
 .../engine/e2e/ClusterFaultToleranceIT.java        |  8 +---
 .../e2e/ClusterFaultToleranceTwoPipelineIT.java    |  7 +--
 .../engine/server/CoordinatorService.java          | 40 ++++++++--------
 .../engine/server/TaskExecutionService.java        |  2 +-
 .../engine/server/dag/physical/PhysicalPlan.java   |  3 ++
 .../engine/server/dag/physical/SubPlan.java        |  4 +-
 .../seatunnel/engine/server/master/JobMaster.java  | 16 ++++++-
 .../resourcemanager/AbstractResourceManager.java   |  6 +++
 .../opeartion/SyncWorkerProfileOperation.java      | 56 ++++++++++++++++++++++
 .../serializable/ResourceDataSerializerHook.java   |  5 ++
 .../server/service/slot/DefaultSlotService.java    | 12 ++---
 .../engine/server/service/slot/SlotService.java    |  2 +
 12 files changed, 116 insertions(+), 45 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 6efd298f4..90e721dad 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
  * Cluster fault tolerance test. Test the job recovery capability and data 
consistency assurance capability in case of cluster node failure
  */
 @Slf4j
+@Disabled
 public class ClusterFaultToleranceIT {
 
     public static final String DYNAMIC_TEST_CASE_NAME = 
"dynamic_test_case_name";
@@ -66,7 +67,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testBatchJobRunOkIn2Node() throws ExecutionException, 
InterruptedException {
         String testCaseName = "testBatchJobRunOkIn2Node";
         String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRunOkIn2Node";
@@ -167,7 +167,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testStreamJobRunOkIn2Node() throws ExecutionException, 
InterruptedException {
         String testCaseName = "testStreamJobRunOkIn2Node";
         String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRunOkIn2Node";
@@ -239,7 +238,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testBatchJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testBatchJobRestoreIn2NodeWorkerDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown";
@@ -313,7 +311,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testStreamJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testStreamJobRestoreIn2NodeWorkerDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown";
@@ -402,7 +399,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testBatchJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testBatchJobRestoreIn2NodeMasterDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown";
@@ -476,7 +472,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testStreamJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testStreamJobRestoreIn2NodeMasterDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown";
@@ -564,7 +559,6 @@ public class ClusterFaultToleranceIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, 
InterruptedException {
         String testCaseName = "testStreamJobRestoreInAllNodeDown";
         String testClusterName = 
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 2cf2d6e60..9edebf3ea 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
  * Cluster fault tolerance test. Test the job which have two pipelines can 
recovery capability and data consistency assurance capability in case of 
cluster node failure
  */
 @Slf4j
+@Disabled
 public class ClusterFaultToleranceTwoPipelineIT {
 
     public static final String TEST_TEMPLATE_FILE_NAME = 
"cluster_batch_fake_to_localfile_two_pipeline_template.conf";
@@ -67,7 +68,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineBatchJobRunOkIn2Node() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testTwoPipelineBatchJobRunOkIn2Node";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn2Node";
@@ -170,7 +170,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineStreamJobRunOkIn2Node() throws 
ExecutionException, InterruptedException {
         String testCaseName = "testTwoPipelineStreamJobRunOkIn2Node";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn2Node";
@@ -242,7 +241,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = 
"testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
@@ -316,7 +314,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = 
"testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
@@ -407,7 +404,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = 
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
@@ -481,7 +477,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     @Test
-    @Disabled
     public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws 
ExecutionException, InterruptedException {
         String testCaseName = 
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
         String testClusterName = 
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 9957b2437..455122e7d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -215,12 +215,10 @@ public class CoordinatorService {
                 runningJobInfoIMap,
                 engineConfig);
 
-        
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
         try {
-            jobMaster.initCheckPointManager();
+            
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), 
true);
         } catch (Exception e) {
-            jobMaster.cancelJob();
-            throw new SeaTunnelEngineException(String.format("Job id %s init 
CheckPointManager failed", jobId), e);
+            throw new SeaTunnelEngineException(String.format("Job id %s init 
failed", jobId), e);
         }
 
         String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -334,7 +332,7 @@ public class CoordinatorService {
      * call by client to submit job
      */
     public PassiveCompletableFuture<Void> submitJob(long jobId, Data 
jobImmutableInformation) {
-        CompletableFuture<Void> voidCompletableFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
         JobMaster jobMaster = new JobMaster(jobImmutableInformation,
             this.nodeEngine,
             executorService,
@@ -349,27 +347,29 @@ public class CoordinatorService {
             try {
                 runningJobInfoIMap.put(jobId, new 
JobInfo(System.currentTimeMillis(), jobImmutableInformation));
                 runningJobMasterMap.put(jobId, jobMaster);
-                
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
-                jobMaster.initCheckPointManager();
+                
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(), 
false);
+                // We specify that when init is complete, the submitJob is 
complete
+                jobSubmitFuture.complete(null);
             } catch (Throwable e) {
                 logger.severe(String.format("submit job %s error %s ", jobId, 
ExceptionUtils.getMessage(e)));
-                voidCompletableFuture.completeExceptionally(e);
-            } finally {
-                // We specify that when init is complete, the submitJob is 
complete
-                voidCompletableFuture.complete(null);
+                jobSubmitFuture.completeExceptionally(e);
             }
-
-            try {
-                jobMaster.run();
-            } finally {
-                // voidCompletableFuture will be cancelled when zeta master 
node shutdown to simulate master failure,
-                // don't update runningJobMasterMap is this case.
-                if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
-                    runningJobMasterMap.remove(jobId);
+            if (!jobSubmitFuture.isCompletedExceptionally()) {
+                try {
+                    jobMaster.run();
+                } finally {
+                    // voidCompletableFuture will be cancelled when zeta 
master node shutdown to simulate master failure,
+                    // don't update runningJobMasterMap is this case.
+                    if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) 
{
+                        runningJobMasterMap.remove(jobId);
+                    }
                 }
+            } else {
+                runningJobInfoIMap.remove(jobId);
+                runningJobMasterMap.remove(jobId);
             }
         });
-        return new PassiveCompletableFuture<>(voidCompletableFuture);
+        return new PassiveCompletableFuture<>(jobSubmitFuture);
     }
 
     public PassiveCompletableFuture<Void> savePoint(long jobId) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 195415c4b..fb032ddc2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -375,7 +375,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
                     }
                 });
             });
-        } catch (Exception e){
+        } catch (Exception e) {
             logger.warning("The Imap acquisition failed due to the hazelcast 
node being offline or restarted, and will be retried next time", e);
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 29b5328a2..522e0f121 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -205,6 +205,9 @@ public class PhysicalPlan {
      * @param subPlan subPlan
      */
     private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
+        if (jobMaster.getCheckpointManager() == null) {
+            return;
+        }
         jobMaster.getCheckpointManager()
             .listenPipeline(subPlan.getPipelineLocation().getPipelineId(), 
subPlan.getPipelineState()).join();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f21d49c26..e78cb0839 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -361,9 +361,7 @@ public class SubPlan {
                     reSchedulerPipelineFuture.join();
                 }
             } catch (Throwable e) {
-                LOGGER.severe(
-                    String.format("Restore pipeline %s error with exception: 
%s", pipelineFullName,
-                        ExceptionUtils.getMessage(e)));
+                LOGGER.severe(String.format("Restore pipeline %s error with 
exception: ", pipelineFullName), e);
                 cancelPipeline();
             }
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index a5e9b78ab..4dfbe17a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -161,7 +161,7 @@ public class JobMaster {
         this.engineConfig = engineConfig;
     }
 
-    public void init(long initializationTimestamp) {
+    public void init(long initializationTimestamp, boolean restart) throws 
Exception {
         jobImmutableInformation = 
nodeEngine.getSerializationService().toObject(
             jobImmutableInformationData);
         LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", 
jobImmutableInformation.getJobConfig().getName(),
@@ -185,10 +185,22 @@ public class JobMaster {
         this.physicalPlan = planTuple.f0();
         this.physicalPlan.setJobMaster(this);
         this.checkpointPlanMap = planTuple.f1();
+        Exception initException = null;
+        try {
+            this.initCheckPointManager();
+        } catch (Exception e) {
+            initException = e;
+        }
         this.initStateFuture();
+        if (initException != null) {
+            if (restart) {
+                cancelJob();
+            }
+            throw initException;
+        }
     }
 
-    public void initCheckPointManager() throws CheckpointStorageException {
+    private void initCheckPointManager() throws CheckpointStorageException {
         CheckpointConfig checkpointConfig = 
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
             jobImmutableInformation.getJobConfig().getEnvOptions());
         this.checkpointManager = new CheckpointManager(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 3ad2c5dc2..f91b4897e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -31,6 +32,7 @@ import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.InternalCompletableFuture;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -73,6 +75,10 @@ public abstract class AbstractResourceManager implements 
ResourceManager {
             List<Address> dead =
                 registerWorker.keySet().stream().filter(r -> 
!aliveWorker.contains(r)).collect(Collectors.toList());
             dead.forEach(registerWorker::remove);
+            List<InternalCompletableFuture<Void>> futures = 
aliveWorker.stream().map(worker -> sendToMember(new 
SyncWorkerProfileOperation(), worker).thenAccept(p -> {
+                registerWorker.put(worker, (WorkerProfile) p);
+            })).collect(Collectors.toList());
+            futures.forEach(InternalCompletableFuture::join);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
new file mode 100644
index 000000000..c0981d7c9
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import 
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class SyncWorkerProfileOperation extends Operation implements 
IdentifiedDataSerializable {
+
+    private WorkerProfile result;
+
+    @Override
+    public void run() throws Exception {
+        SeaTunnelServer server = getService();
+        result = server.getSlotService().getWorkerProfile();
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+
+    @Override
+    public Object getResponse() {
+        return result;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ResourceDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ResourceDataSerializerHook.SYNC_SLOT_SERVICE_STATUS_TYPE;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 28710284c..2670bfea1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
 import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -48,6 +49,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
 
     public static final int SLOT_AND_WORKER_PROFILE = 7;
 
+    public static final int SYNC_SLOT_SERVICE_STATUS_TYPE = 8;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
         
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
@@ -82,6 +85,8 @@ public class ResourceDataSerializerHook implements 
DataSerializerHook {
                     return new SlotProfile();
                 case SLOT_AND_WORKER_PROFILE:
                     return new SlotAndWorkerProfile();
+                case SYNC_SLOT_SERVICE_STATUS_TYPE:
+                    return new SyncWorkerProfileOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 23b308a6e..c6ab9eaa9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -49,7 +49,6 @@ public class DefaultSlotService implements SlotService {
 
     private static final ILogger LOGGER = 
Logger.getLogger(DefaultSlotService.class);
     private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
-    private static final int HEARTBEAT_RETRY_TIME = 5;
     private final NodeEngineImpl nodeEngine;
 
     private AtomicReference<ResourceProfile> unassignedResource;
@@ -92,7 +91,7 @@ public class DefaultSlotService implements SlotService {
             try {
                 LOGGER.fine("start send heartbeat to resource manager, this 
address: " +
                     nodeEngine.getClusterService().getThisAddress());
-                sendToMaster(new 
WorkerHeartbeatOperation(toWorkerProfile())).join();
+                sendToMaster(new 
WorkerHeartbeatOperation(getWorkerProfile())).join();
             } catch (Exception e) {
                 LOGGER.warning("failed send heartbeat to resource manager, 
will retry later. this address: " +
                     nodeEngine.getClusterService().getThisAddress());
@@ -115,7 +114,6 @@ public class DefaultSlotService implements SlotService {
     @Override
     public synchronized SlotAndWorkerProfile requestSlot(long jobId, 
ResourceProfile resourceProfile) {
         initStatus = false;
-        LOGGER.info(String.format("received slot request, jobID: %d, resource 
profile: %s", jobId, resourceProfile));
         SlotProfile profile = selectBestMatchSlot(resourceProfile);
         if (profile != null) {
             profile.assign(jobId);
@@ -126,7 +124,8 @@ public class DefaultSlotService implements SlotService {
             contexts.computeIfAbsent(profile.getSlotID(),
                 p -> new SlotContext(profile.getSlotID(), 
taskExecutionService));
         }
-        return new SlotAndWorkerProfile(toWorkerProfile(), profile);
+        LOGGER.info(String.format("received slot request, jobID: %d, resource 
profile: %s, return: %s", jobId, resourceProfile, profile));
+        return new SlotAndWorkerProfile(getWorkerProfile(), profile);
     }
 
     public SlotContext getSlotContext(SlotProfile slotProfile) {
@@ -137,7 +136,7 @@ public class DefaultSlotService implements SlotService {
     }
 
     @Override
-    public void releaseSlot(long jobId, SlotProfile profile) {
+    public synchronized void releaseSlot(long jobId, SlotProfile profile) {
         LOGGER.info(String.format("received slot release request, jobID: %d, 
slot: %s", jobId, profile));
         if (!assignedSlots.containsKey(profile.getSlotID())) {
             throw new WrongTargetSlotException("Not exist this slot in slot 
service, slot profile: " + profile);
@@ -199,7 +198,8 @@ public class DefaultSlotService implements SlotService {
         }
     }
 
-    public WorkerProfile toWorkerProfile() {
+    @Override
+    public synchronized WorkerProfile getWorkerProfile() {
         WorkerProfile workerProfile = new 
WorkerProfile(nodeEngine.getThisAddress());
         workerProfile.setProfile(getNodeResource());
         workerProfile.setAssignedSlots(assignedSlots.values().toArray(new 
SlotProfile[0]));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index d39276b26..e69be545d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.service.slot;
 
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
 
 public interface SlotService {
 
@@ -34,4 +35,5 @@ public interface SlotService {
 
     void close();
 
+    WorkerProfile getWorkerProfile();
 }

Reply via email to