[ 
https://issues.apache.org/jira/browse/GOBBLIN-2133?focusedWorklogId=930471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-930471
 ]

ASF GitHub Bot logged work on GOBBLIN-2133:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Aug/24 18:53
            Start Date: 15/Aug/24 18:53
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4027:
URL: https://github.com/apache/gobblin/pull/4027#discussion_r1718818928


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -164,19 +163,23 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     }
 
     try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+      } else {
+        log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());
+      }

Review Comment:
   Suggest adding a code comment along the lines of:
   > `.get()` to force `.cancelJob()`'s execution
   
   Otherwise it's subtle and could be mistaken for a superfluous `Optional::get`.
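
   A minimal sketch of why the trailing `.get()` matters, assuming only that `cancelJob` returns a `java.util.concurrent.Future`. The `cancelJob` stand-in and the class name below are hypothetical, not Gobblin's API, and the sketch does not show whether the real future is lazy. It shows the part that is certain: `Future::get` waits for the result and rethrows a failure, while a discarded `Future` hides it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class CancelFutureSketch {
  // Hypothetical stand-in for SpecProducer::cancelJob: the returned future fails.
  static Future<String> cancelJob(String jobUri) {
    CompletableFuture<String> result = new CompletableFuture<>();
    result.completeExceptionally(new IllegalStateException("cancel failed for " + jobUri));
    return result;
  }

  // Returns true when the caller observed the cancellation failure.
  static boolean cancelAndWait(String jobUri) {
    try {
      // .get() waits for cancelJob's result and surfaces any failure
      cancelJob(jobUri).get();
      return false;
    } catch (ExecutionException e) {
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Without .get(), the failure stays inside the discarded Future.
  static boolean cancelFireAndForget(String jobUri) {
    cancelJob(jobUri);
    return false;
  }
}
```

   Here `cancelAndWait` sees the failure and `cancelFireAndForget` does not, which is the behavior a one-line code comment next to `.get()` would make explicit.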



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -164,19 +163,23 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     }
 
     try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+      } else {
+        log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   [Over here](https://github.com/apache/gobblin/pull/4026/files#diff-7f4071fe99cb01d1a409effdde97cedd9667ace597ad46efcd765ec712fc0066) you started logging `getId()` rather than the jobSpec URI. Shall we do that here too, for consistency?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -191,15 +194,10 @@ public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore
     }
   }
 
-  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, Properties props)
-      throws ExecutionException, InterruptedException {
+  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
     JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
-    Future<?> future = jobExecutionPlan.getJobFuture().get();
-    String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
-    props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
     Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
     DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);

Review Comment:
   Ideally we'd have a unit test verifying that `.getTimingEvent` is called on an `EventSubmitter` mock/spy, but that's challenging because `DagProc.eventSubmitter` is `static`.
   
   Instead, the `DagProcFactory` (DPF) might pass an `EventSubmitter` param to each `DagProc` derived type it creates (all can still share the same one). The DPF itself could probably create the `MetricContext` and `EventSubmitter`, and since the DPF is a `@Singleton`, those do not need to be `static`.
   
   Of course, this means making `DagProcUtils::submitJobToExecutor`, `::cancelDag`, and `::cancelDagNode` base class methods of `DagProc`.
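
   A rough sketch of that shape, to make the testability argument concrete. Every type below is a hypothetical stand-in (the `...Sketch` and `...Like` names are not Gobblin classes), and the real `EventSubmitter` API is deliberately not reproduced. The only point is that an instance field supplied by a single factory can be replaced with a recording test double.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventSubmitter; only what the sketch needs.
interface EventSubmitterLike {
  void submitTimingEvent(String eventName);
}

// Base class keeps the submitter as an instance field rather than a static one.
abstract class DagProcSketch {
  protected final EventSubmitterLike eventSubmitter;

  protected DagProcSketch(EventSubmitterLike eventSubmitter) {
    this.eventSubmitter = eventSubmitter;
  }

  // Formerly a static utility; as a base-class method it can use the injected submitter.
  protected void sendCancellationEvent(String jobName) {
    eventSubmitter.submitTimingEvent("JOB_CANCEL:" + jobName);
  }
}

// One hypothetical derived type.
class KillDagProcSketch extends DagProcSketch {
  KillDagProcSketch(EventSubmitterLike eventSubmitter) {
    super(eventSubmitter);
  }

  void cancel(String jobName) {
    sendCancellationEvent(jobName);
  }
}

// Singleton-style factory: it owns one submitter and shares it with every proc it creates.
class DagProcFactorySketch {
  private final EventSubmitterLike eventSubmitter;

  DagProcFactorySketch(EventSubmitterLike eventSubmitter) {
    this.eventSubmitter = eventSubmitter;
  }

  KillDagProcSketch createKillDagProc() {
    return new KillDagProcSketch(eventSubmitter);
  }
}

// Test double that records every submitted event name.
class RecordingSubmitter implements EventSubmitterLike {
  final List<String> events = new ArrayList<>();

  @Override
  public void submitTimingEvent(String eventName) {
    events.add(eventName);
  }
}
```

   In a unit test, constructing the factory with a `RecordingSubmitter` (or a Mockito spy in the real codebase) lets the test assert which events each proc emitted, with no static state to reset between tests.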



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -164,19 +163,23 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel,
     }
 
     try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+      } else {
+        log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
       DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props).get();

Review Comment:
   It took me a moment to figure out what `props` is for and that it's used here. Suggest renaming it to `cancelJobArgs` or `cancelJobProps`.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 930471)
    Time Spent: 20m  (was: 10m)

> provide job future before calling SpecProducer::cancelJob
> ---------------------------------------------------------
>
>                 Key: GOBBLIN-2133
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2133
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
