[ 
https://issues.apache.org/jira/browse/GOBBLIN-1784?focusedWorklogId=845692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-845692
 ]

ASF GitHub Bot logged work on GOBBLIN-1784:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Feb/23 17:26
            Start Date: 15/Feb/23 17:26
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3641:
URL: https://github.com/apache/gobblin/pull/3641#discussion_r1107255530


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -1117,67 +1107,90 @@ private boolean hasRunningJobs(String dagId) {
      * Perform clean up. Remove a dag from the dagstore if the dag is complete 
and update internal state.
      */
     private void cleanUp() {
-      List<String> dagIdstoClean = new ArrayList<>();
-      //Clean up failed dags
-      for (String dagId : this.failedDagIdsFinishRunning) {
-        //Skip monitoring of any other jobs of the failed dag.
-        LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
-        while (!dagNodeList.isEmpty()) {
-          DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
-          deleteJobState(dagId, dagNode);
-        }
-        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
-        String status = TimingEvent.FlowTimings.FLOW_FAILED;
-        addFailedDag(dagId, dag);
-        log.info("Dag {} has finished with status {}; Cleaning up dag from the 
state store.", dagId, status);
-        // send an event before cleaning up dag
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-        dagIdstoClean.add(dagId);
-      }
+      // Approximate the time when the flow events are emitted to account for 
delay when the flow event is received by the job monitor
+      long cleanUpProcessingTime = System.currentTimeMillis();
 
       // Remove dags that are finished and emit their appropriate metrics
       for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : 
this.dags.entrySet()) {
         String dagId = dagIdKeyPair.getKey();
+        // On service restart, we repopulate the dags that are waiting to be 
cleaned up
+        if (dagIdstoClean.contains(dagId)) {
+          continue;
+        }
         Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
-        if (!hasRunningJobs(dagId) && 
!this.failedDagIdsFinishRunning.contains(dagId)) {
-          String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
-          if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            status = TimingEvent.FlowTimings.FLOW_FAILED;
-            addFailedDag(dagId, dag);
-            this.failedDagIdsFinishAllPossible.remove(dagId);
+        if ((TimingEvent.FlowTimings.FLOW_FAILED.equals(dag.getFlowEvent()) || 
TimingEvent.FlowTimings.FLOW_CANCELLED.equals(dag.getFlowEvent())) &&
+            DagManagerUtils.getFailureOption(dag) == 
FailureOption.FINISH_RUNNING) {
+          //Skip monitoring of any other jobs of the failed dag.
+          LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
+          while (!dagNodeList.isEmpty()) {
+            DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+            deleteJobState(dagId, dagNode);
+          }
+        }
+        if (!hasRunningJobs(dagId)) {
+          // Collect all the dagIds that are finished
+          this.dagIdstoClean.add(dagId);
+          if (dag.getFlowEvent() == null) {
+            // If the dag flow event is not set, then it is successful
+            dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
           } else {
-            
dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
+            addFailedDag(dagId, dag);
           }
-          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, status);
+          log.info("Dag {} has finished with status {}; Cleaning up dag from 
the state store.", dagId, dag.getFlowEvent());
           // send an event before cleaning up dag
-          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), status);
-          dagIdstoClean.add(dagId);
+          DagManagerUtils.emitFlowEvent(this.eventSubmitter, 
this.dags.get(dagId), dag.getFlowEvent());
+          dag.setEventEmittedTimeMillis(cleanUpProcessingTime);
         }
       }
 
-      for (String dagId: dagIdstoClean) {
-        cleanUpDag(dagId);
+      // Only clean up dags after the job status monitor processed the flow 
event
+      Set<String> cleanedDags = new HashSet<>();
+      for (String dagId: this.dagIdstoClean) {
+        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+        JobStatus flowStatus = pollFlowStatus(dag);
+        if (flowStatus != null && 
FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
+          FlowId flowId = DagManagerUtils.getFlowId(dag);
+          switch(dag.getFlowEvent()) {
+            case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
+              this.dagManagerMetrics.emitFlowSuccessMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.SUCCESSFUL);
+              break;
+            case TimingEvent.FlowTimings.FLOW_FAILED:
+              this.dagManagerMetrics.emitFlowFailedMetrics(flowId);
+              this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
FlowState.FAILED);
+              break;
+            case TimingEvent.FlowTimings.FLOW_CANCELLED:
+              this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);

Review Comment:
   Ah, I realize there's a bug in how we handle start-SLA-exceeded and 
manually cancelled retries; I will put up a PR later to fix that.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 845692)
    Time Spent: 1.5h  (was: 1h 20m)

> Race condition where on service restart DagManager will lose track of dags
> --------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1784
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1784
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Gobblin-as-a-Service has a bug where, on restart, the DagManager will clean 
> up dags but a flow event is never sent.
> This leads to a scenario where, if the event is never sent by the underlying 
> notification system, the dag will already have been cleaned up, and thus the 
> job status will be permanently stuck in a running state.
> The DagManager should thus only clean up its own references to dags after it 
> reads that the job status monitor has properly saved the final flow status. 
> If a status hasn't been received within some timeout (e.g. 5 mins), the 
> DagManager will re-emit the event in case it was lost.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to