This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1ffa17450 [Bugfix] [Zeta] Fix Slot Status Not Sync After Restart When
All Node Down (#4047)
1ffa17450 is described below
commit 1ffa17450685c106dc37315049ad74ab77e1c7b6
Author: Hisoka <[email protected]>
AuthorDate: Thu Feb 9 16:43:35 2023 +0800
[Bugfix] [Zeta] Fix Slot Status Not Sync After Restart When All Node Down
(#4047)
* [Improve] [Zeta] Fix Slot Status Not Sync After Restart When All Node Down
---
.../engine/e2e/ClusterFaultToleranceIT.java | 8 +---
.../e2e/ClusterFaultToleranceTwoPipelineIT.java | 7 +--
.../engine/server/CoordinatorService.java | 40 ++++++++--------
.../engine/server/TaskExecutionService.java | 2 +-
.../engine/server/dag/physical/PhysicalPlan.java | 3 ++
.../engine/server/dag/physical/SubPlan.java | 4 +-
.../seatunnel/engine/server/master/JobMaster.java | 16 ++++++-
.../resourcemanager/AbstractResourceManager.java | 6 +++
.../opeartion/SyncWorkerProfileOperation.java | 56 ++++++++++++++++++++++
.../serializable/ResourceDataSerializerHook.java | 5 ++
.../server/service/slot/DefaultSlotService.java | 12 ++---
.../engine/server/service/slot/SlotService.java | 2 +
12 files changed, 116 insertions(+), 45 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 6efd298f4..90e721dad 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
* Cluster fault tolerance test. Test the job recovery capability and data
consistency assurance capability in case of cluster node failure
*/
@Slf4j
+@Disabled
public class ClusterFaultToleranceIT {
public static final String DYNAMIC_TEST_CASE_NAME =
"dynamic_test_case_name";
@@ -66,7 +67,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testBatchJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
String testCaseName = "testBatchJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRunOkIn2Node";
@@ -167,7 +167,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testStreamJobRunOkIn2Node() throws ExecutionException,
InterruptedException {
String testCaseName = "testStreamJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRunOkIn2Node";
@@ -239,7 +238,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testBatchJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
String testCaseName = "testBatchJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown";
@@ -313,7 +311,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testStreamJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
String testCaseName = "testStreamJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown";
@@ -402,7 +399,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testBatchJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
String testCaseName = "testBatchJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown";
@@ -476,7 +472,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testStreamJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
String testCaseName = "testStreamJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown";
@@ -564,7 +559,6 @@ public class ClusterFaultToleranceIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testStreamJobRestoreInAllNodeDown() throws ExecutionException,
InterruptedException {
String testCaseName = "testStreamJobRestoreInAllNodeDown";
String testClusterName =
"ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown";
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
index 2cf2d6e60..9edebf3ea 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceTwoPipelineIT.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
* Cluster fault tolerance test. Test the job which have two pipelines can
recovery capability and data consistency assurance capability in case of
cluster node failure
*/
@Slf4j
+@Disabled
public class ClusterFaultToleranceTwoPipelineIT {
public static final String TEST_TEMPLATE_FILE_NAME =
"cluster_batch_fake_to_localfile_two_pipeline_template.conf";
@@ -67,7 +68,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineBatchJobRunOkIn2Node() throws
ExecutionException, InterruptedException {
String testCaseName = "testTwoPipelineBatchJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRunOkIn2Node";
@@ -170,7 +170,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineStreamJobRunOkIn2Node() throws
ExecutionException, InterruptedException {
String testCaseName = "testTwoPipelineStreamJobRunOkIn2Node";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRunOkIn2Node";
@@ -242,7 +241,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeWorkerDown";
@@ -316,7 +314,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown() throws
ExecutionException, InterruptedException {
String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeWorkerDown";
@@ -407,7 +404,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
String testCaseName =
"testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineBatchJobRestoreIn2NodeMasterDown";
@@ -481,7 +477,6 @@ public class ClusterFaultToleranceTwoPipelineIT {
@SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled
public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown() throws
ExecutionException, InterruptedException {
String testCaseName =
"testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
String testClusterName =
"ClusterFaultToleranceTwoPipelineIT_testTwoPipelineStreamJobRestoreIn2NodeMasterDown";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 9957b2437..455122e7d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -215,12 +215,10 @@ public class CoordinatorService {
runningJobInfoIMap,
engineConfig);
-
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
try {
- jobMaster.initCheckPointManager();
+
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
true);
} catch (Exception e) {
- jobMaster.cancelJob();
- throw new SeaTunnelEngineException(String.format("Job id %s init
CheckPointManager failed", jobId), e);
+ throw new SeaTunnelEngineException(String.format("Job id %s init
failed", jobId), e);
}
String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -334,7 +332,7 @@ public class CoordinatorService {
* call by client to submit job
*/
public PassiveCompletableFuture<Void> submitJob(long jobId, Data
jobImmutableInformation) {
- CompletableFuture<Void> voidCompletableFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
JobMaster jobMaster = new JobMaster(jobImmutableInformation,
this.nodeEngine,
executorService,
@@ -349,27 +347,29 @@ public class CoordinatorService {
try {
runningJobInfoIMap.put(jobId, new
JobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
-
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
- jobMaster.initCheckPointManager();
+
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp(),
false);
+ // We specify that when init is complete, the submitJob is
complete
+ jobSubmitFuture.complete(null);
} catch (Throwable e) {
logger.severe(String.format("submit job %s error %s ", jobId,
ExceptionUtils.getMessage(e)));
- voidCompletableFuture.completeExceptionally(e);
- } finally {
- // We specify that when init is complete, the submitJob is
complete
- voidCompletableFuture.complete(null);
+ jobSubmitFuture.completeExceptionally(e);
}
-
- try {
- jobMaster.run();
- } finally {
- // voidCompletableFuture will be cancelled when zeta master
node shutdown to simulate master failure,
- // don't update runningJobMasterMap is this case.
- if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
- runningJobMasterMap.remove(jobId);
+ if (!jobSubmitFuture.isCompletedExceptionally()) {
+ try {
+ jobMaster.run();
+ } finally {
+ // voidCompletableFuture will be cancelled when zeta
master node shutdown to simulate master failure,
+ // don't update runningJobMasterMap is this case.
+ if (!jobMaster.getJobMasterCompleteFuture().isCancelled())
{
+ runningJobMasterMap.remove(jobId);
+ }
}
+ } else {
+ runningJobInfoIMap.remove(jobId);
+ runningJobMasterMap.remove(jobId);
}
});
- return new PassiveCompletableFuture<>(voidCompletableFuture);
+ return new PassiveCompletableFuture<>(jobSubmitFuture);
}
public PassiveCompletableFuture<Void> savePoint(long jobId) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 195415c4b..fb032ddc2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -375,7 +375,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
});
});
- } catch (Exception e){
+ } catch (Exception e) {
logger.warning("The Imap acquisition failed due to the hazelcast
node being offline or restarted, and will be retried next time", e);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index 29b5328a2..522e0f121 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -205,6 +205,9 @@ public class PhysicalPlan {
* @param subPlan subPlan
*/
private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
+ if (jobMaster.getCheckpointManager() == null) {
+ return;
+ }
jobMaster.getCheckpointManager()
.listenPipeline(subPlan.getPipelineLocation().getPipelineId(),
subPlan.getPipelineState()).join();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f21d49c26..e78cb0839 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -361,9 +361,7 @@ public class SubPlan {
reSchedulerPipelineFuture.join();
}
} catch (Throwable e) {
- LOGGER.severe(
- String.format("Restore pipeline %s error with exception:
%s", pipelineFullName,
- ExceptionUtils.getMessage(e)));
+ LOGGER.severe(String.format("Restore pipeline %s error with
exception: ", pipelineFullName), e);
cancelPipeline();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index a5e9b78ab..4dfbe17a5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -161,7 +161,7 @@ public class JobMaster {
this.engineConfig = engineConfig;
}
- public void init(long initializationTimestamp) {
+ public void init(long initializationTimestamp, boolean restart) throws
Exception {
jobImmutableInformation =
nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
LOGGER.info(String.format("Init JobMaster for Job %s (%s) ",
jobImmutableInformation.getJobConfig().getName(),
@@ -185,10 +185,22 @@ public class JobMaster {
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
this.checkpointPlanMap = planTuple.f1();
+ Exception initException = null;
+ try {
+ this.initCheckPointManager();
+ } catch (Exception e) {
+ initException = e;
+ }
this.initStateFuture();
+ if (initException != null) {
+ if (restart) {
+ cancelJob();
+ }
+ throw initException;
+ }
}
- public void initCheckPointManager() throws CheckpointStorageException {
+ private void initCheckPointManager() throws CheckpointStorageException {
CheckpointConfig checkpointConfig =
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
jobImmutableInformation.getJobConfig().getEnvOptions());
this.checkpointManager = new CheckpointManager(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index 3ad2c5dc2..f91b4897e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -31,6 +32,7 @@ import com.hazelcast.cluster.Member;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -73,6 +75,10 @@ public abstract class AbstractResourceManager implements
ResourceManager {
List<Address> dead =
registerWorker.keySet().stream().filter(r ->
!aliveWorker.contains(r)).collect(Collectors.toList());
dead.forEach(registerWorker::remove);
+ List<InternalCompletableFuture<Void>> futures =
aliveWorker.stream().map(worker -> sendToMember(new
SyncWorkerProfileOperation(), worker).thenAccept(p -> {
+ registerWorker.put(worker, (WorkerProfile) p);
+ })).collect(Collectors.toList());
+ futures.forEach(InternalCompletableFuture::join);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
new file mode 100644
index 000000000..c0981d7c9
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.resourcemanager.opeartion;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import
org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class SyncWorkerProfileOperation extends Operation implements
IdentifiedDataSerializable {
+
+ private WorkerProfile result;
+
+ @Override
+ public void run() throws Exception {
+ SeaTunnelServer server = getService();
+ result = server.getSlotService().getWorkerProfile();
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+
+ @Override
+ public Object getResponse() {
+ return result;
+ }
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.SYNC_SLOT_SERVICE_STATUS_TYPE;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 28710284c..2670bfea1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
+import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
@@ -48,6 +49,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
public static final int SLOT_AND_WORKER_PROFILE = 7;
+ public static final int SYNC_SLOT_SERVICE_STATUS_TYPE = 8;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
@@ -82,6 +85,8 @@ public class ResourceDataSerializerHook implements
DataSerializerHook {
return new SlotProfile();
case SLOT_AND_WORKER_PROFILE:
return new SlotAndWorkerProfile();
+ case SYNC_SLOT_SERVICE_STATUS_TYPE:
+ return new SyncWorkerProfileOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index 23b308a6e..c6ab9eaa9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -49,7 +49,6 @@ public class DefaultSlotService implements SlotService {
private static final ILogger LOGGER =
Logger.getLogger(DefaultSlotService.class);
private static final long DEFAULT_HEARTBEAT_TIMEOUT = 2000;
- private static final int HEARTBEAT_RETRY_TIME = 5;
private final NodeEngineImpl nodeEngine;
private AtomicReference<ResourceProfile> unassignedResource;
@@ -92,7 +91,7 @@ public class DefaultSlotService implements SlotService {
try {
LOGGER.fine("start send heartbeat to resource manager, this
address: " +
nodeEngine.getClusterService().getThisAddress());
- sendToMaster(new
WorkerHeartbeatOperation(toWorkerProfile())).join();
+ sendToMaster(new
WorkerHeartbeatOperation(getWorkerProfile())).join();
} catch (Exception e) {
LOGGER.warning("failed send heartbeat to resource manager,
will retry later. this address: " +
nodeEngine.getClusterService().getThisAddress());
@@ -115,7 +114,6 @@ public class DefaultSlotService implements SlotService {
@Override
public synchronized SlotAndWorkerProfile requestSlot(long jobId,
ResourceProfile resourceProfile) {
initStatus = false;
- LOGGER.info(String.format("received slot request, jobID: %d, resource
profile: %s", jobId, resourceProfile));
SlotProfile profile = selectBestMatchSlot(resourceProfile);
if (profile != null) {
profile.assign(jobId);
@@ -126,7 +124,8 @@ public class DefaultSlotService implements SlotService {
contexts.computeIfAbsent(profile.getSlotID(),
p -> new SlotContext(profile.getSlotID(),
taskExecutionService));
}
- return new SlotAndWorkerProfile(toWorkerProfile(), profile);
+ LOGGER.info(String.format("received slot request, jobID: %d, resource
profile: %s, return: %s", jobId, resourceProfile, profile));
+ return new SlotAndWorkerProfile(getWorkerProfile(), profile);
}
public SlotContext getSlotContext(SlotProfile slotProfile) {
@@ -137,7 +136,7 @@ public class DefaultSlotService implements SlotService {
}
@Override
- public void releaseSlot(long jobId, SlotProfile profile) {
+ public synchronized void releaseSlot(long jobId, SlotProfile profile) {
LOGGER.info(String.format("received slot release request, jobID: %d,
slot: %s", jobId, profile));
if (!assignedSlots.containsKey(profile.getSlotID())) {
throw new WrongTargetSlotException("Not exist this slot in slot
service, slot profile: " + profile);
@@ -199,7 +198,8 @@ public class DefaultSlotService implements SlotService {
}
}
- public WorkerProfile toWorkerProfile() {
+ @Override
+ public synchronized WorkerProfile getWorkerProfile() {
WorkerProfile workerProfile = new
WorkerProfile(nodeEngine.getThisAddress());
workerProfile.setProfile(getNodeResource());
workerProfile.setAssignedSlots(assignedSlots.values().toArray(new
SlotProfile[0]));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
index d39276b26..e69be545d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.service.slot;
import
org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
public interface SlotService {
@@ -34,4 +35,5 @@ public interface SlotService {
void close();
+ WorkerProfile getWorkerProfile();
}