[ 
https://issues.apache.org/jira/browse/GOBBLIN-2069?focusedWorklogId=919960&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-919960
 ]

ASF GitHub Bot logged work on GOBBLIN-2069:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/24 21:20
            Start Date: 19/May/24 21:20
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1606104577


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
       throw new RuntimeException(e);
     }
   }
+
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   Sometimes a dag node has not yet started running and does not have a spec 
producer in it. A user can still submit a cancellation request for it, and in 
those cases execution reaches this `else` block. 
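
   To illustrate the case described above, here is a minimal, self-contained 
sketch of the same present/absent check. It is not the Gobblin code: `Node`, 
`cancel`, and the use of `java.util.Optional` are hypothetical stand-ins chosen 
for illustration, and the real `JobExecutionPlan` and spec producer are not 
modeled.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelSketch {

  /** Hypothetical stand-in for a dag node's job state (not Gobblin's JobExecutionPlan). */
  static final class Node {
    final String uri;
    final Optional<Future<?>> jobFuture;

    /** Pass null for a node that has not started running yet. */
    Node(String uri, Future<?> jobFutureOrNull) {
      this.uri = uri;
      this.jobFuture = Optional.ofNullable(jobFutureOrNull);
    }
  }

  /**
   * Returns true when the node had a job future (so cancellation can be propagated),
   * and false when the node was never started and only a warning is logged.
   */
  static boolean cancel(Node node) {
    if (node.jobFuture.isPresent()) {
      // The node was submitted: cancel the outstanding work.
      node.jobFuture.get().cancel(true);
      return true;
    }
    // The node never started: there is nothing to cancel, so just warn.
    System.out.println("No job future when canceling node - " + node.uri);
    return false;
  }

  public static void main(String[] args) {
    Node started = new Node("job/started", new CompletableFuture<String>());
    Node pending = new Node("job/pending", null);
    System.out.println(cancel(started));
    System.out.println(cancel(pending));
  }
}
```

   In this sketch the pending node takes the `else`-style path: the request is 
accepted, a warning is logged, and no cancellation is propagated.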





Issue Time Tracking
-------------------

    Worklog Id:     (was: 919960)
    Time Spent: 1h 50m  (was: 1h 40m)

> implement EnforceStartDeadlineDagProc
> -------------------------------------
>
>                 Key: GOBBLIN-2069
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2069
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
