This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 54f87cb72 [GOBBLIN-2029 ]Codestyle changes in DagProc and related 
classes (#3906)
54f87cb72 is described below

commit 54f87cb72172b2ecf119ea5818a6a055ada77e97
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Mar 29 16:26:51 2024 -0700

    [GOBBLIN-2029 ]Codestyle changes in DagProc and related classes (#3906)
    
    * some codestyle changes
    * address review comments
---
 .../runtime/DagActionStoreChangeMonitorTest.java   |  12 ++-
 .../runtime/scheduler/QuartzJobSpecScheduler.java  |   4 +-
 .../spec_executorInstance/MockedSpecExecutor.java  |   2 +-
 .../modules/core/GobblinServiceGuiceModule.java    |   2 +-
 .../orchestration/DagManagementStateStore.java     |   6 ++
 .../orchestration/DagManagementTaskStreamImpl.java |   2 +-
 .../service/modules/orchestration/DagManager.java  |   5 +
 .../modules/orchestration/DagProcessingEngine.java |   2 +-
 .../MostlyMySqlDagManagementStateStore.java        |   5 +
 .../MysqlMultiActiveLeaseArbiter.java              |   2 +-
 .../modules/orchestration/proc/DagProc.java        |  38 +++----
 .../proc/{LaunchDagProc.java => DagProcUtils.java} | 109 +++-----------------
 .../modules/orchestration/proc/LaunchDagProc.java  | 113 +++------------------
 .../modules/orchestration/task/DagTask.java        |   4 -
 ...lowExecutionResourceHandlerWithWarmStandby.java |  14 ++-
 .../modules/spec/JobExecutionPlanDagFactory.java   |   2 +-
 .../spec/JobExecutionPlanListDeserializer.java     |   1 -
 .../monitoring/DagActionStoreChangeMonitor.java    |   2 +-
 .../DagActionStoreChangeMonitorFactory.java        |  15 +--
 .../DagManagementDagActionStoreChangeMonitor.java  |  30 ++----
 ...nagementDagActionStoreChangeMonitorFactory.java |  15 +--
 .../service/monitoring/KafkaJobStatusMonitor.java  |  13 ++-
 .../modules/orchestration/DagManagerTest.java      |   5 +
 .../orchestration/DagProcessingEngineTest.java     |  21 ++--
 .../orchestration/FlowLaunchHandlerTest.java       |   6 +-
 .../MostlyMySqlDagManagementStateStoreTest.java    |   3 +-
 .../orchestration/proc/LaunchDagProcTest.java      |   2 +-
 27 files changed, 135 insertions(+), 300 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 78083c065..0575a99bd 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -17,17 +17,23 @@
 
 package org.apache.gobblin.runtime;
 
+import java.net.URI;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.Test;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
-import java.net.URI;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
@@ -35,8 +41,6 @@ import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
 import org.apache.gobblin.service.monitoring.DagActionValue;
 import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
 import org.apache.gobblin.service.monitoring.OperationType;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
index 4e46cee70..a3deb4157 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
@@ -42,6 +42,8 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Data;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -50,8 +52,6 @@ import org.apache.gobblin.runtime.api.JobSpecScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.service.StandardServiceConfig;
 
-import lombok.Data;
-
 /**
  * A {@link JobSpecScheduler} using Quartz.
  */
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 7c44b9833..378d04717 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -45,7 +45,7 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
     when(mockedSpecProducer.addSpec(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
     when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
     when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new 
CompletedFuture(Boolean.TRUE, null));
-    }
+  }
 
   public static SpecExecutor createDummySpecExecutor(URI uri) {
     Properties properties = new Properties();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index ed179fa58..c2af7cb1a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -70,6 +70,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
@@ -78,7 +79,6 @@ import 
org.apache.gobblin.service.modules.orchestration.DagProcFactory;
 import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
 import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 86df6bee8..274abeaa0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -194,4 +194,10 @@ public interface DagManagementStateStore {
    * @return {@link JobStatus} or {@link Optional#empty} if not present in the 
Job-Status Store
    */
   Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
+
+  /**
+   * Returns true if the {@link Dag} identified by the given {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
+   * has any running job, false otherwise.
+   */
+  public boolean hasRunningJobs(DagManager.DagId dagId);
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 16d396394..4409a673f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -162,7 +162,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
       case LAUNCH:
         return new LaunchDagTask(dagAction, leaseObtainedStatus, 
dagActionStore.get());
       default:
-        throw new UnsupportedOperationException("Not yet implemented");
+        throw new UnsupportedOperationException(dagActionType + " not yet 
implemented");
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7fd90a244..218fe8128 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -172,6 +172,7 @@ public class DagManager extends AbstractIdleService {
     String flowGroup;
     String flowName;
     String flowExecutionId;
+
     public DagId(String flowGroup, String flowName, String flowExecutionId) {
       this.flowGroup = flowGroup;
       this.flowName = flowName;
@@ -186,6 +187,10 @@ public class DagManager extends AbstractIdleService {
     DagActionStore.DagAction toDagAction(DagActionStore.DagActionType 
actionType) {
       return new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, NO_JOB_NAME_DEFAULT, actionType);
     }
+
+    public FlowId getFlowId() {
+      return new 
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
+    }
   }
 
   private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index aa4a5f0e0..cbcfa3704 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -97,7 +97,7 @@ public class DagProcessingEngine {
           dagProc.process(dagManagementStateStore);
           dagTask.conclude();
         } catch (Exception e) {
-          log.error("DagProcEngineThread encountered exception while 
processing dag " + dagTask.getDagId(), e);
+          log.error("DagProcEngineThread encountered exception while 
processing dag " + dagProc.getDagId(), e);
           
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
         }
         // todo mark lease success and releases it
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 029dd35b8..5304c8003 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -263,4 +263,9 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
       return java.util.Optional.empty();
     }
   }
+
+  @Override
+  public boolean hasRunningJobs(DagManager.DagId dagId) {
+    return !getDagNodes(dagId).isEmpty();
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 9dfea5acc..113027b24 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -490,7 +490,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
 
   /**
    * Parse result of attempted insert/update to obtain a lease for a
-   * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by 
selecting values corresponding to that
+   * {@link DagActionStore.DagAction} event by selecting values corresponding 
to that
    * event from the table to return the corresponding status based on 
successful insert/update or not.
    * @throws SQLException
    * @throws IOException
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 6a5943445..4f0a254b5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
 
-import lombok.AccessLevel;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -29,41 +28,44 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.task.DagTask;
 
 
 /**
  * Responsible for performing the actual work for a given {@link DagTask} by 
first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an 
event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is 
found to be important in
+ * <a href="https://github.com/apache/gobblin/pull/3641">PR#3641</a>, hence 
initialize and act methods submit events as
+ * they happen.
  */
 @Alpha
 @Data
 @Slf4j
-public abstract class DagProc<S, T> {
+public abstract class DagProc<T> {
+  protected final DagTask dagTask;
+  @Getter protected final DagManager.DagId dagId;
+  @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
   protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
-  @Getter(AccessLevel.PROTECTED)
-  private final DagTask dagTask;
 
-  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
-    S state = initialize(dagManagementStateStore);   // todo - retry
-    T result = act(dagManagementStateStore, state);   // todo - retry
-    sendNotification(result, eventSubmitter);   // todo - retry
-    log.info("{} successfully concluded actions for dagId : {}", 
getClass().getSimpleName(), getDagId());
+  public DagProc(DagTask dagTask) {
+    this.dagTask = dagTask;
+    this.dagId = 
DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
+        this.dagTask.getDagAction().getFlowName(), 
this.dagTask.getDagAction().getFlowExecutionId());
+    this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
   }
 
-  protected DagManager.DagId getDagId() {
-    return this.dagTask.getDagId();
+  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
+    T state = initialize(dagManagementStateStore);   // todo - retry
+    act(dagManagementStateStore, state);   // todo - retry
+    log.info("{} concluded processing for dagId : {}", 
getClass().getSimpleName(), this.dagId);
   }
 
-  protected abstract S initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
-
-  protected abstract T act(DagManagementStateStore dagManagementStateStore, S 
state) throws IOException;
-
-  protected abstract void sendNotification(T result, EventSubmitter 
eventSubmitter) throws IOException;
+  protected abstract T initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
 
-  // todo - commit the modified dags to the persistent store, maybe not 
required for InMem dagManagementStateStore
+  protected abstract void act(DagManagementStateStore dagManagementStateStore, 
T state) throws IOException;
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
similarity index 53%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 6368d8887..9c87bb029 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -18,122 +18,43 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Maps;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
-import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 
 
 /**
- * An implementation for {@link DagProc} that launches a new job.
+ * A class to group together all the utility methods for {@link DagProc} 
derived class implementations.
  */
 @Slf4j
-public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
-  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
-  // todo - this is not orchestration delay and should be renamed. keeping it 
the same because DagManager is also using
-  // the same name
-  private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
-
-  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
-    super(launchDagTask);
-    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
-  }
-
-  static {
-    metricContext.register(
-        
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
-  }
-
-  @Override
-  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
-      throws IOException {
-    try {
-      DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
-      FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
-      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
-      return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
-    } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private FlowSpec loadFlowSpec(DagManagementStateStore 
dagManagementStateStore, DagActionStore.DagAction dagAction)
-      throws URISyntaxException, SpecNotFoundException {
-    URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
-    return dagManagementStateStore.getFlowSpec(flowUri);
-  }
-
-  @Override
-  protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
-      throws IOException {
-    if (!dag.isPresent()) {
-      log.warn("Dag with id " + getDagId() + " could not be compiled.");
-      // todo - add metrics
-      return Optional.empty();
-    }
-    submitNextNodes(dagManagementStateStore, dag.get());
-    orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
-    return dag;
-  }
-
-  /**
-   * Submit next set of Dag nodes in the provided Dag.
-   */
-   private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore,
-       Dag<JobExecutionPlan> dag) throws IOException {
-     Set<Dag.DagNode<JobExecutionPlan>> nextNodes = 
DagManagerUtils.getNext(dag);
-
-     if (nextNodes.size() > 1) {
-       handleMultipleJobs(nextNodes);
-     }
-
-     //Submit jobs from the dag ready for execution.
-     for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
-       submitJobToExecutor(dagManagementStateStore, dagNode);
-       dagManagementStateStore.addDagNodeState(dagNode, getDagId());
-       log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
-     }
-
-     //Checkpoint the dag state, it should have an updated value of dag nodes
-     dagManagementStateStore.checkpointDag(dag);
-   }
-
-  private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
-     throw new UnsupportedOperationException("More than one start job is not 
allowed");
-  }
+public class DagProcUtils {
 
   /**
-   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+   * - submits a {@link JobSpec} to a {@link SpecExecutor}
+   * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}
+   * that measures the time needed to submit the job to {@link SpecExecutor}
+   * - increments the running jobs counter for the {@link Dag}, the proxy user that 
submitted the job, and the {@link SpecExecutor} the job was sent to
+   * - adds the updated dag node state to dagManagementStateStore
    */
-  private void submitJobToExecutor(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+  public static void submitJobToExecutor(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode,
+      DagManager.DagId dagId) {
     DagManagerUtils.incrementJobAttempt(dagNode);
     JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
     JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
@@ -145,7 +66,8 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
     SpecProducer<Spec> producer;
     try {
       producer = DagManagerUtils.getSpecProducer(dagNode);
-      TimingEvent jobOrchestrationTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+      // todo - submit an event with some other name, because it is not 
really orchestration happening here
+      TimingEvent jobOrchestrationTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
 
       // Increment job count before submitting the job onto the spec producer, 
in case that throws an exception.
       // By this point the quota is allocated, so it's imperative to increment 
as missing would introduce the potential to decrement below zero upon quota 
release.
@@ -171,8 +93,9 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
       jobOrchestrationTimer.stop(jobMetadata);
       log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
       
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
+      dagManagementStateStore.addDagNodeState(dagNode, dagId);
     } catch (Exception e) {
-      TimingEvent jobFailedTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+      TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
       String message = "Cannot submit job " + 
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + 
specExecutorUri;
       log.error(message, e);
       jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
@@ -188,8 +111,4 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
       throw new RuntimeException(e);
     }
   }
-
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter) {
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 6368d8887..4cef3b7a7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -18,35 +18,20 @@
 package org.apache.gobblin.service.modules.orchestration.proc;
 
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.collect.Maps;
-
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -56,52 +41,44 @@ import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
  * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
+public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   // todo - this is not orchestration delay and should be renamed. keeping it 
the same because DagManager is also using
   // the same name
   private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
 
-  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
-    super(launchDagTask);
-    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
-  }
-
   static {
     metricContext.register(
         
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
   }
 
+  public LaunchDagProc(LaunchDagTask launchDagTask, 
FlowCompilationValidationHelper flowCompilationValidationHelper) {
+    super(launchDagTask);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+  }
+
   @Override
   protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
       throws IOException {
     try {
-      DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
-      FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
-      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      FlowSpec flowSpec = 
dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
+      flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
getDagId().getFlowExecutionId());
       return 
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
     } catch (URISyntaxException | SpecNotFoundException | InterruptedException 
e) {
       throw new RuntimeException(e);
     }
   }
 
-  private FlowSpec loadFlowSpec(DagManagementStateStore 
dagManagementStateStore, DagActionStore.DagAction dagAction)
-      throws URISyntaxException, SpecNotFoundException {
-    URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
-    return dagManagementStateStore.getFlowSpec(flowUri);
-  }
-
   @Override
-  protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
       throws IOException {
     if (!dag.isPresent()) {
       log.warn("Dag with id " + getDagId() + " could not be compiled.");
       // todo - add metrics
-      return Optional.empty();
+    } else {
+      submitNextNodes(dagManagementStateStore, dag.get());
+      orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
     }
-    submitNextNodes(dagManagementStateStore, dag.get());
-    orchestrationDelayCounter.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag.get()));
-    return dag;
   }
 
   /**
@@ -117,8 +94,7 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
 
      //Submit jobs from the dag ready for execution.
      for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
-       submitJobToExecutor(dagManagementStateStore, dagNode);
-       dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
        log.info("Submitted job {} for dagId {}", 
DagManagerUtils.getJobName(dagNode), getDagId());
      }
 
@@ -129,67 +105,4 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
      throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
-
-  /**
-   * Submits a {@link JobSpec} to a {@link SpecExecutor}.
-   */
-  private void submitJobToExecutor(DagManagementStateStore 
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
-    DagManagerUtils.incrementJobAttempt(dagNode);
-    JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
-    JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
-    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-
-    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-
-    // Run this spec on selected executor
-    SpecProducer<Spec> producer;
-    try {
-      producer = DagManagerUtils.getSpecProducer(dagNode);
-      TimingEvent jobOrchestrationTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
-
-      // Increment job count before submitting the job onto the spec producer, 
in case that throws an exception.
-      // By this point the quota is allocated, so it's imperative to increment 
as missing would introduce the potential to decrement below zero upon quota 
release.
-      // Quota release is guaranteed, despite failure, because exception 
handling within would mark the job FAILED.
-      // When the ensuing kafka message spurs DagManager processing, the quota 
is released and the counts decremented
-      // Ensure that we do not double increment for flows that are retried
-      if (DagManagerUtils.getJobExecutionPlan(dagNode).getCurrentAttempts() == 
1) {
-        
dagManagementStateStore.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
-      }
-      // Submit the job to the SpecProducer, which in turn performs the actual 
job submission to the SpecExecutor instance.
-      // The SpecProducer implementations submit the job to the underlying 
executor and return when the submission is complete,
-      // either successfully or unsuccessfully. To catch any exceptions in the 
job submission, the DagManagerThread
-      // blocks (by calling Future#get()) until the submission is completed.
-      dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
-      Future<?> addSpecFuture = producer.addSpec(jobSpec);
-      // todo - we should add future.get() instead of the complete future into 
the JobExecutionPlan
-      
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
-      addSpecFuture.get();
-      jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
-      // Add serialized job properties as part of the orchestrated job event 
metadata
-      jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
dagNode.getValue().toString());
-      jobOrchestrationTimer.stop(jobMetadata);
-      log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
-      
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
-    } catch (Exception e) {
-      TimingEvent jobFailedTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
-      String message = "Cannot submit job " + 
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + 
specExecutorUri;
-      log.error(message, e);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
-      if (jobFailedTimer != null) {
-        jobFailedTimer.stop(jobMetadata);
-      }
-      try {
-        // when there is no exception, quota will be released in job status 
monitor or re-evaluate dag proc
-        dagManagementStateStore.releaseQuota(dagNode);
-      } catch (IOException ex) {
-        log.error("Could not release quota while handling e", ex);
-      }
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter) {
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 732b9a226..bc12926c7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
 import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -44,14 +42,12 @@ public abstract class DagTask {
   @Getter public final DagActionStore.DagAction dagAction;
   private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
   private final DagActionStore dagActionStore;
-  @Getter protected final DagManager.DagId dagId;
 
   public DagTask(DagActionStore.DagAction dagAction, 
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
       DagActionStore dagActionStore) {
     this.dagAction = dagAction;
     this.leaseObtainedStatus = leaseObtainedStatus;
     this.dagActionStore = dagActionStore;
-    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
   }
 
   public abstract <T> T host(DagTaskVisitor<T> visitor);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index f792ace15..caee99e59 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.service.modules.restli;
 
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+
 import com.google.common.base.Optional;
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
@@ -25,17 +31,15 @@ import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
-import java.io.IOException;
-import java.sql.SQLException;
+
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import org.apache.helix.HelixManager;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 
 @Slf4j
 public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends 
GobblinServiceFlowExecutionResourceHandler{
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index 8af2ee7a8..d96c67619 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -17,13 +17,13 @@
 
 package org.apache.gobblin.service.modules.spec;
 
-import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index ede2870a1..c51a9f378 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -120,7 +120,6 @@ public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<J
         String jobExecutionFuture = 
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
         Future future = 
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
         jobExecutionPlan.setJobFuture(Optional.fromNullable(future));
-
       } catch (ExecutionException | InterruptedException e) {
         log.warn("Error during deserialization of JobExecutionFuture.");
         throw new RuntimeException(e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 68fded2e0..40bbb06bf 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -41,13 +41,13 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 07e4229f8..6277360e9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -26,9 +26,9 @@ import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -60,8 +60,7 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
   }
 
-  private DagActionStoreChangeMonitor createDagActionStoreMonitor()
-    throws ReflectiveOperationException {
+  private DagActionStoreChangeMonitor createDagActionStoreMonitor() {
     Config dagActionStoreChangeConfig = 
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
     log.info("DagActionStore will be initialized with config {}", 
dagActionStoreChangeConfig);
 
@@ -74,12 +73,8 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
 
   @Override
   public DagActionStoreChangeMonitor get() {
-    try {
-      DagActionStoreChangeMonitor changeMonitor = 
createDagActionStoreMonitor();
-      changeMonitor.initializeMonitor();
-      return changeMonitor;
-    } catch (ReflectiveOperationException e) {
-      throw new RuntimeException("Failed to initialize DagActionStoreMonitor 
due to ", e);
-    }
+    DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
+    changeMonitor.initializeMonitor();
+    return changeMonitor;
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 928266602..e7843f1af 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -23,10 +23,8 @@ import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
@@ -38,7 +36,6 @@ import 
org.apache.gobblin.service.modules.orchestration.Orchestrator;
 @Slf4j
 public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
   private final DagManagement dagManagement;
-  protected ContextAwareMeter unexpectedLaunchEventErrors;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
@@ -62,28 +59,17 @@ public class DagManagementDagActionStoreChangeMonitor 
extends DagActionStoreChan
     LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? 
ON_STARTUP : POST_STARTUP;
     try {
       // todo - add actions for other types of dag actions
-      if 
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) {
-        // If multi-active scheduler is NOT turned on we should not receive 
these type of events
-        if (!this.isMultiActiveSchedulerEnabled) {
-          this.unexpectedLaunchEventErrors.mark();
-          throw new RuntimeException(String.format("Received LAUNCH dagAction 
while not in multi-active scheduler "
-              + "mode for flowAction: %s", dagAction));
-        }
-        dagManagement.addDagAction(dagAction);
-      } else {
-        log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagAction.getDagActionType());
-        this.unexpectedErrors.mark();
+      switch (dagAction.getDagActionType()) {
+        case LAUNCH :
+          dagManagement.addDagAction(dagAction);
+          break;
+        default:
+          log.warn("Received unsupported dagAction {}. Expected to be a 
LAUNCH", dagAction.getDagActionType());
+          this.unexpectedErrors.mark();
       }
     } catch (IOException e) {
       log.warn("Failed to addDagAction for flowId {} due to exception {}", 
dagAction.getFlowId(), e.getMessage());
       launchSubmissionMetricProxy.markFailure();
     }
   }
-
-  @Override
-  protected void createMetrics() {
-    super.createMetrics();
-    // Dag Action specific metrics
-    this.unexpectedLaunchEventErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 6853ed79f..af5146027 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -26,9 +26,9 @@ import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -61,8 +61,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
     this.dagManagement = dagManagement;
   }
 
-  private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor()
-    throws ReflectiveOperationException {
+  private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor() {
     Config dagActionStoreChangeConfig = 
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
     log.info("DagActionStore will be initialized with config {}", 
dagActionStoreChangeConfig);
 
@@ -74,12 +73,8 @@ public class DagManagementDagActionStoreChangeMonitorFactory 
implements Provider
 
   @Override
   public DagActionStoreChangeMonitor get() {
-    try {
-      DagActionStoreChangeMonitor changeMonitor = 
createDagActionStoreMonitor();
-      changeMonitor.initializeMonitor();
-      return changeMonitor;
-    } catch (ReflectiveOperationException e) {
-      throw new RuntimeException("Failed to initialize DagActionStoreMonitor 
due to ", e);
-    }
+    DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
+    changeMonitor.initializeMonitor();
+    return changeMonitor;
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 87e1c3b5a..8ae588231 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -30,9 +30,10 @@ import java.util.concurrent.TimeUnit;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.github.rholder.retry.Attempt;
-import com.github.rholder.retry.Retryer;
 import com.github.rholder.retry.RetryException;
 import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
@@ -40,7 +41,6 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,7 +64,10 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.retry.RetryerFactory;
 
-import static org.apache.gobblin.util.retry.RetryerFactory.*;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
 
 
 /**
@@ -241,7 +244,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       String tableName = jobStatusTableName(flowExecutionId, jobGroup, 
jobName);
 
       List<org.apache.gobblin.configuration.State> states = 
stateStore.getAll(storeName, tableName);
-      if (states.size() > 0) {
+      if (!states.isEmpty()) {
         org.apache.gobblin.configuration.State previousJobStatus = 
states.get(states.size() - 1);
         String previousStatus = 
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
         String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
@@ -305,7 +308,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   }
 
   static boolean 
isNewStateTransitionToFinal(org.apache.gobblin.configuration.State 
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
-    if (prevStates.size() == 0) {
+    if (prevStates.isEmpty()) {
       return 
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
     }
     return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) && 
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b9a9d33e5..8370183e8 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -154,6 +154,11 @@ public class DagManagerTest {
 
   public static Dag<JobExecutionPlan> buildDag(String id, Long 
flowExecutionId, String flowFailureOption, int numNodes, String proxyUser, 
Config additionalConfig)
       throws URISyntaxException {
+    if (additionalConfig.hasPath(ConfigurationKeys.JOB_NAME_KEY)) {
+      throw new RuntimeException("Please do not set " + 
ConfigurationKeys.JOB_NAME_KEY + " because this method is "
+          + "using hard coded job names in setting " + 
ConfigurationKeys.JOB_DEPENDENCIES);
+    }
+
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
     for (int i = 0; i < numNodes; i++) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 138ddb3a2..f5ed96867 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -38,7 +37,6 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -84,6 +82,7 @@ public class DagProcessingEngineTest {
         new DagManagementTaskStreamImpl(config, 
Optional.of(mock(DagActionStore.class)),
             mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
     this.dagProcFactory = new DagProcFactory(null);
+
     DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
         new 
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream, 
this.dagProcFactory,
             dagManagementStateStore);
@@ -127,7 +126,7 @@ public class DagProcessingEngineTest {
 
     @Override
     public <T> T host(DagTaskVisitor<T> visitor) {
-      return (T) new MockedDagProc(this.isBad);
+      return (T) new MockedDagProc(this, this.isBad);
     }
 
     @Override
@@ -136,15 +135,16 @@ public class DagProcessingEngineTest {
     }
   }
 
-  static class MockedDagProc extends DagProc<Void, Void> {
+  static class MockedDagProc extends DagProc<Void> {
     private final boolean isBad;
-    public MockedDagProc(boolean isBad) {
-      super(null);
+
+    public MockedDagProc(MockedDagTask mockedDagTask, boolean isBad) {
+      super(mockedDagTask);
       this.isBad = isBad;
     }
 
     @Override
-    protected DagManager.DagId getDagId() {
+    public DagManager.DagId getDagId() {
       return new DagManager.DagId("fg", "fn", "12345");
     }
 
@@ -154,15 +154,10 @@ public class DagProcessingEngineTest {
     }
 
     @Override
-    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+    protected void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
       if (this.isBad) {
         throw new RuntimeException("Simulating an exception!");
       }
-      return null;
-    }
-
-    @Override
-    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) throws IOException {
     }
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 655532ab0..dea34b6cd 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -18,12 +18,14 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.util.Properties;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
 import org.junit.Assert;
 import org.quartz.JobDataMap;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
 
 public class FlowLaunchHandlerTest {
   long eventToRevisit = 123000L;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index af1532b8a..ceb1b7b9f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -126,7 +126,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore =
+        new MostlyMySqlDagManagementStateStore(config, null, null, 
jobStatusRetriever);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     return dagManagementStateStore;
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6b71977a1..21b9fc28b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -35,13 +35,13 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
 import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;

Reply via email to