Will-Lo commented on code in PR #3641:
URL: https://github.com/apache/gobblin/pull/3641#discussion_r1106274419


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1117,67 +1107,90 @@ private boolean hasRunningJobs(String dagId) {
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
     private void cleanUp() {
-      List<String> dagIdstoClean = new ArrayList<>();
-      //Clean up failed dags
-      for (String dagId : this.failedDagIdsFinishRunning) {
-        //Skip monitoring of any other jobs of the failed dag.
-        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
-        while (!dagNodeList.isEmpty()) {
-          DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
-          deleteJobState(dagId, dagNode);
-        }
-        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-        String status = TimingEvent.FlowTimings.FLOW_FAILED;
-        addFailedDag(dagId, dag);
-        log.info("Dag {} has finished with status {}; Cleaning up dag from the 
state store.", dagId, status);
-        // send an event before cleaning up dag
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-        dagIdstoClean.add(dagId);
-      }
+      // Approximate the time when the flow events are emitted to account for 
delay when the flow event is received by the job monitor
+      long cleanUpProcessingTime = System.currentTimeMillis();
 
       // Remove dags that are finished and emit their appropriate metrics
       for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : 
this.dags.entrySet()) {
         String dagId = dagIdKeyPair.getKey();
+        // On service restart, we repopulate the dags that are waiting to be 
cleaned up
+        if (dagIdstoClean.contains(dagId)) {
+          continue;
+        }
         Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
-        if (!hasRunningJobs(dagId) && 
!this.failedDagIdsFinishRunning.contains(dagId)) {
-          String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
-          if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            status = TimingEvent.FlowTimings.FLOW_FAILED;
-            addFailedDag(dagId, dag);
-            this.failedDagIdsFinishAllPossible.remove(dagId);
+        if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) || 
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+            DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
+          //Skip monitoring of any other jobs of the failed dag.
+          LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
+          while (!dagNodeList.isEmpty()) {
+            DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+            deleteJobState(dagId, dagNode);
+          }
+        }
+        if (!hasRunningJobs(dagId)) {
+          // Collect all the dagIds that are finished
+          this.dagIdstoClean.add(dagId);
+          if (dag.getFlowEvent() == null) {
+            // If the dag flow event is not set, then it is successful
+            dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
           } else {
-            
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+            addFailedDag(dagId, dag);
           }
-          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
+          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, dag.getFlowEvent());
           // send an event before cleaning up dag
-          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-          dagIdstoClean.add(dagId);
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), dag.getFlowEvent());
+          dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
         }
       }
 
-      for (String dagId: dagIdstoClean) {
-        cleanUpDag(dagId);
+      // Only clean up dags after the job status monitor processed the flow 
event
+      Set<String> cleanedDags = new HashSet<>();
+      for (String dagId: this.dagIdstoClean) {
+        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+        JobStatus flowStatus = pollFlowStatus(dag);
+        if (flowStatus != null && 
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+          FlowId flowId = DagManagerUtils.getFlowId(dag);
+          switch(dag.getFlowEvent()) {
+            case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+              this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.SUCCESSFUL);
+              break;
+            case TimingEvent.FlowTimings.FLOW_FAILED:
+              this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.FAILED);
+              break;
+            case TimingEvent.FlowTimings.FLOW_CANCELLED:
+              this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.FAILED);
+              break;
+            default:
+              log.warn("Unexpected flow event {} for dag {}", 
dag.getFlowEvent(), dagId);
+          }
+          cleanUpDag(dagId);
+          cleanedDags.add(dagId);

Review Comment:
   There's a `ConcurrentModificationException` here because of how the dag IDs 
are stored. Could we store them in a map instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to