phet commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1522580123


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -75,7 +75,7 @@ public DagProcessingEngine(Config config, DagTaskStream 
dagTaskStream, DagProcFa
 
   @AllArgsConstructor
   @VisibleForTesting
-  static class DagProcEngineThread implements Runnable {
+  public static class DagProcEngineThread implements Runnable {

Review Comment:
   I don't see any new uses in this PR.  Since it's only `@VisibleForTesting`, it's
actually better to keep it package-private than to make it `public`.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java:
##########
@@ -52,14 +52,9 @@ public class MostlyMySqlDagManagementStateStoreTest {
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
   private static final String TEST_TABLE = "quotas";
-  static ITestMetastoreDatabase testMetastoreDatabase;
-
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    // Setting up mock DB
-    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
+  public static MostlyMySqlDagManagementStateStore 
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
+    MostlyMySqlDagManagementStateStore dagManagementStateStore;

Review Comment:
   why declare it here, ahead of where it's initialized on line 72?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
 @Alpha
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
 
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {

Review Comment:
   `@RequiredArgsConstructor`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
 @Alpha
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
 
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
     this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + this.launchDagTask.getDagId() + " could not be 
compiled.");
+      // todo - add metrics
+      return Optional.empty();
+    }
+    submitNextNodes(dagManagementStateStore, dag.get());
+    log.info("Launch dagProc concluded actions for dagId : {}", 
this.launchDagTask.getDagId());

Review Comment:
   see the comment suggesting moving this logging to `DagProc::process`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -184,6 +184,43 @@ static Dag<JobExecutionPlan> buildDag(String id, Long 
flowExecutionId, String fl
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
 
+  // This creates a dag like this
+  //  D1  D2 D3
+  //    \ | /
+  //     DN4
+  //    / | \
+  //  D5 D6  D7
+  public static Dag<JobExecutionPlan> 
buildDagWithMultipleNodesAtDifferentLevels(String id, Long flowExecutionId, 
String flowFailureOption,

Review Comment:
   since this isn't a test but a utility method that seems unused in this 
defining class, does it really belong here?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
 @Alpha
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
+  FlowCompilationValidationHelper flowCompilationValidationHelper;
 
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
     this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
+    AtomicLong orchestrationDelayCounter = new AtomicLong(0);
     ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
         (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
     metricContext.register(orchestrationDelayMetric);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to 
launch");
+      return Optional.empty();
+    }
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+    Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted = 
submitNext(dagManagementStateStore, dag.get());
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+      dagManagementStateStore.addDagNodeState(dagNode, dagId);  // compare 
this - arjun1
+    }
+
+    log.info("Dag {} processed.", dagId);
+    return dag;
   }
 
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+  /**
+   * Submit next set of Dag nodes in the Dag identified by the provided dagId
+   */
+   private Set<Dag.DagNode<JobExecutionPlan>> 
submitNext(DagManagementStateStore dagManagementStateStore,
+       Dag<JobExecutionPlan> dag) throws IOException {
+     DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+     List<String> nextJobNames = new ArrayList<>();
+
+     //Submit jobs from the dag ready for execution.
+     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+       submitJob(dagManagementStateStore, dagNode);
+       nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+     }
+
+     log.info("Submitting next nodes for dagId {}, where next jobs to be 
submitted are {}", dagId, nextJobNames);
+
+     //Checkpoint the dag state, it should have an updated value of dag nodes
+     dagManagementStateStore.checkpointDag(dag);
+
+     return nextNodes;
+  }
+
+  /**
+   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   */
+  private void submitJob(DagManagementStateStore dagManagementStateStore, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    DagManagerUtils.incrementJobAttempt(dagNode);
+    JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+    jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+    JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    // Run this spec on selected executor
+    SpecProducer<Spec> producer;
+    try {
+      dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+      producer = DagManagerUtils.getSpecProducer(dagNode);
+      TimingEvent jobOrchestrationTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+      // Increment job count before submitting the job onto the spec producer, 
in case that throws an exception.
+      // By this point the quota is allocated, so it's imperative to increment 
as missing would introduce the potential to decrement below zero upon quota 
release.
+      // Quota release is guaranteed, despite failure, because exception 
handling within would mark the job FAILED.
+      // When the ensuing kafka message spurs DagManager processing, the quota 
is released and the counts decremented
+      // Ensure that we do not double increment for flows that are retried
+      if (dagNode.getValue().getCurrentAttempts() == 1) {
+        
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);

Review Comment:
   any thoughts?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Maps;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
 @Alpha
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
   private final LaunchDagTask launchDagTask;
-  private final AtomicLong orchestrationDelayCounter;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+  static {
+    metricContext.register(
+        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
+  }
 
-  public LaunchDagProc(LaunchDagTask launchDagTask) {
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
     this.launchDagTask = launchDagTask;
-    this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
-        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    metricContext.register(orchestrationDelayMetric);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    try {
+      DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+      FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+    if (!dag.isPresent()) {
+      log.warn("Dag with id " + this.launchDagTask.getDagId() + " could not be 
compiled.");
+      // todo - add metrics
+      return Optional.empty();
+    }
+    submitNextNodes(dagManagementStateStore, dag.get());
+    log.info("Launch dagProc concluded actions for dagId : {}", 
this.launchDagTask.getDagId());
+    return dag;
   }
 
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter)
-      throws IOException {
-    throw new UnsupportedOperationException("Not yet implemented");
+  /**
+   * Submit next set of Dag nodes in the provided Dag.
+   */
+   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore,
+       Dag<JobExecutionPlan> dag) throws IOException {
+     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
+
+     //Submit jobs from the dag ready for execution.
+     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+       submitJobToExecutor(dagManagementStateStore, dagNode);
+       dagManagementStateStore.addDagNodeState(dagNode, 
this.launchDagTask.getDagId());
+       log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), this.launchDagTask.getDagId());
+     }
+
+     //Checkpoint the dag state, it should have an updated value of dag nodes
+     dagManagementStateStore.checkpointDag(dag);
+   }
+
+  /**
+   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   */
+  private void submitJobToExecutor(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+    DagManagerUtils.incrementJobAttempt(dagNode);
+    JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+    JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    // Run this spec on selected executor
+    SpecProducer<Spec> producer;
+    try {
+      producer = DagManagerUtils.getSpecProducer(dagNode);
+      TimingEvent jobOrchestrationTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+      // Increment job count before submitting the job onto the spec producer, 
in case that throws an exception.
+      // By this point the quota is allocated, so it's imperative to increment 
as missing would introduce the potential to decrement below zero upon quota 
release.
+      // Quota release is guaranteed, despite failure, because exception 
handling within would mark the job FAILED.
+      // When the ensuing kafka message spurs DagManager processing, the quota 
is released and the counts decremented
+      // Ensure that we do not double increment for flows that are retried
+      if (DagManagerUtils.getJobExecutionPlan(dagNode).getCurrentAttempts() == 
1) {
+        
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+      }
+      // Submit the job to the SpecProducer, which in turn performs the actual 
job submission to the SpecExecutor instance.
+      // The SpecProducer implementations submit the job to the underlying 
executor and return when the submission is complete,
+      // either successfully or unsuccessfully. To catch any exceptions in the 
job submission, the DagManagerThread
+      // blocks (by calling Future#get()) until the submission is completed.
+      dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+      Future<?> addSpecFuture = producer.addSpec(jobSpec);
+      
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));

Review Comment:
   maybe add a TODO to `JobExecutionPlan` to alternatively store the
already-forced value in lieu of the future?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Optional;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class LaunchDagProcTest {
+  MostlyMySqlDagManagementStateStore dagManagementStateStore;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.dagManagementTaskStream = new 
DagManagementTaskStreamImpl(ConfigBuilder.create().build(), Optional.empty());
+    this.dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
+    
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), 
any());
+  }
+  @Test
+  public void launchDag()
+      throws IOException, InterruptedException, URISyntaxException {
+    // this creates a dag with 3 start nodes
+    Dag<JobExecutionPlan> dag1 = 
DagManagerTest.buildDagWithMultipleNodesAtDifferentLevels("1", 
System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        "user5", 
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("group2")));
+    FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
+    
doReturn(com.google.common.base.Optional.of(dag1)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+    LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new 
DagActionStore.DagAction("fg", "fn",
+        "12345", DagActionStore.FlowActionType.LAUNCH), null), 
flowCompilationValidationHelper);
+
+    launchDagProc.process(this.dagManagementStateStore);
+    int expectedNumOfSavingDagNodeStates = 3; // = number of start nodes
+    Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+        
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("addDagNodeState")).count());
+    System.out.println();

Review Comment:
   remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to