This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bdbf43a37b [GOBBLIN-2130] avoid race condition b/w add dag node and delete dag (#4021)
bdbf43a37b is described below

commit bdbf43a37b170d9cbfbad955d8c0b4b085b094ec
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Sun Aug 11 12:10:17 2024 -0700

    [GOBBLIN-2130] avoid race condition b/w add dag node and delete dag (#4021)
---
 .../modules/orchestration/proc/DagProcUtils.java   | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b63c15caea..75836fb6ab 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -162,18 +163,17 @@ public class DagProcUtils {
     }
 
     try {
-      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
-        Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
-        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
-        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
-        sendCancellationEvent(dagNodeToCancel.getValue());
-      } else {
-        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
-            dagNodeToCancel.getValue().getJobSpec().getUri());
-      }
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
       // add back the dag node with updated states in the store
       dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
+      // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
+      // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
+      // into inconsistent state.
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        sendCancellationEvent(dagNodeToCancel, props);
+      } else {
+        log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -188,7 +188,12 @@ public class DagProcUtils {
     }
   }
 
-  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, Properties props)
+      throws ExecutionException, InterruptedException {
+    JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
+    Future<?> future = jobExecutionPlan.getJobFuture().get();
+    String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+    props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
     Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
     
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
     jobExecutionPlan.setExecutionStatus(CANCELLED);

Reply via email to