This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new da8a37f0c [Improve][Zeta] job clean before JobMaster future complete (#4087)
da8a37f0c is described below
commit da8a37f0c3c361152559ce80b1fedb96ad559436
Author: Eric <[email protected]>
AuthorDate: Wed Feb 8 19:16:59 2023 +0800
[Improve][Zeta] job clean before JobMaster future complete (#4087)
* improve job clean code
* Improve clean job
---
.../engine/server/CoordinatorService.java | 58 ++++++++--------------
.../seatunnel/engine/server/master/JobMaster.java | 39 +++++++++++++--
2 files changed, 55 insertions(+), 42 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5e68e661b..9957b2437 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -212,6 +212,7 @@ public class CoordinatorService {
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
+ runningJobInfoIMap,
engineConfig);
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
@@ -227,7 +228,7 @@ public class CoordinatorService {
logger.info(String.format(
"The restore %s is in an end state %s, store the job info to
JobHistory and clear the job running time info",
jobFullName, jobStatus));
- removeJobIMap(jobMaster);
+ jobMaster.cleanJob();
return;
}
@@ -250,8 +251,11 @@ public class CoordinatorService {
jobMaster.cancelJob();
jobMaster.run();
} finally {
- // storage job state info to HistoryStorage
- onJobDone(jobMaster, jobId);
+ // voidCompletableFuture will be cancelled when zeta
master node shutdown to simulate master failure,
+ // don't update runningJobMasterMap is this case.
+ if (!jobMaster.getJobMasterCompleteFuture().isCancelled())
{
+ runningJobMasterMap.remove(jobId);
+ }
}
});
return;
@@ -265,7 +269,11 @@ public class CoordinatorService {
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
- onJobDone(jobMaster, jobId);
+ // voidCompletableFuture will be cancelled when zeta
master node shutdown to simulate master failure,
+ // don't update runningJobMasterMap is this case.
+ if (!jobMaster.getJobMasterCompleteFuture().isCancelled())
{
+ runningJobMasterMap.remove(jobId);
+ }
}
});
}
@@ -297,7 +305,6 @@ public class CoordinatorService {
try {
executorService.awaitTermination(20, TimeUnit.SECONDS);
- runningJobMasterMap = new ConcurrentHashMap<>();
} catch (InterruptedException e) {
throw new SeaTunnelEngineException("wait clean executor service
error", e);
}
@@ -335,7 +342,9 @@ public class CoordinatorService {
getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
- ownedSlotProfilesIMap, engineConfig);
+ ownedSlotProfilesIMap,
+ runningJobInfoIMap,
+ engineConfig);
executorService.submit(() -> {
try {
runningJobInfoIMap.put(jobId, new
JobInfo(System.currentTimeMillis(), jobImmutableInformation));
@@ -353,7 +362,11 @@ public class CoordinatorService {
try {
jobMaster.run();
} finally {
- onJobDone(jobMaster, jobId);
+ // voidCompletableFuture will be cancelled when zeta master
node shutdown to simulate master failure,
+ // don't update runningJobMasterMap is this case.
+ if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
+ runningJobMasterMap.remove(jobId);
+ }
}
});
return new PassiveCompletableFuture<>(voidCompletableFuture);
@@ -372,37 +385,6 @@ public class CoordinatorService {
return new PassiveCompletableFuture<>(voidCompletableFuture);
}
- private void onJobDone(JobMaster jobMaster, long jobId) {
- // storage job state and metrics to HistoryStorage
- jobHistoryService.storeJobInfo(jobId,
runningJobMasterMap.get(jobId).getJobDAGInfo());
- jobHistoryService.storeFinishedJobState(jobMaster);
- removeJobIMap(jobMaster);
- runningJobMasterMap.remove(jobId);
- }
-
- private void removeJobIMap(JobMaster jobMaster) {
- Long jobId = jobMaster.getJobImmutableInformation().getJobId();
- runningJobStateTimestampsIMap.remove(jobId);
-
- jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
- runningJobStateIMap.remove(pipeline.getPipelineLocation());
-
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
- pipeline.getCoordinatorVertexList().forEach(coordinator -> {
- runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
-
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
- });
-
- pipeline.getPhysicalVertexList().forEach(task -> {
- runningJobStateIMap.remove(task.getTaskGroupLocation());
-
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
- });
- });
-
- // These should be deleted at the end.
- runningJobStateIMap.remove(jobId);
- runningJobInfoIMap.remove(jobId);
- }
-
public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 06af3d84f..a5e9b78ab 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
@@ -134,6 +135,8 @@ public class JobMaster {
private Map<Integer, CheckpointPlan> checkpointPlanMap;
+ private final IMap<Long, JobInfo> runningJobInfoIMap;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@@ -141,7 +144,9 @@ public class JobMaster {
@NonNull JobHistoryService jobHistoryService,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
- @NonNull IMap ownedSlotProfilesIMap, EngineConfig
engineConfig) {
+ @NonNull IMap ownedSlotProfilesIMap,
+ @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
+ EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
@@ -152,6 +157,7 @@ public class JobMaster {
this.jobHistoryService = jobHistoryService;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+ this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
}
@@ -217,10 +223,11 @@ public class JobMaster {
jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
// We need not handle t, Because we will not return t from
physicalPlan
if (JobStatus.FAILING.equals(v.getStatus())) {
- cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING,
JobStatus.FAILED);
}
- jobMasterCompleteFuture.complete(new
JobResult(physicalPlan.getJobStatus(), v.getError()));
+ JobResult jobResult = new JobResult(physicalPlan.getJobStatus(),
v.getError());
+ cleanJob();
+ jobMasterCompleteFuture.complete(jobResult);
}));
}
@@ -256,6 +263,28 @@ public class JobMaster {
});
}
+ private void removeJobIMap() {
+ Long jobId = getJobImmutableInformation().getJobId();
+ runningJobStateTimestampsIMap.remove(jobId);
+
+ getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+ runningJobStateIMap.remove(pipeline.getPipelineLocation());
+
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
+ pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+ runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
+
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
+ });
+
+ pipeline.getPhysicalVertexList().forEach(task -> {
+ runningJobStateIMap.remove(task.getTaskGroupLocation());
+
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
+ });
+ });
+
+ runningJobStateIMap.remove(jobId);
+ runningJobInfoIMap.remove(jobId);
+ }
+
public JobDAGInfo getJobDAGInfo() {
if (jobDAGInfo == null) {
jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag,
jobImmutableInformation, isPhysicalDAGIInfo);
@@ -277,7 +306,9 @@ public class JobMaster {
}
public void cleanJob() {
- // TODO Add some job clean operation
+ jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(),
getJobDAGInfo());
+ jobHistoryService.storeFinishedJobState(this);
+ removeJobIMap();
}
public Address queryTaskGroupAddress(long taskGroupId) {