This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 227f39b07 [GOBBLIN-2017] handle multiple next jobs to run in dag procs 
(#3965)
227f39b07 is described below

commit 227f39b0740d85b07e95f2fa87e7bcf0c834c5da
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Jun 11 18:08:33 2024 -0700

    [GOBBLIN-2017] handle multiple next jobs to run in dag procs (#3965)
    
    * handle multiple next jobs to run
    * fix test
---
 .../modules/orchestration/proc/DagProcUtils.java   |  26 +++++
 .../modules/orchestration/proc/LaunchDagProc.java  |  28 +-----
 .../orchestration/proc/ReevaluateDagProc.java      |  71 +++++---------
 .../modules/orchestration/proc/ResumeDagProc.java  |  21 +----
 .../service/modules/spec/JobExecutionPlan.java     |   4 +
 .../orchestration/proc/LaunchDagProcTest.java      |  76 +++++++++++----
 .../orchestration/proc/ReevaluateDagProcTest.java  | 105 ++++++++++++++++++---
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  47 +++++----
 8 files changed, 241 insertions(+), 137 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 91bdbf68d..c90907d23 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -55,6 +56,29 @@ import static 
org.apache.gobblin.service.ExecutionStatus.CANCELLED;
  */
 @Slf4j
 public class DagProcUtils {
+
+  /**
+   * If there is a single job to run next, it runs it. If there are multiple 
jobs to run, it creates a
+   * {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagActionType#REEVALUATE}
 dag action for
+   * each of them, and each of those jobs is then launched by its own {@link 
ReevaluateDagProc}.
+   */
+  public static void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag,
+      DagManager.DagId dagId) throws IOException {
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+    if (nextNodes.size() == 1) {
+      Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
dagId);
+      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), dagId);
+    } else {
+      for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+        JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+        
dagManagementStateStore.addJobDagAction(jobExecutionPlan.getFlowGroup(), 
jobExecutionPlan.getFlowName(),
+            String.valueOf(jobExecutionPlan.getFlowExecutionId()), 
jobExecutionPlan.getJobName(), DagActionStore.DagActionType.REEVALUATE);
+      }
+    }
+  }
+
   /**
    * - submits a {@link JobSpec} to a {@link SpecExecutor}
    * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}
@@ -122,6 +146,8 @@ public class DagProcUtils {
       }
       throw new RuntimeException(e);
     }
+
+    log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), dagId);
   }
 
   public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index d3a7614cc..c2d840ef7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +56,10 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
     this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
+  /**
+   * It retrieves the {@link FlowSpec} for the dag this dag proc corresponds 
to, from the {@link DagManagementStateStore}
+   * and compiles it to create a {@link Dag} and saves it in the {@link 
DagManagementStateStore}.
+   */
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
@@ -80,32 +83,11 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
       log.warn("Dag with id " + getDagId() + " could not be compiled.");
       // todo - add metrics
     } else {
-      submitNextNodes(dagManagementStateStore, dag.get());
+      DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), 
getDagId());
       // Checkpoint the dag state, it should have an updated value of dag nodes
       dagManagementStateStore.checkpointDag(dag.get());
       
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
       orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
     }
   }
-
-  /**
-   * Submit next set of Dag nodes in the provided Dag.
-   */
-   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
-     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
-
-     if (nextNodes.size() > 1) {
-       handleMultipleJobs(nextNodes);
-     }
-
-     //Submit jobs from the dag ready for execution.
-     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
-       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
-       log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
-     }
-   }
-
-  private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
-     throw new UnsupportedOperationException("More than one start job is not 
allowed");
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 6fe42fcc4..4359a6d88 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -19,9 +19,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 import java.util.Optional;
-import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.quartz.SchedulerException;
 
@@ -57,25 +55,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
   @Override
   protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
initialize(DagManagementStateStore dagManagementStateStore)
       throws IOException {
-    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeWithJobStatus =
-        dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
-
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || 
!dagNodeWithJobStatus.getRight().isPresent()) {
-      // this is possible when MALA malfunctions and a duplicated reevaluate 
dag proc is launched for a dag node that is
-      // already "reevaluated" and cleaned up.
-      return ImmutablePair.of(Optional.empty(), Optional.empty());
-    }
-
-    ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
-    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status %s is not final for 
job %s", executionStatus, getDagId()));
-    }
-
-    setStatus(dagManagementStateStore, dagNodeWithJobStatus.getLeft().get(), 
executionStatus);
-    return dagNodeWithJobStatus;
+    return dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
   }
 
   @Override
@@ -90,9 +70,30 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     }
 
     Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeWithJobStatus.getLeft().get();
-    JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
-    ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // Usually a reevaluate dag action is created by JobStatusMonitor when a 
finished job status is available,
+      // but when a reevaluate/resume/launch dag proc finds multiple parallel 
jobs to run next, it creates reevaluate
+      // dag actions for each of those parallel jobs, and in this scenario there 
is no job status available.
+      // If the job status is not present, this job was never launched, so submit 
it now.
+      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
+      return;
+    }
+
     Dag<JobExecutionPlan> dag = 
dagManagementStateStore.getDag(getDagId()).get();
+    JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+    ExecutionStatus executionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
+    setStatus(dagManagementStateStore, dagNode, executionStatus);
+
+    if 
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should 
have been created only for finished status - {}",
+          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
+      // this may happen if adding job status in the store failed after adding 
a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is 
%s. Re-evaluate dag action are created for"
+              + " new jobs with no job status when there are multiple of them 
to run next; or when a job finishes with status - %s",
+          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
 
     if (jobStatus.isShouldRetry()) {
@@ -134,6 +135,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
       if (node.getValue().getId().equals(dagNodeId)) {
         node.getValue().setExecutionStatus(executionStatus);
+        dagManagementStateStore.addDagNodeState(node, getDagId());
         dagManagementStateStore.checkpointDag(dag);
         return;
       }
@@ -165,7 +167,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         break;
       case COMPLETE:
         
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
-        submitNextNodes(dagManagementStateStore, dag);
+        DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
         break;
       default:
         log.warn("It should not reach here. Job status {} is unexpected.", 
executionStatus);
@@ -176,27 +178,6 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
     dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
   }
 
-  /**
-   * Submit next set of Dag nodes in the Dag identified by the provided dagId
-   */
-  private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
-    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
-
-    if (nextNodes.size() > 1) {
-      handleMultipleJobs(nextNodes);
-    }
-
-    if (!nextNodes.isEmpty()) {
-      Dag.DagNode<JobExecutionPlan> nextNode = 
nextNodes.stream().findFirst().get();
-      DagProcUtils.submitJobToExecutor(dagManagementStateStore, nextNode, 
getDagId());
-      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(nextNode), getDagId());
-    }
-  }
-
-  private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
-    throw new UnsupportedOperationException("More than one start job is not 
allowed");
-  }
-
   private void 
removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore 
dagManagementStateStore) {
     DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
         getDagNodeId().getFlowName(), 
String.valueOf(getDagNodeId().getFlowExecutionId()),
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index b3d077c46..ddc100ae8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import com.google.common.collect.Maps;
 
@@ -92,26 +91,8 @@ public class ResumeDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
     // if it fails here, it will check point the failed dag in the (running) 
dag store again, which is idempotent
     dagManagementStateStore.deleteFailedDag(failedDag.get());
 
-    resumeDag(dagManagementStateStore, failedDag.get());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), 
getDagId());
 
     
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, 
getDagTask().getDagAction());
   }
-
-  private void resumeDag(DagManagementStateStore dagManagementStateStore, 
Dag<JobExecutionPlan> dag) {
-    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
-
-    if (nextNodes.size() > 1) {
-      handleMultipleJobs(nextNodes);
-    }
-
-    //Submit jobs from the dag ready for execution.
-    for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
-      DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
-      log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
-    }
-  }
-
-  private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
-    throw new UnsupportedOperationException("More than one start job is not 
allowed");
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 7210c6725..72e545104 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -68,6 +68,10 @@ public class JobExecutionPlan {
 
   private final JobSpec jobSpec;
   private final SpecExecutor specExecutor;
+  // the field is only read in one place in ResumeDagProc, in other places, a 
dag node's status is queried
+  // through API DagManagementStateStore#getDagNodeWithJobStatus which 
retrieves status directly from JobStatusRetriever
+  // todo - we should either keep this field updated as soon as a status is 
received so it can be used everywhere
+  //  or consider removing it completely
   private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
   private final int maxAttempts;
   private int currentGeneration = 1;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6e723a716..b4c5e0109 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -39,13 +40,16 @@ import 
org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -82,41 +86,75 @@ public class LaunchDagProcTest {
   }
 
   @Test
-  public void launchDag()
-      throws IOException, InterruptedException, URISyntaxException {
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", 
System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
-        5, "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg")));
+  public void launchDag() throws IOException, InterruptedException, 
URISyntaxException, ExecutionException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    String flowExecutionId = "12345";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", 
Long.parseLong(flowExecutionId),
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", 
ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName)));
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+    SpecProducer<Spec> specProducer = 
DagManagerUtils.getSpecProducer(dag.getNodes().get(0));
+    List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
     LaunchDagProc launchDagProc = new LaunchDagProc(
-        new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
-            "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
+        new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, "job0",
+            DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
         flowCompilationValidationHelper);
 
     launchDagProc.process(this.dagManagementStateStore);
-    int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes
-    Assert.assertEquals(expectedNumOfSavingDagNodeStates,
-        
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
-            .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
 
-    Mockito.verify(this.dagManagementStateStore, Mockito.times(1))
+    int numOfLaunchedJobs = 1; // = number of start nodes
+    long addSpecCount = specProducers.stream()
+        .mapToLong(p -> Mockito.mockingDetails(p)
+            .getInvocations()
+            .stream()
+            .filter(a -> a.getMethod().getName().equals("addSpec"))
+            .count())
+        .sum();
+    Mockito.verify(specProducer, 
Mockito.times(numOfLaunchedJobs)).addSpec(any());
+    Assert.assertEquals(numOfLaunchedJobs, addSpecCount);
+
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
         .addFlowDagAction(any(), any(), any(), 
eq(DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE));
   }
 
+  @Test
+  public void launchDagWithMultipleParallelJobs() throws IOException, 
InterruptedException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    String flowExecutionId = "12345";
+    Dag<JobExecutionPlan> dag = 
buildDagWithMultipleNodesAtDifferentLevels("1", flowExecutionId,
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName)));
+    FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
+    
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+    LaunchDagProc launchDagProc = new LaunchDagProc(
+        new LaunchDagTask(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId,
+            "jn", DagActionStore.DagActionType.LAUNCH), null, 
this.dagManagementStateStore),
+        flowCompilationValidationHelper);
+
+    launchDagProc.process(this.dagManagementStateStore);
+    int numOfLaunchedJobs = 3; // = number of start nodes
+    // parallel jobs are launched through reevaluate dag action
+    Mockito.verify(this.dagManagementStateStore, 
Mockito.times(numOfLaunchedJobs))
+        .addJobDagAction(eq(flowGroup), eq(flowName), eq(flowExecutionId), 
any(), eq(DagActionStore.DagActionType.REEVALUATE));
+  }
+
   // This creates a dag like this
   //  D1  D2 D3
   //    \ | /
   //     DN4
-  //    / | \
-  //  D5 D6  D7
+  //    /   \
+  //  D5     D6
 
-  // Not used now, but can be used in GOBBLIN-2017
-  public static Dag<JobExecutionPlan> 
buildDagWithMultipleNodesAtDifferentLevels(String id, Long flowExecutionId, 
String flowFailureOption,
-      String proxyUser, Config additionalConfig)
-      throws URISyntaxException {
+  public static Dag<JobExecutionPlan> 
buildDagWithMultipleNodesAtDifferentLevels(String id, String flowExecutionId,
+      String flowFailureOption, String proxyUser, Config additionalConfig) 
throws URISyntaxException {
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
-    for (int i = 0; i < 7; i++) {
+    for (int i = 0; i < 6; i++) {
       String suffix = Integer.toString(i);
       Config jobConfig = ConfigBuilder.create().
           addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
@@ -129,7 +167,7 @@ public class LaunchDagProcTest {
       jobConfig = additionalConfig.withFallback(jobConfig);
       if (i == 3) {
         jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0,job1,job2"));
-      } else if ((i == 4) || (i == 5) || (i == 6)) {
+      } else if ((i == 4) || (i == 5)) {
         jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job3"));
       }
       JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index eb3429597..c5ac11f69 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -57,6 +57,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.JobStatus;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -110,13 +111,7 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
     );
-    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
-      try {
-        return DagManagerUtils.getSpecProducer(n);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }).collect(Collectors.toList());
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
         message("Test 
message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
 
@@ -168,13 +163,7 @@ public class ReevaluateDagProcTest {
     doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
     doReturn(true).when(dagManagementStateStore).releaseQuota(any());
 
-    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
-      try {
-        return DagManagerUtils.getSpecProducer(n);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }).collect(Collectors.toList());
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
 
     long addSpecCount = specProducers.stream()
         .mapToLong(p -> Mockito.mockingDetails(p)
@@ -206,4 +195,92 @@ public class ReevaluateDagProcTest {
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
   }
+
+  @Test
+  public void testCurrentJobToRun() throws Exception {
+    String flowName = "fn3";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    List<Dag.DagNode<JobExecutionPlan>> startDagNodes = dag.getStartNodes();
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(startDagNodes.get(0)), 
Optional.empty()))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job0", 
DagActionStore.DagActionType.REEVALUATE), null,
+        dagManagementStateStore));
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    int numOfLaunchedJobs = 1; // only the current job
+    // only the current job should have run
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+    specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`
+        .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
+
+    // no job's state is deleted because that happens only when a finished job 
triggered the reevaluate dag proc
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagNodeState(any(), any());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
+    Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), any(), any(),
+        eq(DagActionStore.DagActionType.REEVALUATE));
+    Mockito.verify(dagActionReminderScheduler, 
Mockito.never()).unscheduleReminderJob(any());
+  }
+
+  @Test
+  public void testMultipleNextJobToRun() throws Exception {
+    String flowName = "fn4";
+    Dag<JobExecutionPlan> dag = 
LaunchDagProcTest.buildDagWithMultipleNodesAtDifferentLevels("1", 
String.valueOf(flowExecutionId),
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5", 
ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+    );
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+        .jobName("job3").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
+        
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus)))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    // mocked job status for the first four jobs
+    
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+    
dag.getNodes().get(3).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job3", 
DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore));
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+    // process 4th job
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    int numOfLaunchedJobs = 2; // = number of jobs that should launch when 4th 
job passes, i.e. 5th and 6th job
+    // parallel jobs are launched through reevaluate dag action
+    Mockito.verify(dagManagementStateStore, Mockito.times(numOfLaunchedJobs))
+        .addJobDagAction(eq(flowGroup), eq(flowName), 
eq(String.valueOf(flowExecutionId)), any(), 
eq(DagActionStore.DagActionType.REEVALUATE));
+
+    // when there are parallel jobs to launch, they are not directly sent to 
spec producers, instead reevaluate dag action is created
+    specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
+  }
+
+  public static List<SpecProducer<Spec>> 
getDagSpecProducers(Dag<JobExecutionPlan> dag) {
+    return dag.getNodes().stream().map(n -> {
+      try {
+        return DagManagerUtils.getSpecProducer(n);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index dfc7d3a83..6c709d1bd 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -17,15 +17,18 @@
 package org.apache.gobblin.service.modules.scheduler;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.Invocation;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
@@ -67,7 +70,10 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static 
org.apache.gobblin.runtime.spec_catalog.FlowCatalog.FLOWSPEC_STORE_DIR_KEY;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 
@@ -112,7 +118,7 @@ public class GobblinServiceJobSchedulerTest {
     Properties properties = new Properties();
     properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
     FlowCatalog flowCatalog = new 
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
-    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
     
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
     when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
     flowCatalog.addListener(mockListener);
@@ -129,14 +135,14 @@ public class GobblinServiceJobSchedulerTest {
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
 
-    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
     UserQuotaManager quotaManager = new InMemoryUserQuotaManager(quotaConfig);
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
         ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, 
Optional.of(quotaManager), null, false);
 
-    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    SpecCompiler mockCompiler = mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
     Mockito.doAnswer((Answer<Void>) a -> {
       scheduler.isCompilerHealthy = true;
@@ -196,7 +202,7 @@ public class GobblinServiceJobSchedulerTest {
     ServiceBasedAppLauncher serviceLauncher = new 
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
     // Assume that the catalog can store corrupted flows
-    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
     
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
     when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
     flowCatalog.addListener(mockListener);
@@ -216,13 +222,13 @@ public class GobblinServiceJobSchedulerTest {
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
 
-    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
         ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, 
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false);
 
-    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    SpecCompiler mockCompiler = mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
     Mockito.doAnswer((Answer<Void>) a -> {
       scheduler.isCompilerHealthy = true;
@@ -260,7 +266,7 @@ public class GobblinServiceJobSchedulerTest {
     ServiceBasedAppLauncher serviceLauncher = new 
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
     // Assume that the catalog can store corrupted flows
-    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
     
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
     when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
     flowCatalog.addListener(mockListener);
@@ -279,7 +285,7 @@ public class GobblinServiceJobSchedulerTest {
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
 
-    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
@@ -287,7 +293,7 @@ public class GobblinServiceJobSchedulerTest {
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
-    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    SpecCompiler mockCompiler = mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
     Mockito.doAnswer((Answer<Void>) a -> {
       scheduler.isCompilerHealthy = true;
@@ -342,8 +348,8 @@ public class GobblinServiceJobSchedulerTest {
     FlowSpec flowSpec1 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), 
"flowName1", "group1",
         ConfigFactory.empty(), true);
 
-    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
-    SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
+    SpecCompiler mockSpecCompiler = mock(SpecCompiler.class);
     when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
     Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
     Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
@@ -354,7 +360,7 @@ public class GobblinServiceJobSchedulerTest {
     // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new 
GobblinServiceJobScheduler("testscheduler",
         ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
mockOrchestrator, schedulerService,
-        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), false, Optional.of(Mockito.mock(
+        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), false, Optional.of(mock(
         FlowLaunchHandler.class)));
 
     schedulerService.startAsync().awaitRunning();
@@ -379,12 +385,21 @@ public class GobblinServiceJobSchedulerTest {
     schedulerWithWarmStandbyEnabled.startUp();
     schedulerWithWarmStandbyEnabled.setActive(true);
 
+    MockitoAnnotations.openMocks(this);
+    Map<String, FlowSpec> mockMap = spy(new HashMap<>());
+
+    // Use reflection to set the private map field to the spied one, so we can 
check the invocations
+    Field mapField = 
schedulerWithWarmStandbyEnabled.getClass().getDeclaredField("scheduledFlowSpecs");
+    mapField.setAccessible(true);
+    mapField.set(schedulerWithWarmStandbyEnabled, mockMap);
+
     schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec0); //Ignore the 
response for this request
-    
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 
1);
+    Mockito.verify(mockMap, 
Mockito.times(1)).put(eq(flowSpec0.getUri().toString()), eq(flowSpec0));
     schedulerWithWarmStandbyEnabled.onAddSpec(flowSpec1);
     // Second flow should be added to scheduled flows since no quota check in 
this case
-    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(5000L).backoffFactor(1)
-        .assertTrue(input -> 
schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size() == 2, "Waiting for 
add spec to complete");
+    // we need to check if map's put is called to check if the spec was ever 
scheduled. we cannot just check the size
+    // because the spec is removed by NonScheduledJobRunner
+    Mockito.verify(mockMap, 
Mockito.times(1)).put(eq(flowSpec1.getUri().toString()), eq(flowSpec1));
     // set scheduler to be inactive and unschedule flows
     schedulerWithWarmStandbyEnabled.setActive(false);
     
Assert.assertEquals(schedulerWithWarmStandbyEnabled.scheduledFlowSpecs.size(), 
0);
@@ -398,7 +413,7 @@ public class GobblinServiceJobSchedulerTest {
         Optional<FlowCatalog> flowCatalog, Orchestrator orchestrator, 
Optional<UserQuotaManager> quotaManager,
         SchedulerService schedulerService, boolean isWarmStandbyEnabled) 
throws Exception {
       super(serviceName, config, Optional.absent(), flowCatalog, orchestrator, 
schedulerService,
-          quotaManager, Optional.absent(), isWarmStandbyEnabled, 
Optional.of(Mockito.mock(
+          quotaManager, Optional.absent(), isWarmStandbyEnabled, 
Optional.of(mock(
               FlowLaunchHandler.class)));
       if (schedulerService != null) {
         hasScheduler = true;


Reply via email to