arjun4084346 commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1606105120


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       throw new RuntimeException(e);
     }
   }
+
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
+      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
+      // todo - why was it not being cleaned up in DagManager?
+      dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static void cancelDag(Dag<JobExecutionPlan> dag, 
DagManagementStateStore dagManagementStateStore) throws IOException {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+    log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+    }
+  }
+
+  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+    
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+    jobExecutionPlan.setExecutionStatus(CANCELLED);
+  }
+
+  private static void 
sendEnforceStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)

Review Comment:
   Sure, added it to enhance readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to