umustafi commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1408482374
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+ public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
- this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+ this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+ Optional.of(flowAction.getFlowExecutionId()));
if (optionalJobExecutionPlanDag.isPresent()) {
- submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+ submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
} else {
_log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}
- public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+ DagActionStore.DagAction launchAction)
throws IOException {
try {
- //Send the dag to the DagManager.
+ // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+ this.dagActionStore.get().deleteDagAction(launchAction);
Review Comment:
If `DagManager.addDag` completes, then we have already persisted
([code](https://github.com/apache/gobblin/blob/35304e9676f82feeb6cc93332707a2bd9cae6b74/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java#L317))
the `dag` in the MySQL `DagStateStore`, and its status and completion will
be tracked by existing `DagManager` functionality through a series of
checkpoints: once it is
[submitted to executor](https://github.com/apache/gobblin/blob/35304e9676f82feeb6cc93332707a2bd9cae6b74/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java#L1107),
when it fails, when the next dag is called, and so on. We load all dags from
the state store upon startup, and while the `DagManager` is active it should
keep track of them. In the pre-multi-active `DagMgr`, I don't think we
necessarily need to do anything further here. What do you think?
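To make the ordering argument concrete, here is a minimal, hypothetical model in plain Java. None of these classes are Gobblin APIs: `ActionStore`, `StateStore`, and `Service` are illustrative stand-ins for `DagActionStore`, the MySQL `DagStateStore`, and the orchestrator/`DagManager` pair. It assumes that the dag is persisted before the launch action is deleted (the order used in the diff above), and that startup reloads every persisted dag and then replays any launch action still present.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of "persist the dag, then delete the launch action"; not Gobblin code.
public class DagRecoverySketch {

  // Stand-in for DagActionStore: launch actions not yet handed off (keyed by flow execution id).
  static final class ActionStore {
    final Set<String> pendingLaunches = new LinkedHashSet<>();
  }

  // Stand-in for the MySQL-backed DagStateStore: dags that survive a restart.
  static final class StateStore {
    final Map<String, String> dags = new LinkedHashMap<>();
  }

  // One service "lifetime"; a new instance over the same stores models a restart.
  static final class Service {
    final ActionStore actions;
    final StateStore states;
    final Set<String> tracked = new LinkedHashSet<>(); // dags held in memory by this instance
    final List<String> launches = new ArrayList<>();   // every submission, including redundant ones

    Service(ActionStore actions, StateStore states) {
      this.actions = actions;
      this.states = states;
    }

    // Persist first (analogous to addDag), then delete the action.
    // crashBeforeDelete simulates the process dying between the two steps.
    void submit(String flowExecutionId, boolean crashBeforeDelete) {
      states.dags.put(flowExecutionId, "dag-" + flowExecutionId);
      tracked.add(flowExecutionId);
      launches.add(flowExecutionId);
      if (crashBeforeDelete) {
        throw new IllegalStateException("simulated crash before deleting the action");
      }
      actions.pendingLaunches.remove(flowExecutionId);
    }

    // Startup: reload all persisted dags, then replay launch actions that were never deleted.
    void startUp() {
      tracked.addAll(states.dags.keySet());
      for (String id : new ArrayList<>(actions.pendingLaunches)) {
        submit(id, false);
      }
    }
  }

  public static void main(String[] args) {
    ActionStore actions = new ActionStore();
    StateStore states = new StateStore();
    actions.pendingLaunches.add("f1");
    actions.pendingLaunches.add("f2");

    Service before = new Service(actions, states);
    before.submit("f1", false); // normal path: persisted, then action deleted
    try {
      before.submit("f2", true); // persisted, but the process dies before the delete
    } catch (IllegalStateException expected) {
      // only the two stores survive this "crash"
    }

    Service after = new Service(actions, states);
    after.startUp();
    System.out.println("tracked after restart: " + after.tracked);   // [f1, f2]
    System.out.println("re-launched on restart: " + after.launches); // [f2]
  }
}
```

In this model, a crash between the two steps leaves the launch action behind, so the worst case is one redundant launch on restart; the dag itself is never lost, because it was persisted first. When no crash happens, the deleted action means nothing is replayed and the reloaded dag is simply tracked again.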
--
This is an automated message from the Apache Git Service.