Will-Lo commented on code in PR #3641:
URL: https://github.com/apache/gobblin/pull/3641#discussion_r1107255530
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1117,67 +1107,90 @@ private boolean hasRunningJobs(String dagId) {
* Perform clean up. Remove a dag from the dagstore if the dag is complete
and update internal state.
*/
private void cleanUp() {
- List<String> dagIdstoClean = new ArrayList<>();
- //Clean up failed dags
- for (String dagId : this.failedDagIdsFinishRunning) {
- //Skip monitoring of any other jobs of the failed dag.
- LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
- while (!dagNodeList.isEmpty()) {
- DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
- deleteJobState(dagId, dagNode);
- }
- Dag<JobExecutionPlan> dag = this.dags.get(dagId);
- String status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- log.info("Dag {} has finished with status {}; Cleaning up dag from the
state store.", dagId, status);
- // send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
- }
+ // Approximate the time when the flow events are emitted to account for
delay when the flow event is received by the job monitor
+ long cleanUpProcessingTime = System.currentTimeMillis();
// Remove dags that are finished and emit their appropriate metrics
for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair :
this.dags.entrySet()) {
String dagId = dagIdKeyPair.getKey();
+ // On service restart, we repopulate the dags that are waiting to be
cleaned up
+ if (dagIdstoClean.contains(dagId)) {
+ continue;
+ }
Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
- if (!hasRunningJobs(dagId) &&
!this.failedDagIdsFinishRunning.contains(dagId)) {
- String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
- if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- status = TimingEvent.FlowTimings.FLOW_FAILED;
- addFailedDag(dagId, dag);
- this.failedDagIdsFinishAllPossible.remove(dagId);
+ if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) ||
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+ DagManagerUtils.getFailureOption(dag) ==
FailureOption.FINISH_RUNNING) {
+ //Skip monitoring of any other jobs of the failed dag.
+ LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
+ while (!dagNodeList.isEmpty()) {
+ DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+ deleteJobState(dagId, dagNode);
+ }
+ }
+ if (!hasRunningJobs(dagId)) {
+ // Collect all the dagIds that are finished
+ this.dagIdstoClean.add(dagId);
+ if (dag.getFlowEvent() == null) {
+ // If the dag flow event is not set, then it is successful
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
} else {
-
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+ addFailedDag(dagId, dag);
}
- log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, status);
+ log.info("Dag {} has finished with status {}; Cleaning up dag from
the state store.", dagId, dag.getFlowEvent());
// send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), status);
- dagIdstoClean.add(dagId);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter,
this.dags.get(dagId), dag.getFlowEvent());
+ dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
}
}
- for (String dagId: dagIdstoClean) {
- cleanUpDag(dagId);
+ // Only clean up dags after the job status monitor processed the flow
event
+ Set<String> cleanedDags = new HashSet<>();
+ for (String dagId: this.dagIdstoClean) {
+ Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+ JobStatus flowStatus = pollFlowStatus(dag);
+ if (flowStatus != null &&
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ switch(dag.getFlowEvent()) {
+ case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+ this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.SUCCESSFUL);
+ break;
+ case TimingEvent.FlowTimings.FLOW_FAILED:
+ this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId,
FlowState.FAILED);
+ break;
+ case TimingEvent.FlowTimings.FLOW_CANCELLED:
+ this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
Review Comment:
Ah, I realize there's a bug in how we handle start-SLA-exceeded and
manually cancelled retries; I will put up a PR later to fix that. I can fix this
bug now, though, by adding a check that the dag's status is not already set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]