This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2695699363 [GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly (#4038)
2695699363 is described below

commit 2695699363acbf8a07df61d6118f560f3839cb81
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 29 15:33:58 2024 -0700

    [GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes correctly (#4038)
    
    * avoid adding a dag node where only an update is required
    * address review comments
---
 .../orchestration/DagManagementStateStore.java     |  2 +-
 .../orchestration/DagStateStoreWithDagNodes.java   |  8 +--
 .../MySqlDagManagementStateStore.java              | 11 ++--
 .../MysqlDagStateStoreWithDagNodes.java            | 65 +++++++++++-----------
 .../modules/orchestration/proc/DagProcUtils.java   |  9 +--
 .../orchestration/proc/ReevaluateDagProc.java      | 14 ++++-
 .../MySqlDagManagementStateStoreTest.java          |  9 ++-
 .../orchestration/proc/ReevaluateDagProcTest.java  |  2 +-
 .../orchestration/proc/ResumeDagProcTest.java      |  2 +-
 9 files changed, 62 insertions(+), 60 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 0a8514f274..5e3c49d13c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -101,7 +101,7 @@ public interface DagManagementStateStore {
    * @param dagNode dag node to be added
    * @param dagId dag id of the dag this dag node belongs to
    */
-  void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId) throws IOException;
+  void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
 
   /**
    * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link 
JobStatus}.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
index 03aaf41520..171ce8855e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
@@ -35,12 +35,10 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 public interface DagStateStoreWithDagNodes extends DagStateStore {
 
   /**
-   * Updates a dag node identified by the provided {@link DagManager.DagId}
-   * with the given {@link Dag.DagNode}.
-   * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 
0 if new dag node is same as the existing one
-   * <a 
href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html";>Refer</a>
+   * Updates the {@link Dag.DagNode} with the provided value.
+   * Returns true if the dag node is updated successfully, false otherwise
    */
-  int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) throws IOException;
+  boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException;
 
   /**
    * Returns all the {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 45ee013c7d..c76c5d15ff 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -142,8 +142,11 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
 
   @Override
   public void deleteDag(DagManager.DagId dagId) throws IOException {
-    this.dagStateStore.cleanUp(dagId);
-    log.info("Deleted dag {}", dagId);
+    if (this.dagStateStore.cleanUp(dagId)) {
+      log.info("Deleted dag {}", dagId);
+    } else {
+      log.info("Dag deletion was tried but did not happen {}", dagId);
+    }
   }
 
   @Override
@@ -158,9 +161,9 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
   }
 
   @Override
-  public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> 
dagNode, DagManager.DagId dagId)
+  public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
       throws IOException {
-    this.dagStateStore.updateDagNode(dagId, dagNode);
+    this.dagStateStore.updateDagNode(dagNode);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
index 2692e20697..98aa356660 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
@@ -47,9 +47,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
-import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -87,12 +85,11 @@ public class MysqlDagStateStoreWithDagNodes implements 
DagStateStoreWithDagNodes
       + "UNIQUE INDEX dag_node_index (dag_node_id), "
       + "INDEX dag_index (parent_dag_id))";
 
-  protected static final String INSERT_STATEMENT = "INSERT INTO %s 
(dag_node_id, parent_dag_id, dag_node) "
-      + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = 
new.dag_node";
+  protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s 
(dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)";
+  protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET 
dag_node = ? WHERE dag_node_id = ?";
   protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node 
FROM %s WHERE parent_dag_id = ?";
   protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM 
%s WHERE dag_node_id = ?";
   protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE 
parent_dag_id = ?";
-  private final ContextAwareCounter totalDagCount;
 
   public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
     if (config.hasPath(CONFIG_PREFIX)) {
@@ -121,23 +118,26 @@ public class MysqlDagStateStoreWithDagNodes implements 
DagStateStoreWithDagNodes
     this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
     MetricContext metricContext =
         Instrumented.getMetricContext(new 
State(ConfigUtils.configToProperties(config)), this.getClass());
-    this.totalDagCount = 
metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
     log.info("Instantiated {}", getClass().getSimpleName());
   }
 
   @Override
-  public void writeCheckpoint(Dag<JobExecutionPlan> dag)
-      throws IOException {
-    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
-    boolean newDag = false;
-    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-      if (updateDagNode(dagId, dagNode) == 1) {
-        newDag = true;
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
 tableName), insertStatement -> {
+
+      for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        insertStatement.setObject(1, dagNode.getValue().getId().toString());
+        insertStatement.setObject(2, dagId);
+        insertStatement.setObject(3, 
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+        insertStatement.addBatch();
       }
-    }
-    if (newDag) {
-      this.totalDagCount.inc();
-    }
+
+      try {
+        return insertStatement.executeBatch();
+      } catch (SQLException e) {
+        throw new IOException(String.format("Failure adding dag for %s", 
dagId), e);
+      }}, true);
   }
 
   @Override
@@ -147,15 +147,15 @@ public class MysqlDagStateStoreWithDagNodes implements 
DagStateStoreWithDagNodes
 
   @Override
   public boolean cleanUp(DagManager.DagId dagId) throws IOException {
-    
dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, 
tableName), deleteStatement -> {
-      try {
-        deleteStatement.setString(1, dagId.toString());
-        return deleteStatement.executeUpdate() != 0;
-      } catch (SQLException e) {
-        throw new IOException(String.format("Failure deleting dag for %s", 
dagId), e);
-      }}, true);
-    this.totalDagCount.dec();
-    return true;
+    return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT, 
tableName),
+        deleteStatement -> {
+          try {
+            deleteStatement.setString(1, dagId.toString());
+            return deleteStatement.executeUpdate() != 0;
+          } catch (SQLException e) {
+            throw new IOException(String.format("Failure deleting dag for %s", 
dagId), e);
+          }
+        }, true);
   }
 
   @Override
@@ -195,16 +195,15 @@ public class MysqlDagStateStoreWithDagNodes implements 
DagStateStoreWithDagNodes
   }
 
   @Override
-  public int updateDagNode(DagManager.DagId parentDagId, 
Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+  public boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException {
     String dagNodeId = dagNode.getValue().getId().toString();
-    return 
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, 
tableName), insertStatement -> {
+    return 
dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT,
 tableName), updateStatement -> {
       try {
-        insertStatement.setString(1, dagNodeId);
-        insertStatement.setString(2, parentDagId.toString());
-        insertStatement.setString(3, 
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
-        return insertStatement.executeUpdate();
+        updateStatement.setString(1, 
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+        updateStatement.setString(2, dagNodeId);
+        return updateStatement.executeUpdate() == 1;
       } catch (SQLException e) {
-        throw new IOException(String.format("Failure adding dag node for %s", 
dagNodeId), e);
+        throw new IOException(String.format("Failure updating dag node for 
%s", dagNodeId), e);
       }}, true);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 0a9f6dcd67..71693705fc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -134,7 +134,7 @@ public class DagProcUtils {
       jobOrchestrationTimer.stop(jobMetadata);
       log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
-      dagManagementStateStore.addDagNodeState(dagNode, dagId);
+      dagManagementStateStore.updateDagNode(dagNode);
       sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
       TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
@@ -158,7 +158,6 @@ public class DagProcUtils {
 
   public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
     Properties cancelJobArgs = new Properties();
-    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
     String serializedFuture = null;
 
     if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
@@ -175,12 +174,6 @@ public class DagProcUtils {
         log.warn("No Job future when canceling DAG node - {}", 
dagNodeToCancel.getValue().getId());
       }
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 cancelJobArgs).get();
-      // add back the dag node with updated states in the store
-      dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
-      dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
-      // send cancellation event after updating the state, because 
cancellation event triggers a ReevaluateDagAction
-      // that will delete the dag. Due to race condition between adding dag 
node and deleting dag, state store may get
-      // into inconsistent state.
       sendCancellationEvent(dagNodeToCancel);
       log.info("Cancelled dag node {}, spec_producer_future {}", 
dagNodeToCancel.getValue().getId(), serializedFuture);
     } catch (Exception e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index ef554f6418..cdfa3d0509 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -93,7 +93,17 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     }
 
     // get the dag after updating dag node status
-    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
+    Optional<Dag<JobExecutionPlan>> dagOptional = 
dagManagementStateStore.getDag(getDagId());
+    if (!dagOptional.isPresent()) {
+      // This may happen if another ReevaluateDagProc removed the dag after 
this DagProc updated the dag node status.
+      // The other ReevaluateDagProc can do that purely out of race condition 
when the dag is cancelled and ReevaluateDagProcs
+      // are being processed for dag node kill requests; or when this DagProc 
ran into some exception after updating the
+      // status and thus gave the other ReevaluateDagProc sufficient time to 
delete the dag before being retried.
+      log.warn("Dag not found {}", getDagId());
+      return;
+    }
+
+    Dag<JobExecutionPlan> dag = dagOptional.get();
     onJobFinish(dagManagementStateStore, dagNode, dag);
 
     if (jobStatus.isShouldRetry()) {
@@ -132,7 +142,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
   private void updateDagNodeStatus(DagManagementStateStore 
dagManagementStateStore,
       Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) 
throws IOException {
     dagNode.getValue().setExecutionStatus(executionStatus);
-    dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+    dagManagementStateStore.updateDagNode(dagNode);
   }
 
   /**
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index 3185943902..27a9dd553d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -100,14 +100,13 @@ public class MySqlDagManagementStateStoreTest {
     Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
     Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
     DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
-    DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
     DagNodeId dagNodeId = 
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
 
     this.dagManagementStateStore.addDag(dag);
     this.dagManagementStateStore.addDag(dag2);
-    this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
-    this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
-    this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+    this.dagManagementStateStore.updateDagNode(dagNode);
+    this.dagManagementStateStore.updateDagNode(dagNode2);
+    this.dagManagementStateStore.updateDagNode(dagNode3);
 
     Assert.assertTrue(compareLists(dag.getNodes(), 
this.dagManagementStateStore.getDag(dagId).get().getNodes()));
     Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
@@ -126,7 +125,7 @@ public class MySqlDagManagementStateStoreTest {
     jobExecutionPlan.setJobFuture(Optional.of(future));
 
     Dag.DagNode<JobExecutionPlan> duplicateDagNode = new 
Dag.DagNode<>(jobExecutionPlan);
-    this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
+    this.dagManagementStateStore.updateDagNode(duplicateDagNode);
     
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 651cc441df..6a15bf1645 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -123,7 +123,7 @@ public class ReevaluateDagProcTest {
     // next job is sent to spec producer
     Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
     // there are two invocations, one after setting status and other after 
sending new job to specProducer
-    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(2)).addDagNodeState(any(), any());
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(2)).updateDagNode(any());
 
     // assert that the first job is completed
     Assert.assertEquals(ExecutionStatus.COMPLETE,
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 150bed10e6..0623d00288 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -109,7 +109,7 @@ public class ResumeDagProcTest {
      buildNaiveTopologySpec().getSpecExecutor() respectively.
      the result will be that after serializing/deserializing the test dag, the 
spec executor (and producer) type may change */
 
-    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any());
 
     
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }

Reply via email to