This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3536a7dad fix https://github.com/apache/incubator-seatunnel/issues/4042 (#4045)
3536a7dad is described below
commit 3536a7dad9c2f0b7814c3f970a8948897d118588
Author: Eric <[email protected]>
AuthorDate: Fri Feb 3 15:50:46 2023 +0800
fix https://github.com/apache/incubator-seatunnel/issues/4042 (#4045)
---
.../engine/server/CoordinatorService.java | 7 +++--
.../engine/server/dag/physical/PhysicalPlan.java | 7 +++--
.../engine/server/dag/physical/SubPlan.java | 4 ++-
.../seatunnel/engine/server/master/JobMaster.java | 35 ++++++++++++++--------
4 files changed, 36 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index f5e502201..5e68e661b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -214,10 +214,12 @@ public class CoordinatorService {
ownedSlotProfilesIMap,
engineConfig);
+
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
try {
-
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+ jobMaster.initCheckPointManager();
} catch (Exception e) {
- throw new SeaTunnelEngineException(String.format("Job id %s init
JobMaster failed", jobId), e);
+ jobMaster.cancelJob();
+ throw new SeaTunnelEngineException(String.format("Job id %s init
CheckPointManager failed", jobId), e);
}
String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -339,6 +341,7 @@ public class CoordinatorService {
runningJobInfoIMap.put(jobId, new
JobInfo(System.currentTimeMillis(), jobImmutableInformation));
runningJobMasterMap.put(jobId, jobMaster);
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+ jobMaster.initCheckPointManager();
} catch (Throwable e) {
logger.severe(String.format("submit job %s error %s ", jobId,
ExceptionUtils.getMessage(e)));
voidCompletableFuture.completeExceptionally(e);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index dee54f555..29b5328a2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -144,8 +144,11 @@ public class PhysicalPlan {
future.thenAcceptAsync(pipelineState -> {
try {
// Notify checkpoint manager when the pipeline end, Whether
the pipeline will be restarted or not
- jobMaster.getCheckpointManager()
-
.listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(),
subPlan.getPipelineState()).join();
+ if (jobMaster.getCheckpointManager() != null) {
+ jobMaster.getCheckpointManager()
+
.listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(),
subPlan.getPipelineState())
+ .join();
+ }
if
(PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b88224458..f21d49c26 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -262,7 +262,9 @@ public class SubPlan {
}
private void cancelCheckpointCoordinator() {
- jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId,
PipelineStatus.CANCELING).join();
+ if (jobMaster.getCheckpointManager() != null) {
+ jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId,
PipelineStatus.CANCELING).join();
+ }
}
private void cancelPipelineTasks() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 0e967a2bc..c9352461f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -130,6 +131,8 @@ public class JobMaster {
private boolean isRunning = true;
+ private Map<Integer, CheckpointPlan> checkpointPlanMap;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@@ -151,7 +154,7 @@ public class JobMaster {
this.engineConfig = engineConfig;
}
- public void init(long initializationTimestamp) throws Exception {
+ public void init(long initializationTimestamp) {
jobImmutableInformation =
nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
LOGGER.info(String.format("Init JobMaster for Job %s (%s) ",
jobImmutableInformation.getJobConfig().getName(),
@@ -162,8 +165,6 @@ public class JobMaster {
classLoader = new
SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
logicalDag =
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
classLoader, jobImmutableInformation.getLogicalDag());
- CheckpointConfig checkpointConfig =
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
- jobImmutableInformation.getJobConfig().getEnvOptions());
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
@@ -176,14 +177,20 @@ public class JobMaster {
engineConfig.getQueueType());
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
+ this.checkpointPlanMap = planTuple.f1();
+ this.initStateFuture();
+ }
+
+ public void initCheckPointManager() throws CheckpointStorageException {
+ CheckpointConfig checkpointConfig =
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
+ jobImmutableInformation.getJobConfig().getEnvOptions());
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
jobImmutableInformation.isStartWithSavePoint(),
nodeEngine,
this,
- planTuple.f1(),
+ checkpointPlanMap,
checkpointConfig);
- this.initStateFuture();
}
// TODO replace it after ReadableConfig Support parse yaml format, then
use only one config to read engine and env config.
@@ -323,8 +330,9 @@ public class JobMaster {
groupLocation.forEach((taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId() ==
this.getJobImmutableInformation().getJobId()) {
try {
- RawJobMetrics rawJobMetrics = (RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
- new
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
+ RawJobMetrics rawJobMetrics =
+ (RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+ new
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
metrics.add(rawJobMetrics);
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
@@ -336,7 +344,8 @@ public class JobMaster {
}
public void savePipelineMetricsToHistory(PipelineLocation
pipelineLocation) {
- List<RawJobMetrics> currJobMetrics =
this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
+ List<RawJobMetrics> currJobMetrics =
+
this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
long jobId = this.getJobImmutableInformation().getJobId();
synchronized (this) {
@@ -346,8 +355,9 @@ public class JobMaster {
this.cleanTaskGroupContext(pipelineLocation);
}
- public void removeMetricsContext(PipelineLocation pipelineLocation,
PipelineStatus pipelineStatus){
- if (pipelineStatus.equals(PipelineStatus.FINISHED) &&
!checkpointManager.isSavePointEnd() ||
pipelineStatus.equals(PipelineStatus.CANCELED)){
+ public void removeMetricsContext(PipelineLocation pipelineLocation,
PipelineStatus pipelineStatus) {
+ if (pipelineStatus.equals(PipelineStatus.FINISHED) &&
!checkpointManager.isSavePointEnd() ||
+ pipelineStatus.equals(PipelineStatus.CANCELED)) {
IMap<TaskLocation, MetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
map.keySet().stream().filter(taskLocation -> {
@@ -399,7 +409,7 @@ public class JobMaster {
/**
* Execute savePoint, which will cause the job to end.
*/
- public CompletableFuture<Void> savePoint(){
+ public CompletableFuture<Void> savePoint() {
PassiveCompletableFuture<CompletedCheckpoint>[]
passiveCompletableFutures =
checkpointManager.triggerSavepoints();
return CompletableFuture.allOf(passiveCompletableFutures);
@@ -414,7 +424,8 @@ public class JobMaster {
@NonNull Map<TaskGroupLocation,
SlotProfile> pipelineOwnedSlotProfiles) {
ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
try {
- RetryUtils.retryWithException(() ->
pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
+ RetryUtils.retryWithException(
+ () ->
pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
new RetryUtils.RetryMaterial(20, true,
exception -> exception instanceof NullPointerException &&
isRunning, 1000));
} catch (Exception e) {