This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 996c4741f put imap names to constant (#3036)
996c4741f is described below
commit 996c4741f799e1bb8726f2eb1b35282ab0932505
Author: Eric <[email protected]>
AuthorDate: Sun Oct 9 22:26:18 2022 +0800
put imap names to constant (#3036)
---
.../main/java/org/apache/seatunnel/engine/common/Constant.java | 10 ++++++++++
.../org/apache/seatunnel/engine/server/CoordinatorService.java | 9 +++++----
.../org/apache/seatunnel/engine/server/SeaTunnelServer.java | 3 ++-
.../engine/server/resourcemanager/AbstractResourceManager.java | 3 ++-
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 6aab140cd..ba02b7f4f 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -39,4 +39,14 @@ public class Constant {
public static final int OPERATION_RETRY_TIME = 5;
public static final int OPERATION_RETRY_SLEEP = 2000;
+
+ public static final String IMAP_RUNNING_JOB_INFO = "runningJobInfo";
+
+ public static final String IMAP_RUNNING_JOB_STATE = "runningJobState";
+
+ public static final String IMAP_STATE_TIMESTAMPS = "stateTimestamps";
+
+ public static final String IMAP_OWNED_SLOT_PROFILES =
"ownedSlotProfilesIMap";
+
+ public static final String IMAP_RESOURCE_MANAGER_REGISTER_WORKER =
"ResourceManager_RegisterWorker";
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 14b61a91d..5af9d47c9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -139,10 +140,10 @@ public class CoordinatorService {
// 4. If runningJobStateIMap.get(jobId) != null and the value is CANCELING
or RUNNING. We need recover the JobMaster
// from runningJobStateIMap and then waiting for it complete.
private void initCoordinatorService() {
- runningJobInfoIMap =
nodeEngine.getHazelcastInstance().getMap("runningJobInfo");
- runningJobStateIMap =
nodeEngine.getHazelcastInstance().getMap("runningJobState");
- runningJobStateTimestampsIMap =
nodeEngine.getHazelcastInstance().getMap("stateTimestamps");
- ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap("ownedSlotProfilesIMap");
+ runningJobInfoIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
+ runningJobStateIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ runningJobStateTimestampsIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
+ ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
List<CompletableFuture<Void>> collect =
runningJobInfoIMap.entrySet().stream().map(entry -> {
return CompletableFuture.runAsync(() ->
restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue()),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index ead141dac..fe7ce55e0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -181,7 +182,7 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
* @return
*/
public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
- IMap<Object, Object> runningJobState =
nodeEngine.getHazelcastInstance().getMap("runningJobState");
+ IMap<Object, Object> runningJobState =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
if (runningJobState == null) {
return false;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index c10d928db..f4a504b47 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.resourcemanager;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import
org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
@@ -55,7 +56,7 @@ public abstract class AbstractResourceManager implements
ResourceManager {
private final ExecutionMode mode = ExecutionMode.LOCAL;
public AbstractResourceManager(NodeEngine nodeEngine) {
- this.registerWorker =
nodeEngine.getHazelcastInstance().getMap("ResourceManager_RegisterWorker");
+ this.registerWorker =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RESOURCE_MANAGER_REGISTER_WORKER);
this.nodeEngine = nodeEngine;
}