This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ceca1e5de [GOBBLIN-2022] add re-evaluate dag proc (#3896)
ceca1e5de is described below

commit ceca1e5ded294711ec44ca183fe4675161beb963
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Apr 4 10:28:29 2024 -0700

    [GOBBLIN-2022] add re-evaluate dag proc (#3896)
    
    * add re-evaluate dag proc
    address review comments
    fix test
    merge conflicts
---
 .../src/main/avro/DagActionStoreChangeEvent.avsc   |   6 +-
 .../runtime/KafkaAvroJobStatusMonitorTest.java     |   7 +-
 .../modules/orchestration/DagActionStore.java      |   2 +-
 .../orchestration/DagManagementStateStore.java     |   7 +-
 .../orchestration/DagManagementTaskStreamImpl.java |   3 +
 .../modules/orchestration/DagProcFactory.java      |   7 +
 .../modules/orchestration/DagTaskVisitor.java      |   2 +
 .../MostlyMySqlDagManagementStateStore.java        |  10 +-
 .../modules/orchestration/proc/DagProc.java        |   4 +
 .../modules/orchestration/proc/LaunchDagProc.java  |   8 +-
 .../orchestration/proc/ReevaluateDagProc.java      | 194 +++++++++++++++++++++
 .../modules/orchestration/task/DagTask.java        |   2 +-
 .../ReevaluateDagTask.java}                        |  21 ++-
 .../DagManagementDagActionStoreChangeMonitor.java  |   3 +-
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   7 +-
 .../service/monitoring/KafkaJobStatusMonitor.java  |  59 +++++--
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  16 +-
 .../gobblin/service/GobblinServiceManagerTest.java |   4 +-
 .../DagManagementTaskStreamImplTest.java           |   6 +
 .../orchestration/DagProcessingEngineTest.java     |  28 ++-
 .../MostlyMySqlDagManagementStateStoreTest.java    |   2 +-
 .../MysqlMultiActiveLeaseArbiterTest.java          |   9 +-
 .../orchestration/MysqlUserQuotaManagerTest.java   |  10 +-
 .../orchestration/proc/LaunchDagProcTest.java      |   9 +
 .../orchestration/proc/ReevaluateDagProcTest.java  | 177 +++++++++++++++++++
 .../service/monitoring/JobStatusRetrieverTest.java |   8 +-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  40 ++++-
 27 files changed, 581 insertions(+), 70 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index 60eeb154a..c6d94a628 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -36,12 +36,14 @@
       "symbols": [
         "KILL",
         "RESUME",
-        "LAUNCH"
+        "LAUNCH",
+        "REEVALUATE"
       ],
       "symbolDocs": {
         "KILL": "Kill the flow corresponding to this dag",
         "RESUME": "Resume or start a new flow corresponding to this dag",
-        "LAUNCH": "Launch a new execution of the flow corresponding to this 
dag"
+        "LAUNCH": "Launch a new execution of the flow corresponding to this 
dag",
+        "REEVALUATE": "Re-evaluate what needs to be done upon receipt of a 
final job status"
       }
     },
     "doc" : "type of dag action",
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index d9102a1be..94980017c 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -71,6 +71,7 @@ import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -97,6 +98,7 @@ public class KafkaAvroJobStatusMonitorTest {
   private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
   private MetricContext context;
   private KafkaAvroEventKeyValueReporter.Builder<?> builder;
+  private MysqlDagActionStore mysqlDagActionStore;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -115,6 +117,7 @@ public class KafkaAvroJobStatusMonitorTest {
     builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
     builder = 
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+    this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
   }
 
   @Test
@@ -775,7 +778,7 @@ public class KafkaAvroJobStatusMonitorTest {
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
         AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSObservabilityEventProducer producer)
         throws IOException, ReflectiveOperationException {
-      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer);
+      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, mysqlDagActionStore);
       shouldThrowFakeExceptionInParseJobStatus = 
shouldThrowFakeExceptionInParseJobStatusToggle;
     }
 
@@ -791,7 +794,7 @@ public class KafkaAvroJobStatusMonitorTest {
 
     /**
      * Overridden to stub potential exception within core processing of 
`processMessage` (specifically retried portion).
-     * Although truly plausible (IO)Exceptions would originate from 
`KafkaJobStatusMonitor.addJobStatusToStateStore`,
+     * Although truly plausible (IO)Exceptions would originate from 
`KafkaJobStatusMonitor.updateJobStatus`,
      * that is `static`, so unavailable for override.  The approach here is a 
pragmatic compromise, being simpler than
      * the alternative of writing a mock `StateStore.Factory` that the 
`KafkaJobStatusMonitor` ctor could reflect,
      * instantiate, and finally create a mock `StateStore` from.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 94beebd16..eafb05750 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -35,7 +35,7 @@ public interface DagActionStore {
     LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
     RETRY, // Invoked through DagManager for flows configured to allow retries
     CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
-    ADVANCE // Launch next step in multi-hop dag
+    REEVALUATE // Re-evaluate what needs to be done upon receipt of a final 
job status
   }
 
   @Data
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 274abeaa0..b56459da7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -132,11 +132,12 @@ public interface DagManagementStateStore {
   void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId 
dagId) throws IOException;
 
   /**
-   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link 
JobStatus},
-   * or Optional.absent if it is not found.
+   * Returns the requested {@link  
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link 
JobStatus}.
+   * Both elements of the returned pair are {@code Optional}s, empty when the corresponding entry is not present in
+   * the store. The JobStatus can be non-empty only if the DagNode is non-empty.
    * @param dagNodeId of the dag node
    */
-  Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId);
+  Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId);
 
   /**
    * Returns a list of {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 4409a673f..8ad246aa2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -161,6 +162,8 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
     switch (dagActionType) {
       case LAUNCH:
         return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
+      case REEVALUATE:
+        return new ReevaluateDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       default:
         throw new UnsupportedOperationException(dagActionType + " not yet 
implemented");
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 24e65eb0b..ef7aea3d2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -23,8 +23,10 @@ import com.google.inject.Singleton;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
@@ -49,6 +51,11 @@ public class DagProcFactory implements 
DagTaskVisitor<DagProc> {
   public LaunchDagProc meet(LaunchDagTask launchDagTask) {
     return new LaunchDagProc(launchDagTask, 
this.flowCompilationValidationHelper);
   }
+
+  @Override
+  public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) {
+    return new ReevaluateDagProc(reEvaluateDagTask);
+  }
   //todo - overload meet method for other dag tasks
 }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
index 0b27b4e9d..e946f9835 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -18,8 +18,10 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
 
 
 public interface DagTaskVisitor<T> {
   T meet(LaunchDagTask launchDagTask);
+  T meet(ReevaluateDagTask reevaluateDagTask);
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 5304c8003..b010a0cb5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -211,12 +211,12 @@ public class MostlyMySqlDagManagementStateStore 
implements DagManagementStateSto
     return this.dagStateStore.existsDag(dagId);
   }
 
-  public Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
-    Optional<JobStatus> jobStatus = getJobStatus(dagNodeId);
-    if (this.dagNodes.containsKey(dagNodeId) && jobStatus.isPresent()) {
-      return Optional.of(ImmutablePair.of(this.dagNodes.get(dagNodeId), 
jobStatus.get()));
+  public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
+    if (this.dagNodes.containsKey(dagNodeId)) {
+      return ImmutablePair.of(Optional.of(this.dagNodes.get(dagNodeId)), 
getJobStatus(dagNodeId));
     } else {
-      return Optional.empty();
+      // no point in searching for the job status if the node itself is absent.
+      return ImmutablePair.of(Optional.empty(), Optional.empty());
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 4f0a254b5..fece28d8c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -40,6 +40,10 @@ import 
org.apache.gobblin.service.modules.orchestration.task.DagTask;
  * actions based on the type of {@link DagTask}. Submitting events in time is 
found to be important in
  * <a href="https://github.com/apache/gobblin/pull/3641">PR#3641</a>, hence initialize and act methods submit events as
  * they happen.
+ * The type parameter T is the type of the object produced by {@code initialize}, which runs before the dag proc
+ * does its main work in the {@code act} method; that object is then passed to {@code act}.
+ *
+ * @param <T> type of the initialization "state" on which to {@link DagProc#act}
  */
 @Alpha
 @Data
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 4cef3b7a7..482cb563d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -77,6 +77,8 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       // todo - add metrics
     } else {
       submitNextNodes(dagManagementStateStore, dag.get());
+      //Checkpoint the dag state, it should have an updated value of dag nodes
+      dagManagementStateStore.checkpointDag(dag.get());
       orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
     }
   }
@@ -84,8 +86,7 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
   /**
    * Submit next set of Dag nodes in the provided Dag.
    */
-   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore,
-       Dag<JobExecutionPlan> dag) throws IOException {
+   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
      Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
 
      if (nextNodes.size() > 1) {
@@ -97,9 +98,6 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
        DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
        log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
      }
-
-     //Checkpoint the dag state, it should have an updated value of dag nodes
-     dagManagementStateStore.checkpointDag(dag);
    }
 
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
new file mode 100644
index 000000000..6189a9b6e
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
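+ * A {@link DagProc} that re-evaluates a Dag upon receipt of a final status for one of its jobs.
+ * It launches any subsequent (dependent) job(s) once all pre-requisite job(s) in the Dag have succeeded, retries the
+ * finished job when its {@link JobStatus} says it should be retried, and, when there are no more jobs to run and none
+ * running, emits the final flow event and cleans up the Dag (or marks it failed).
+ * (In the future) if there are multiple new jobs to be launched, separate launch dag actions will be created for each
+ * of them; currently that case is unsupported.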
+ */
+@Slf4j
+public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
+
+  public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask) {
+    super(reEvaluateDagTask);
+  }
+
+  @Override
+  protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
initialize(DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus =
+        dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
+
+    if (!dagNodeWithJobStatus.getLeft().isPresent() || 
!dagNodeWithJobStatus.getRight().isPresent()) {
+      // this is possible when MALA malfunctions and a duplicated reevaluate 
dag proc is launched for a dag node that is
+      // already "reevaluated" and cleaned up.
+      return ImmutablePair.of(Optional.empty(), Optional.empty());
+    }
+
+    ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
+    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
+          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
+      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status %s is not final for 
job %s", executionStatus, getDagId()));
+    }
+
+    setStatus(dagManagementStateStore, dagNodeWithJobStatus.getLeft().get(), 
executionStatus);
+    return dagNodeWithJobStatus;
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus)
+      throws IOException {
+    if (!dagNodeWithJobStatus.getLeft().isPresent()) {
+      // one of the reasons this could arise is when the MALA leasing doesn't work cleanly and another DagProc::process
+      // has cleaned up the Dag, yet did not complete the lease before this current one acquired its own
+      log.error("DagNode or its job status not found for a Reevaluate 
DagAction with dag node id {}", this.dagNodeId);
+      // todo - add metrics to count such occurrences
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeWithJobStatus.getLeft().get();
+    JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+    ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
+    onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
+
+    if (jobStatus.isShouldRetry()) {
+      log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
+          DagManagerUtils.getFullyQualifiedJobName(dagNode), 
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+      // todo - be careful when unsetting this, it is possible that this is 
set to FAILED because some other job in the
+      // dag failed and is also not retryable. in that case if this job's 
retry passes, overall status of the dag can be
+      // set to PASS, which would be incorrect.
+      dag.setFlowEvent(null);
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
+    } else if (!dagManagementStateStore.hasRunningJobs(getDagId())) {
+      if (dag.getFlowEvent() == null) {
+        // If the dag flow event is not set and there are no more jobs running, then the flow is successful.
+        // Note that `onJobFinish` only does what is required after an individual job finishes; the Dag's overall
+        // status cannot be determined from a single job's finish status, so it is decided here.
+        dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
+      }
+      String flowEvent = dag.getFlowEvent();
+      DagManagerUtils.emitFlowEvent(eventSubmitter, dag, flowEvent);
+      if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
+        // todo - verify if work from PR#3641 is required
+        dagManagementStateStore.deleteDag(getDagId());
+      } else {
+        dagManagementStateStore.markDagFailed(dag);
+      }
+    }
+  }
+
+  /**
+   * Sets status of a dag node inside the given Dag.
+   * todo - DMSS should support this functionality like an atomic get-and-set 
operation.
+   */
+  private void setStatus(DagManagementStateStore dagManagementStateStore,
+      Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus) 
throws IOException {
+    Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
+    DagNodeId dagNodeId = dagNode.getValue().getId();
+    for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
+      if (node.getValue().getId().equals(dagNodeId)) {
+        node.getValue().setExecutionStatus(executionStatus);
+        dagManagementStateStore.checkpointDag(dag);
+        return;
+      }
+    }
+    log.error("DagNode with id {} not found in Dag {}", dagNodeId, getDagId());
+  }
+
+  /**
+   * Method that defines the actions to be performed when a job finishes 
either successfully or with failure.
+   * This method updates the state of the dag and performs clean up actions as 
necessary.
+   */
+  private void onJobFinish(DagManagementStateStore dagManagementStateStore, 
Dag.DagNode<JobExecutionPlan> dagNode,
+      ExecutionStatus executionStatus, Dag<JobExecutionPlan> dag) throws 
IOException {
+    String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
+    log.info("Job {} of Dag {} has finished with status {}", jobName, 
getDagId(), executionStatus.name());
+    // Only decrement counters and quota for jobs that actually ran on the 
executor, not from a GaaS side failure/skip event
+    if (dagManagementStateStore.releaseQuota(dagNode)) {
+      
dagManagementStateStore.getDagManagerMetrics().decrementRunningJobMetrics(dagNode);
+    }
+
+    switch (executionStatus) {
+      case FAILED:
+        dag.setMessage("Flow failed because job " + jobName + " failed");
+        dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
+        
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
+        break;
+      case CANCELLED:
+        dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+        break;
+      case COMPLETE:
+        
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
+        submitNextNodes(dagManagementStateStore, dag);
+        break;
+      default:
+        log.warn("It should not reach here. Job status {} is unexpected.", 
executionStatus);
+    }
+
+    //Checkpoint the dag state, it should have an updated value of dag fields
+    dagManagementStateStore.checkpointDag(dag);
+    dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
+  }
+
+  /**
+   * Submits the next Dag node in the provided Dag; having more than one next node is currently unsupported.
+   */
+  private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+    if (nextNodes.size() > 1) {
+      handleMultipleJobs(nextNodes);
+    }
+
+    if (!nextNodes.isEmpty()) {
+      Dag.DagNode<JobExecutionPlan> nextNode = 
nextNodes.stream().findFirst().get();
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, nextNode, 
getDagId());
+      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(nextNode), getDagId());
+    }
+  }
+
+  private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
+    throw new UnsupportedOperationException("More than one start job is not 
allowed");
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index bc12926c7..f031134d1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -57,7 +57,7 @@ public abstract class DagTask {
    * work on this task, is done in this method.
    * Returns true if concluding dag task finished successfully otherwise false.
    */
-  public boolean conclude() {
+  public final boolean conclude() {
     try {
       this.dagActionStore.deleteDagAction(this.dagAction);
       return this.leaseObtainedStatus.completeLease();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
similarity index 52%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index 0b27b4e9d..7a90c9912 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -15,11 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.orchestration;
+package org.apache.gobblin.service.modules.orchestration.task;
 
-import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 
 
-public interface DagTaskVisitor<T> {
-  T meet(LaunchDagTask launchDagTask);
+/**
+ * A {@link DagTask} responsible for handling re-evaluate dag actions.
+ */
+
+public class ReevaluateDagTask extends DagTask {
+  public ReevaluateDagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
+  }
+
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index e7843f1af..413fbcdc5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -61,10 +61,11 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
       // todo - add actions for other types of dag actions
       switch (dagAction.getDagActionType()) {
         case LAUNCH :
+        case REEVALUATE :
           dagManagement.addDagAction(dagAction);
           break;
         default:
-          log.warn("Received unsupported dagAction {}. Expected to be a 
LAUNCH", dagAction.getDagActionType());
+          log.warn("Received unsupported dagAction {}. Expected to be a 
REEVALUATE or LAUNCH", dagAction.getDagActionType());
           this.unexpectedErrors.mark();
       }
     } catch (IOException e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c152970cb..4aced6afb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -46,6 +46,7 @@ import 
org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -64,10 +65,10 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   @Getter
   private Meter messageParseFailures;
 
-  public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
-      JobIssueEventHandler jobIssueEventHandler, 
GaaSObservabilityEventProducer observabilityEventProducer)
+  public KafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads, JobIssueEventHandler jobIssueEventHandler,
+      GaaSObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
       throws IOException, ReflectiveOperationException {
-    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer);
+    super(topic, config, numThreads,  jobIssueEventHandler, 
observabilityEventProducer, dagActionStore);
 
     if (ConfigUtils.getBoolean(config, 
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
       KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new 
KafkaAvroSchemaRegistryFactory().
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 8ae588231..ab6d62f93 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -27,6 +27,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.github.rholder.retry.Attempt;
@@ -60,6 +63,7 @@ import 
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
@@ -112,10 +116,11 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   private final JobIssueEventHandler jobIssueEventHandler;
   private final Retryer<Void> persistJobStatusRetryer;
   private final GaaSObservabilityEventProducer eventProducer;
-
+  private final DagActionStore dagActionStore;
+  private final boolean dagProcEngineEnabled;
 
   public KafkaJobStatusMonitor(String topic, Config config, int numThreads, 
JobIssueEventHandler jobIssueEventHandler,
-      GaaSObservabilityEventProducer observabilityEventProducer)
+      GaaSObservabilityEventProducer observabilityEventProducer, 
DagActionStore dagActionStore)
       throws ReflectiveOperationException {
     super(topic, config.withFallback(DEFAULTS), numThreads);
     String stateStoreFactoryClass = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, 
FileContextBasedFsStateStoreFactory.class.getName());
@@ -125,6 +130,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
     this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
     this.jobIssueEventHandler = jobIssueEventHandler;
+    this.dagActionStore = dagActionStore;
+    this.dagProcEngineEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
 
     Config retryerOverridesConfig = 
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
@@ -191,10 +198,33 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       persistJobStatusRetryer.call(() -> {
         // re-create `jobStatus` on each attempt, since mutated within 
`addJobStatusToStateStore`
         org.apache.gobblin.configuration.State jobStatus = 
parseJobStatus(gobblinTrackingEvent);
-        if (jobStatus != null) {
-          try (Timer.Context context = 
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-            addJobStatusToStateStore(jobStatus, this.stateStore, 
this.eventProducer);
+        if (jobStatus == null) {
+          return null;
+        }
+
+        try (Timer.Context context = 
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
+          Pair<org.apache.gobblin.configuration.State, Boolean> 
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
+          jobStatus = updatedJobStatus.getLeft();
+
+          String flowName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+          String flowGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+          String flowExecutionId = 
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+          String jobName = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+          String jobGroup = 
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+          String storeName = jobStatusStoreName(flowGroup, flowName);
+          String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
+
+          if (updatedJobStatus.getRight()) {
+            this.eventProducer.emitObservabilityEvent(jobStatus);
+
+            if (this.dagProcEngineEnabled) {
+              // todo - retried/resumed jobs *may* not be handled here, we may 
want to create their dag action elsewhere
+              this.dagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+            }
           }
+
+          // update the state store after adding a dag action to guaranty 
at-least-once adding of dag action
+          stateStore.put(storeName, tableName, jobStatus);
         }
         return null;
       });
@@ -217,17 +247,15 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   }
 
   /**
-   * Persist job status to the underlying {@link StateStore}.
-   * It fills missing fields in job status and also merge the fields with the
-   * existing job status in the state store. Merging is required because we
-   * do not want to lose the information sent by other GobblinTrackingEvents.
-   * @param jobStatus
+   * It fills missing fields in job status and also merge the fields with the 
existing job status in the state store.
+   * Merging is required because we do not want to lose the information sent 
by other GobblinTrackingEvents.
+   * Returns a pair of current job status after update in this method and a 
boolean suggesting if the new job status is
+   * now in final state and was not in final state before this method was 
called.
    * @throws IOException
    */
   @VisibleForTesting
-  static void addJobStatusToStateStore(org.apache.gobblin.configuration.State 
jobStatus, StateStore stateStore,
-      GaaSObservabilityEventProducer eventProducer)
-      throws IOException {
+  static Pair<org.apache.gobblin.configuration.State, Boolean> 
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
+      StateStore<org.apache.gobblin.configuration.State> stateStore) throws 
IOException {
     try {
       if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
         jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
JobStatusRetriever.NA_KEY);
@@ -276,10 +304,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       }
 
       modifyStateIfRetryRequired(jobStatus);
-      stateStore.put(storeName, tableName, jobStatus);
-      if (isNewStateTransitionToFinal(jobStatus, states)) {
-        eventProducer.emitObservabilityEvent(jobStatus);
-      }
+      return ImmutablePair.of(jobStatus, 
isNewStateTransitionToFinal(jobStatus, states));
     } catch (Exception e) {
       log.warn("Meet exception when adding jobStatus to state store at "
           + e.getStackTrace()[0].getClassName() + "line number: " + 
e.getStackTrace()[0].getLineNumber(), e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 0b80b4f2e..ecc759333 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -24,6 +24,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
@@ -33,6 +34,8 @@ import 
org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -49,24 +52,29 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
   private final JobIssueEventHandler jobIssueEventHandler;
   private final MultiContextIssueRepository issueRepository;
   private final boolean instrumentationEnabled;
+  private final DagActionStore dagActionStore;
+  private final boolean dagProcEngineEnabled;
 
   @Inject
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      GobblinInstanceEnvironment env) {
-    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled());
+      GobblinInstanceEnvironment env, DagActionStore dagActionStore, 
@Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean dagProcEngineEnabled) {
+    this(config, jobIssueEventHandler, issueRepository, 
env.isInstrumentationEnabled(), dagActionStore, dagProcEngineEnabled);
   }
 
   public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler 
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
-      boolean instrumentationEnabled) {
+      boolean instrumentationEnabled, DagActionStore dagActionStore, boolean 
dagProcEngineEnabled) {
     this.config = Objects.requireNonNull(config);
     this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
     this.issueRepository = issueRepository;
     this.instrumentationEnabled = instrumentationEnabled;
+    this.dagActionStore = dagActionStore;
+    this.dagProcEngineEnabled = dagProcEngineEnabled;
   }
 
   private KafkaJobStatusMonitor createJobStatusMonitor()
       throws ReflectiveOperationException {
     Config jobStatusConfig = 
config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
+    jobStatusConfig = 
jobStatusConfig.withValue(InjectionNames.DAG_PROC_ENGINE_ENABLED, 
ConfigValueFactory.fromAnyRef(this.dagProcEngineEnabled));
 
     String topic = 
jobStatusConfig.getString(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_TOPIC_KEY);
     int numThreads = ConfigUtils.getInt(jobStatusConfig, 
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_NUM_THREADS_KEY, 5);
@@ -95,7 +103,7 @@ public class KafkaJobStatusMonitorFactory implements 
Provider<KafkaJobStatusMoni
         observabilityEventProducerClassName, 
ConfigUtils.configToState(config), this.issueRepository, 
this.instrumentationEnabled);
 
     return (KafkaJobStatusMonitor) GobblinConstructorUtils
-        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer);
+        .invokeLongestConstructor(jobStatusMonitorClass, topic, 
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer, 
dagActionStore);
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 3dcc106f5..3bae27069 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -124,6 +124,7 @@ public class GobblinServiceManagerTest {
   private FlowConfigV2Client flowConfigClient;
 
   private MySQLContainer mysql;
+  ITestMetastoreDatabase testMetastoreDatabase;
 
   private Git gitForPush;
   private TestingServer testingServer;
@@ -145,7 +146,7 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME, 
mysql.getUsername());
     serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD, 
mysql.getPassword());
 
-    ITestMetastoreDatabase testMetastoreDatabase = 
TestMetastoreDatabaseFactory.get();
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     testingServer = new TestingServer(true);
 
@@ -267,6 +268,7 @@ public class GobblinServiceManagerTest {
     }
 
     mysql.stop();
+    testMetastoreDatabase.close();
   }
 
   /**
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 2e677f59d..ff75d906a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import org.junit.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -110,4 +111,9 @@ public class DagManagementTaskStreamImplTest {
     DagProc dagProc = dagTask.host(this.dagProcFactory);
     Assert.assertNotNull(dagProc);
   }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    testMetastoreDatabase.close();
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index f5ed96867..077c5dd90 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -43,6 +45,8 @@ import 
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -55,11 +59,18 @@ public class DagProcessingEngineTest {
   private DagTaskStream dagTaskStream;
   private DagProcFactory dagProcFactory;
   private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  static DagActionStore dagActionStore;
+  static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
 
   @BeforeClass
   public void setUp() throws Exception {
     // Setting up mock DB
-    ITestMetastoreDatabase testMetastoreDatabase = 
TestMetastoreDatabaseFactory.get();
+     testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+     dagActionStore = mock(DagActionStore.class);
+     doReturn(true).when(dagActionStore).deleteDagAction(any());
+    leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
+    doReturn(true).when(leaseObtainedStatus).completeLease();
 
     Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
@@ -120,7 +131,7 @@ public class DagProcessingEngineTest {
     private final boolean isBad;
 
     public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
-      super(dagAction, null, null);
+      super(dagAction, leaseObtainedStatus, dagActionStore);
       this.isBad = isBad;
     }
 
@@ -128,11 +139,6 @@ public class DagProcessingEngineTest {
     public <T> T host(DagTaskVisitor<T> visitor) {
       return (T) new MockedDagProc(this, this.isBad);
     }
-
-    @Override
-    public boolean conclude() {
-      return false;
-    }
   }
 
   static class MockedDagProc extends DagProc<Void> {
@@ -163,7 +169,8 @@ public class DagProcessingEngineTest {
 
   // This tests verifies that all the dag tasks entered to the dag task stream 
are retrieved by dag proc engine threads
   @Test
-  public void dagProcessingTest() throws InterruptedException, 
TimeoutException {
+  public void dagProcessingTest()
+      throws InterruptedException, TimeoutException, IOException {
     // there are MAX_NUM_OF_TASKS dag tasks returned and then each thread 
additionally call (infinitely) once to wait
     // in this unit tests, it does not infinitely wait though, because the 
mocked task stream throws an exception on
     // (MAX_NUM_OF_TASKS + 1) th call
@@ -176,4 +183,9 @@ public class DagProcessingEngineTest {
 
     
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
   }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    testMetastoreDatabase.close();
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index ceb1b7b9f..649a6e80a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -87,7 +87,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
 
     Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
     Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getDag(dagId).get().toString());
-    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).get().getLeft());
+    Assert.assertEquals(dagNode, 
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
     Assert.assertEquals(dag.toString(), 
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
 
     List<Dag.DagNode<JobExecutionPlan>> dagNodes = 
this.dagManagementStateStore.getDagNodes(dagId);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 93003a99a..603db5e19 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -23,6 +23,7 @@ import java.sql.Timestamp;
 import java.util.Optional;
 
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -65,12 +66,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
       
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
 TABLE);
   private String formattedAcquireLeaseIfFinishedStatement =
       
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
 TABLE);
+  ITestMetastoreDatabase testDb;
 
   // The setup functionality verifies that the initialization of the tables is 
done correctly and verifies any SQL
   // syntax errors.
   @BeforeClass
   public void setUp() throws Exception {
-    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+    this.testDb = TestMetastoreDatabaseFactory.get();
 
     Config config = ConfigBuilder.create()
         .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, 
EPSILON)
@@ -352,4 +354,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
     Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
   }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    this.testDb.close();
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index cf39adbfc..de4bcacee 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -39,10 +40,11 @@ public class MysqlUserQuotaManagerTest {
   private static final String PROXY_USER = "abora";
   private MysqlUserQuotaManager quotaManager;
   public static int INCREMENTS = 1000;
+  ITestMetastoreDatabase testDb;
 
   @BeforeClass
   public void setUp() throws Exception {
-    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+    testDb = TestMetastoreDatabaseFactory.get();
 
     Config config = ConfigBuilder.create()
         .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
@@ -53,7 +55,6 @@ public class MysqlUserQuotaManagerTest {
 
     this.quotaManager = new MysqlUserQuotaManager(config);
   }
-
   @Test
   public void testRunningDagStore() throws Exception {
     String dagId = 
DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 
1).getNodes().get(0)).toString();
@@ -167,4 +168,9 @@ public class MysqlUserQuotaManagerTest {
     thread6.join();
     Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, 
AbstractUserQuotaManager.CountType.USER_COUNT), -1);
   }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws IOException {
+    testDb.close();
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 21b9fc28b..9c75c0808 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -34,6 +35,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -61,9 +63,11 @@ import static org.mockito.Mockito.spy;
 
 public class LaunchDagProcTest {
   private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  private ITestMetastoreDatabase testMetastoreDatabase;
 
   @BeforeClass
   public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
     this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
     
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
     doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
@@ -89,6 +93,11 @@ public class LaunchDagProcTest {
             .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
   }
 
+  @AfterClass
+  public void tearDown() throws Exception {
+    this.testMetastoreDatabase.close();
+  }
+
   // This creates a dag like this
   //  D1  D2 D3
   //    \ | /
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
new file mode 100644
index 000000000..94dfd9c99
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class ReevaluateDagProcTest {
+  private final long flowExecutionId = System.currentTimeMillis();
+  private final String flowGroup = "fg";
+
+  void mockDMSS(DagManagementStateStore dagManagementStateStore) throws 
IOException, SpecNotFoundException {
+    
doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+  }
+
+  // test when there exists a next job in the dag when the current job's reevaluate dag action is processed
+  @Test
+  public void testOneNextJobToRun() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    // close the metastore in a finally block so that a failing assertion does not leak it
+    try {
+      DagManagementStateStore dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+      mockDMSS(dagManagementStateStore);
+      String flowName = "fn";
+      Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+          2, "user5", ConfigFactory.empty()
+              .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+              .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+              .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+      );
+      List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+        try {
+          return DagManagerUtils.getSpecProducer(n);
+        } catch (ExecutionException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+      JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+          message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+      // pair the dag's first start node with a COMPLETE job status
+      doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+      doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+      doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
+      doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
+
+      ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(
+          new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), "job0",
+              DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+      reEvaluateDagProc.process(dagManagementStateStore);
+
+      // count addSpec calls only after process() has run, so the assertion reflects what the proc launched
+      long addSpecCount = specProducers.stream()
+          .mapToLong(p -> Mockito.mockingDetails(p)
+              .getInvocations()
+              .stream()
+              .filter(a -> a.getMethod().getName().equals("addSpec"))
+              .count())
+          .sum();
+
+      // next job is sent to spec producer
+      Assert.assertEquals(addSpecCount, 1L);
+
+      // current job's state is deleted
+      Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+          .filter(a -> a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+    } finally {
+      testMetastoreDatabase.close();
+    }
+  }
+
+  // test when there does not exist a next job in the dag when the current job's reevaluate dag action is processed
+  @Test
+  public void testNoNextJobToRun() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    // close the metastore in a finally block so that a failing assertion does not leak it
+    try {
+      DagManagementStateStore dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+      mockDMSS(dagManagementStateStore);
+      String flowName = "fn2";
+      Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+          1, "user5", ConfigFactory.empty()
+              .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+              .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+              .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+      );
+      JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+          message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+      doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+      doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+      doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
+      doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+      doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
+
+      List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+        try {
+          return DagManagerUtils.getSpecProducer(n);
+        } catch (ExecutionException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+
+      ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(
+          new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), "job0",
+              DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+      reEvaluateDagProc.process(dagManagementStateStore);
+
+      // count addSpec calls only after process() has run; counting beforehand made the assertion below vacuous
+      long addSpecCount = specProducers.stream()
+          .mapToLong(p -> Mockito.mockingDetails(p)
+              .getInvocations()
+              .stream()
+              .filter(a -> a.getMethod().getName().equals("addSpec"))
+              .count())
+          .sum();
+
+      // no new job to launch for this one job flow
+      Assert.assertEquals(addSpecCount, 0L);
+
+      // current job's state is deleted
+      Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+          .filter(a -> a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+      // dag is deleted because the only job in the dag is completed
+      Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+          .filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
+    } finally {
+      testMetastoreDatabase.close();
+    }
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 345b5d7cc..fb216910d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
@@ -96,7 +97,12 @@ public abstract class JobStatusRetrieverTest {
       properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME, 
String.valueOf(endTime));
     }
     State jobStatus = new State(properties);
-    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
+    Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    jobStatus = updatedJobStatus.getLeft();
+    this.jobStatusRetriever.getStateStore().put(
+        KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+        KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, 
jobName),
+        jobStatus);
   }
 
   static Properties createAttemptsProperties(int currGen, int currAttempts, 
boolean shouldRetry) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 66d552e54..591e65f66 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -45,11 +46,12 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
   private MysqlJobStatusStateStore<State> dbJobStateStore;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
+  private ITestMetastoreDatabase testMetastoreDatabase;
 
   @BeforeClass
   @Override
   public void setUp() throws Exception {
-    ITestMetastoreDatabase testMetastoreDatabase = 
TestMetastoreDatabaseFactory.get();
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
     String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
 
     ConfigBuilder configBuilder = ConfigBuilder.create();
@@ -127,15 +129,24 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     long flowExecutionId = 12340L;
     String flowGroup = Strings.repeat("A", 
ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
     String flowName = Strings.repeat("B", 
ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    String jobGroup = Strings.repeat("D", 
ServiceConfigKeys.MAX_JOB_GROUP_LENGTH);
+    String jobName = Strings.repeat("C", 
ServiceConfigKeys.MAX_JOB_NAME_LENGTH);
+
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
     
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
String.valueOf(flowExecutionId));
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
     properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.ORCHESTRATED.name());
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobGroup);
     State jobStatus = new State(properties);
 
-    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
+    Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+    jobStatus = updatedJobStatus.getLeft();
+    this.jobStatusRetriever.getStateStore().put(
+        KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+        KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, 
jobName),
+        jobStatus);
+
     Iterator<JobStatus>
         jobStatusIterator = 
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId);
     Assert.assertTrue(jobStatusIterator.hasNext());
@@ -148,18 +159,26 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     long flowExecutionId = 12340L;
     String flowGroup = Strings.repeat("A", 
ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
     String flowName = Strings.repeat("B", 
ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    String jobGroup = Strings.repeat("D", 
ServiceConfigKeys.MAX_JOB_GROUP_LENGTH);
+    String jobName = Strings.repeat("C", 
ServiceConfigKeys.MAX_JOB_NAME_LENGTH);
+
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowGroup);
     properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowName);
     
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
String.valueOf(flowExecutionId));
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobName);
     properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.ORCHESTRATED.name());
-    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobGroup);
     State jobStatus = new State(properties);
 
     try {
-      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, 
this.jobStatusRetriever.getStateStore(), new 
NoopGaaSObservabilityEventProducer());
+      Pair<State, Boolean> updatedJobStatus = 
KafkaJobStatusMonitor.recalcJobStatus(jobStatus, 
this.jobStatusRetriever.getStateStore());
+      jobStatus = updatedJobStatus.getLeft();
+      this.jobStatusRetriever.getStateStore().put(
+          KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+          KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup, 
jobName),
+          jobStatus);
     } catch (IOException e) {
-      Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data 
too long"));
+      Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
       return;
     }
     Assert.fail();
@@ -169,4 +188,9 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
   void cleanUpDir() throws Exception {
     
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP,
 FLOW_NAME));
   }
+
+  /**
+   * Closes the test metastore database that {@code setUp()} stored in {@code testMetastoreDatabase}.
+   * NOTE(review): super.tearDown() is not invoked here; confirm the parent class needs no cleanup of its own.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    testMetastoreDatabase.close();
+  }
 }

Reply via email to