This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bdbf43a37b [GOBBLIN-2130] avoid race condition b/w add dag node and
delete dag (#4021)
bdbf43a37b is described below
commit bdbf43a37b170d9cbfbad955d8c0b4b085b094ec
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Sun Aug 11 12:10:17 2024 -0700
[GOBBLIN-2130] avoid race condition b/w add dag node and delete dag (#4021)
---
.../modules/orchestration/proc/DagProcUtils.java | 25 +++++++++++++---------
1 file changed, 15 insertions(+), 10 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index b63c15caea..75836fb6ab 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -162,18 +163,17 @@ public class DagProcUtils {
}
try {
- if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- Future<?> future = dagNodeToCancel.getValue().getJobFuture().get();
- String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
- props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
- sendCancellationEvent(dagNodeToCancel.getValue());
- } else {
- log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
- dagNodeToCancel.getValue().getJobSpec().getUri());
- }
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
// add back the dag node with updated states in the store
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
+ // send cancellation event after updating the state, because cancellation event triggers a ReevaluateDagAction
+ // that will delete the dag. Due to race condition between adding dag node and deleting dag, state store may get
+ // into inconsistent state.
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ sendCancellationEvent(dagNodeToCancel, props);
+ } else {
+ log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getJobSpec().getUri());
+ }
} catch (Exception e) {
throw new IOException(e);
}
@@ -188,7 +188,12 @@ public class DagProcUtils {
}
}
- public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, Properties props)
+ throws ExecutionException, InterruptedException {
+ JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
+ Future<?> future = jobExecutionPlan.getJobFuture().get();
+ String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(CANCELLED);