This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2695699363 [GOBBLIN-2143] handle concurrent ReevaluateDagProc for
cancelled dag nodes correctly (#4038)
2695699363 is described below
commit 2695699363acbf8a07df61d6118f560f3839cb81
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 29 15:33:58 2024 -0700
[GOBBLIN-2143] handle concurrent ReevaluateDagProc for cancelled dag nodes
correctly (#4038)
* avoid adding a dag node where only an update is required
* address review comments
---
.../orchestration/DagManagementStateStore.java | 2 +-
.../orchestration/DagStateStoreWithDagNodes.java | 8 +--
.../MySqlDagManagementStateStore.java | 11 ++--
.../MysqlDagStateStoreWithDagNodes.java | 65 +++++++++++-----------
.../modules/orchestration/proc/DagProcUtils.java | 9 +--
.../orchestration/proc/ReevaluateDagProc.java | 14 ++++-
.../MySqlDagManagementStateStoreTest.java | 9 ++-
.../orchestration/proc/ReevaluateDagProcTest.java | 2 +-
.../orchestration/proc/ResumeDagProcTest.java | 2 +-
9 files changed, 62 insertions(+), 60 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 0a8514f274..5e3c49d13c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -101,7 +101,7 @@ public interface DagManagementStateStore {
* @param dagNode dag node to be added
* @param dagId dag id of the dag this dag node belongs to
*/
- void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId
dagId) throws IOException;
+ void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
/**
* Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link
JobStatus}.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
index 03aaf41520..171ce8855e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java
@@ -35,12 +35,10 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
public interface DagStateStoreWithDagNodes extends DagStateStore {
/**
- * Updates a dag node identified by the provided {@link DagManager.DagId}
- * with the given {@link Dag.DagNode}.
- * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and
0 if new dag node is same as the existing one
- * <a
href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
+ * Updates the {@link Dag.DagNode} with the provided value.
+ * Returns true if the dag node is updated successfully, false otherwise
*/
- int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan>
dagNode) throws IOException;
+ boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException;
/**
* Returns all the {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index 45ee013c7d..c76c5d15ff 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -142,8 +142,11 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
- this.dagStateStore.cleanUp(dagId);
- log.info("Deleted dag {}", dagId);
+ if (this.dagStateStore.cleanUp(dagId)) {
+ log.info("Deleted dag {}", dagId);
+ } else {
+ log.info("Dag deletion was tried but did not happen {}", dagId);
+ }
}
@Override
@@ -158,9 +161,9 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
}
@Override
- public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan>
dagNode, DagManager.DagId dagId)
+ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
- this.dagStateStore.updateDagNode(dagId, dagNode);
+ this.dagStateStore.updateDagNode(dagNode);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
index 2692e20697..98aa356660 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
@@ -47,9 +47,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
-import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -87,12 +85,11 @@ public class MysqlDagStateStoreWithDagNodes implements
DagStateStoreWithDagNodes
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";
- protected static final String INSERT_STATEMENT = "INSERT INTO %s
(dag_node_id, parent_dag_id, dag_node) "
- + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node =
new.dag_node";
+ protected static final String INSERT_DAG_NODE_STATEMENT = "INSERT INTO %s
(dag_node_id, parent_dag_id, dag_node) VALUES (?, ?, ?)";
+ protected static final String UPDATE_DAG_NODE_STATEMENT = "UPDATE %s SET
dag_node = ? WHERE dag_node_id = ?";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node
FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM
%s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE
parent_dag_id = ?";
- private final ContextAwareCounter totalDagCount;
public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec>
topologySpecMap) throws IOException {
if (config.hasPath(CONFIG_PREFIX)) {
@@ -121,23 +118,26 @@ public class MysqlDagStateStoreWithDagNodes implements
DagStateStoreWithDagNodes
this.jobExecPlanDagFactory = new JobExecutionPlanDagFactory();
MetricContext metricContext =
Instrumented.getMetricContext(new
State(ConfigUtils.configToProperties(config)), this.getClass());
- this.totalDagCount =
metricContext.contextAwareCounter(ServiceMetricNames.DAG_COUNT_MYSQL_DAG_STATE_COUNT);
log.info("Instantiated {}", getClass().getSimpleName());
}
@Override
- public void writeCheckpoint(Dag<JobExecutionPlan> dag)
- throws IOException {
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- boolean newDag = false;
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- if (updateDagNode(dagId, dagNode) == 1) {
- newDag = true;
+ public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+
dbStatementExecutor.withPreparedStatement(String.format(INSERT_DAG_NODE_STATEMENT,
tableName), insertStatement -> {
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ insertStatement.setObject(1, dagNode.getValue().getId().toString());
+ insertStatement.setObject(2, dagId);
+ insertStatement.setObject(3,
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+ insertStatement.addBatch();
}
- }
- if (newDag) {
- this.totalDagCount.inc();
- }
+
+ try {
+ return insertStatement.executeBatch();
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure adding dag for %s",
dagId), e);
+ }}, true);
}
@Override
@@ -147,15 +147,15 @@ public class MysqlDagStateStoreWithDagNodes implements
DagStateStoreWithDagNodes
@Override
public boolean cleanUp(DagManager.DagId dagId) throws IOException {
-
dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT,
tableName), deleteStatement -> {
- try {
- deleteStatement.setString(1, dagId.toString());
- return deleteStatement.executeUpdate() != 0;
- } catch (SQLException e) {
- throw new IOException(String.format("Failure deleting dag for %s",
dagId), e);
- }}, true);
- this.totalDagCount.dec();
- return true;
+ return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_DAG_STATEMENT,
tableName),
+ deleteStatement -> {
+ try {
+ deleteStatement.setString(1, dagId.toString());
+ return deleteStatement.executeUpdate() != 0;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Failure deleting dag for %s",
dagId), e);
+ }
+ }, true);
}
@Override
@@ -195,16 +195,15 @@ public class MysqlDagStateStoreWithDagNodes implements
DagStateStoreWithDagNodes
}
@Override
- public int updateDagNode(DagManager.DagId parentDagId,
Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+ public boolean updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException {
String dagNodeId = dagNode.getValue().getId().toString();
- return
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
+ return
dbStatementExecutor.withPreparedStatement(String.format(UPDATE_DAG_NODE_STATEMENT,
tableName), updateStatement -> {
try {
- insertStatement.setString(1, dagNodeId);
- insertStatement.setString(2, parentDagId.toString());
- insertStatement.setString(3,
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
- return insertStatement.executeUpdate();
+ updateStatement.setString(1,
this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
+ updateStatement.setString(2, dagNodeId);
+ return updateStatement.executeUpdate() == 1;
} catch (SQLException e) {
- throw new IOException(String.format("Failure adding dag node for %s",
dagNodeId), e);
+ throw new IOException(String.format("Failure updating dag node for
%s", dagNodeId), e);
}}, true);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 0a9f6dcd67..71693705fc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -134,7 +134,7 @@ public class DagProcUtils {
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
- dagManagementStateStore.addDagNodeState(dagNode, dagId);
+ dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
@@ -158,7 +158,6 @@ public class DagProcUtils {
public static void cancelDagNode(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws
IOException {
Properties cancelJobArgs = new Properties();
- DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
String serializedFuture = null;
if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
@@ -175,12 +174,6 @@ public class DagProcUtils {
log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
cancelJobArgs).get();
- // add back the dag node with updated states in the store
- dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
- dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
- // send cancellation event after updating the state, because
cancellation event triggers a ReevaluateDagAction
- // that will delete the dag. Due to race condition between adding dag
node and deleting dag, state store may get
- // into inconsistent state.
sendCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}",
dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index ef554f6418..cdfa3d0509 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -93,7 +93,17 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
}
// get the dag after updating dag node status
- Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ Optional<Dag<JobExecutionPlan>> dagOptional =
dagManagementStateStore.getDag(getDagId());
+ if (!dagOptional.isPresent()) {
+ // This may happen if another ReevaluateDagProc removed the dag after
this DagProc updated the dag node status.
+ // The other ReevaluateDagProc can do that purely out of race condition
when the dag is cancelled and ReevaluateDagProcs
+ // are being processed for dag node kill requests; or when this DagProc
ran into some exception after updating the
+ // status and thus gave the other ReevaluateDagProc sufficient time to
delete the dag before being retried.
+ log.warn("Dag not found {}", getDagId());
+ return;
+ }
+
+ Dag<JobExecutionPlan> dag = dagOptional.get();
onJobFinish(dagManagementStateStore, dagNode, dag);
if (jobStatus.isShouldRetry()) {
@@ -132,7 +142,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
private void updateDagNodeStatus(DagManagementStateStore
dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus)
throws IOException {
dagNode.getValue().setExecutionStatus(executionStatus);
- dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+ dagManagementStateStore.updateDagNode(dagNode);
}
/**
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
index 3185943902..27a9dd553d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java
@@ -100,14 +100,13 @@ public class MySqlDagManagementStateStoreTest {
Dag.DagNode<JobExecutionPlan> dagNode2 = dag.getNodes().get(1);
Dag.DagNode<JobExecutionPlan> dagNode3 = dag2.getNodes().get(0);
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
- DagManager.DagId dagId2 = DagManagerUtils.generateDagId(dag2);
DagNodeId dagNodeId =
DagManagerUtils.calcJobId(dagNode.getValue().getJobSpec().getConfig());
this.dagManagementStateStore.addDag(dag);
this.dagManagementStateStore.addDag(dag2);
- this.dagManagementStateStore.addDagNodeState(dagNode, dagId);
- this.dagManagementStateStore.addDagNodeState(dagNode2, dagId);
- this.dagManagementStateStore.addDagNodeState(dagNode3, dagId2);
+ this.dagManagementStateStore.updateDagNode(dagNode);
+ this.dagManagementStateStore.updateDagNode(dagNode2);
+ this.dagManagementStateStore.updateDagNode(dagNode3);
Assert.assertTrue(compareLists(dag.getNodes(),
this.dagManagementStateStore.getDag(dagId).get().getNodes()));
Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
@@ -126,7 +125,7 @@ public class MySqlDagManagementStateStoreTest {
jobExecutionPlan.setJobFuture(Optional.of(future));
Dag.DagNode<JobExecutionPlan> duplicateDagNode = new
Dag.DagNode<>(jobExecutionPlan);
- this.dagManagementStateStore.addDagNodeState(duplicateDagNode, dagId);
+ this.dagManagementStateStore.updateDagNode(duplicateDagNode);
Assert.assertEquals(this.dagManagementStateStore.getDagNodes(dagId).size(), 2);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index 651cc441df..6a15bf1645 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -123,7 +123,7 @@ public class ReevaluateDagProcTest {
// next job is sent to spec producer
Mockito.verify(specProducers.get(1), Mockito.times(1)).addSpec(any());
// there are two invocations, one after setting status and other after
sending new job to specProducer
- Mockito.verify(this.dagManagementStateStore,
Mockito.times(2)).addDagNodeState(any(), any());
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(2)).updateDagNode(any());
// assert that the first job is completed
Assert.assertEquals(ExecutionStatus.COMPLETE,
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 150bed10e6..0623d00288 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -109,7 +109,7 @@ public class ResumeDagProcTest {
buildNaiveTopologySpec().getSpecExecutor() respectively.
the result will be that after serializing/deserializing the test dag, the
spec executor (and producer) type may change */
- Mockito.verify(this.dagManagementStateStore,
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
+ Mockito.verify(this.dagManagementStateStore,
Mockito.times(expectedNumOfResumedJobs)).updateDagNode(any());
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}