This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 02eca4f8d [GOBBLIN-1910] Pr dag refactor stage 2 (#3858)
02eca4f8d is described below

commit 02eca4f8de577930b4ba88f138b3631102308524
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Sat Mar 9 08:45:45 2024 -0800

    [GOBBLIN-1910] Pr dag refactor stage 2 (#3858)
    
    * add new components to take responsibilities away from DagManager
    * rebase the latest commit
    * address review comments
    * add unit test
    * add flow catalog to DagManagementStateStore
    ---------
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   5 +
 .../apache/gobblin/metrics/ServiceMetricNames.java |   2 +
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |   1 +
 .../modules/orchestration/DagManagement.java       |  33 ++++
 .../orchestration/DagManagementStateStore.java     |  18 ++-
 .../orchestration/DagManagementTaskStreamImpl.java | 115 +++++++++++++
 .../modules/orchestration/DagManagerMetrics.java   |   3 +
 .../modules/orchestration/DagManagerUtils.java     |   2 +-
 .../modules/orchestration/DagProcFactory.java      |  44 +++++
 .../modules/orchestration/DagProcessingEngine.java | 106 ++++++++++++
 .../modules/orchestration/DagTaskStream.java       |  30 ++++
 .../modules/orchestration/DagTaskVisitor.java      |  25 +++
 .../MostlyMySqlDagManagementStateStore.java        |  50 +++++-
 .../modules/orchestration/proc/DagProc.java        |  61 +++++++
 .../modules/orchestration/proc/LaunchDagProc.java  |  75 +++++++++
 .../modules/orchestration/task/DagTask.java        |  60 +++++++
 .../modules/orchestration/task/LaunchDagTask.java  |  43 +++++
 .../monitoring/DagActionStoreChangeMonitor.java    |  25 +--
 .../DagManagementDagActionStoreChangeMonitor.java  |  89 ++++++++++
 ...eEnabledDagActionStoreChangeMonitorFactory.java |  85 ++++++++++
 .../DagManagementTaskStreamImplTest.java           |  91 +++++++++++
 .../orchestration/DagProcessingEngineTest.java     | 180 +++++++++++++++++++++
 .../MostlyMySqlDagManagementStateStoreTest.java    |   9 +-
 23 files changed, 1127 insertions(+), 25 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 2cfb6a017..87bda430a 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -199,4 +199,9 @@ public class ServiceConfigKeys {
   public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
 
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + 
"issueRepo.class";
+
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+  public static final String NUM_DAG_PROC_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 962713b1c..41e242b43 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -77,4 +77,6 @@ public class ServiceMetricNames {
   public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "MysqlDagStateStore" + ".totalDagCount";
 
   public static final String DAG_COUNT_FS_DAG_STATE_COUNT = 
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
+
+  public static final String DAG_PROCESSING_EXCEPTION_METER = 
"DagProcessingException";
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 0d6dacf9a..11215a7f1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -59,6 +59,7 @@ public class RuntimeMetrics {
   public static final String 
GOBBLIN_DAG_ACTION_STORE_SUCCESSFUL_LAUNCH_SUBMISSIONS_ON_STARTUP = 
DAG_ACTION_STORE_MONITOR_PREFIX + "successfulLaunchSubmissionsOnStartup";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_FAILED_LAUNCH_SUBMISSIONS_ON_STARTUP = 
DAG_ACTION_STORE_MONITOR_PREFIX + "failedLaunchSubmissionsOnStartup";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_LAUNCH_EVENT_ERRORS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.launch.event.errors";
   public static final String
       GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = 
DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + 
"gobblin.mysql.quota.manager.unexpected.errors";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
new file mode 100644
index 000000000..1e4b17c90
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+
+
+/**
+ * An interface to provide abstractions for managing operations on Dag.
+ * It accepts a {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} which can be processed 
later.
+ * Consumption of the Dags happens through {@link DagTaskStream}.
+ */
+public interface DagManagement {
+
+  void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 4903247d2..c6e45a46d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -18,13 +18,17 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -36,6 +40,18 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
  */
 @Alpha
 public interface DagManagementStateStore {
+  /**
+   * Returns a {@link FlowSpec} for the given URI.
+   * @throws SpecNotFoundException if the spec is not found
+   */
+  FlowSpec getFlowSpec(URI uri) throws SpecNotFoundException;
+
+  /**
+   * Removes a {@link FlowSpec} with the given URI and passes the deletion to 
listeners if `triggerListener` is true.
+   * No-op if the flow spec was not present in the store.
+   */
+  void removeFlowSpec(URI uri, Properties headers, boolean triggerListener);
+
   /**
    * Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
    * e.g. on adding a failed dag in store to retry later, on submitting a dag 
node to spec producer because that changes
@@ -165,4 +181,4 @@ public interface DagManagementStateStore {
    * Returns true if successfully reduces the quota usage
    */
   boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws 
IOException;
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
new file mode 100644
index 000000000..a83d1103e
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link 
DagTaskStream}. It accepts
+ * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and 
iteratively provides {@link DagTask}.
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, 
DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> 
dagActionStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+    dagManagerMetrics.activate();
+  }
+
+  @Override
+  public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dagAction{}", dagAction);
+
+    if (!this.dagActionQueue.offer(dagAction)) {
+      throw new RuntimeException("Could not add dag action " + dagAction + " 
to the queue");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return true;
+  }
+
+  @Override
+  public DagTask next() {
+    try {
+      DagActionStore.DagAction dagAction = this.dagActionQueue.take();  
//`take` blocks until an element is available
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      // can it return null? is this iterator allowed to return null?
+      return createDagTask(dagAction, new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, 
System.currentTimeMillis()));
+      //}
+    } catch (Throwable t) {
+      //TODO: need to handle exceptions gracefully
+      log.error("Error getting DagAction from the queue / creating DagTask", 
t);
+    }
+    return null;
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+
+    switch (flowActionType) {
+      case LAUNCH:
+        return new LaunchDagTask(dagAction, leaseObtainedStatus);
+      default:
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a5f34cff7..24e25e044 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -62,6 +62,7 @@ public class DagManagerMetrics {
   private ContextAwareMeter allRunningMeter;
   private ContextAwareMeter allSlaExceededMeter;
   private ContextAwareMeter allStartSlaExceededMeter;
+  public ContextAwareMeter dagProcessingExceptionMeter;
   // Meters representing the flows in a given state per flowgroup
   private final Map<String, ContextAwareMeter> groupSuccessfulMeters = 
Maps.newConcurrentMap();
   private final Map<String, ContextAwareMeter> groupFailureMeters = 
Maps.newConcurrentMap();
@@ -100,6 +101,8 @@ public class DagManagerMetrics {
           ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
       allRunningMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+      dagProcessingExceptionMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index e894a0b16..d9d928e19 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -353,7 +353,7 @@ public class DagManagerUtils {
     return dagNode.getValue().getSpecExecutor().getUri().toString();
   }
 
-  static void emitFlowEvent(EventSubmitter eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
+  public static void emitFlowEvent(EventSubmitter eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
     if (!dag.isEmpty()) {
       // Every dag node will contain the same flow metadata
       Config config = getDagJobConfig(dag);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
new file mode 100644
index 000000000..3729c19d5
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Singleton;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived 
class to its companion {@link DagProc} derived class.
+ * Each {@link DagTask} needs its own {@link DagProcFactory#meet} method 
overload to create {@link DagProc} that is
+ * supposed to process that {@link DagTask}.
+ */
+
+@Alpha
+@Singleton
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+  @Override
+  public LaunchDagProc meet(LaunchDagTask launchDagTask) {
+    return new LaunchDagProc(launchDagTask);
+  }
+  //todo - overload meet method for other dag tasks
+}
+
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
new file mode 100644
index 000000000..bd6aba126
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type 
of {@link DagTask}.
+ * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a 
time-limited lease conferring the exclusive
+ * right to perform the work of the task.
+ * The {@link DagProcFactory} transforms each {@link DagTask} into a specific, 
concrete {@link DagProc}, which
+ * encapsulates all processing inside {@link 
DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore) {
+    Integer numThreads = ConfigUtils.getInt
+        (config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY, 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads,
+            ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("DagProcessingEngineThread")));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      // todo - set metrics for count of active DagProcEngineThread
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  @AllArgsConstructor
+  @VisibleForTesting
+  static class DagProcEngineThread implements Runnable {
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          //todo - add a metric to count the times dagTask was null
+          log.warn("Received a null dag task, ignoring.");
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+          dagTask.conclude();
+        } catch (Exception e) {
+          log.error("DagProcEngineThread encountered exception while 
processing dag " + dagTask.getDagId(), e);
+          
DagManagementTaskStreamImpl.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+        }
+        // todo mark lease success and release it
+        //dagTaskStream.complete(dagTask);
+      }
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
new file mode 100644
index 000000000..b8995fffe
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * An interface to provide abstraction for getting next available {@link 
DagTask} to process.
+ */
+
+public interface DagTaskStream extends Iterator<DagTask> {
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
new file mode 100644
index 000000000..0b27b4e9d
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<T> {
+  T meet(LaunchDagTask launchDagTask);
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index d484ff8f8..48b789d11 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -23,15 +23,20 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.collect.Lists;
+import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -54,21 +59,50 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, 
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
ConcurrentHashMap<>();
   private final Map<DagManager.DagId, Long> dagToDeadline = new 
ConcurrentHashMap<>();
-  private final DagStateStore dagStateStore;
-  private final DagStateStore failedDagStateStore;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+  private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
+  Map<URI, TopologySpec> topologySpecMap;
+  private final Config config;
   private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = 
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  FlowCatalog flowCatalog;
 
-  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) throws IOException {
-    this.dagStateStore = createDagStateStore(config, topologySpecMap);
-    this.failedDagStateStore = createDagStateStore(
-        ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+  @Inject
+  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog) throws IOException {
     this.quotaManager = new MysqlUserQuotaManager(config);
-    this.quotaManager.init(getDags());
+    this.config = config;
+    this.flowCatalog = flowCatalog;
+   }
+
+  // This method should be called after the topology spec map is set
+  public synchronized void start() throws IOException {
+    if (!dagStoresInitialized) {
+      this.dagStateStore = createDagStateStore(config, topologySpecMap);
+      this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+          topologySpecMap);
+      initQuota(getDags());
+      dagStoresInitialized = true;
+    }
+  }
+
+  @Override
+  public FlowSpec getFlowSpec(URI uri) throws SpecNotFoundException {
+    return this.flowCatalog.getSpecs(uri);
+  }
+
+  @Override
+  public void removeFlowSpec(URI uri, Properties headers, boolean 
triggerListener) {
+    this.flowCatalog.remove(uri, headers, triggerListener);
+  }
+
+  public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
+    this.topologySpecMap = topologySpecMap;
+    start();
   }
 
-  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) {
+  private DagStateStore createDagStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) {
     try {
       Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
MysqlDagStateStore.class.getName()));
       return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
new file mode 100644
index 000000000..94e14387a
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask} by 
first initializing its state, performing
+ * actions based on the type of {@link DagTask} and finally submitting an 
event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected static final MetricContext metricContext = 
Instrumented.getMetricContext(new State(), DagProc.class);
+  protected static final EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(
+      metricContext, "org.apache.gobblin.service").build();
+
+  public final void process(DagManagementStateStore dagManagementStateStore) 
throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    T result = act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore, result);   // todo - retry
+    sendNotification(result, eventSubmitter);   // todo - retry
+  }
+
+  protected abstract S initialize(DagManagementStateStore 
dagManagementStateStore) throws IOException;
+
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S 
state) throws IOException;
+
+  protected abstract void sendNotification(T result, EventSubmitter 
eventSubmitter) throws IOException;
+
+  // todo - commit the modified dags to the persistent store, maybe not 
required for InMem dagManagementStateStore
+  protected void commit(DagManagementStateStore dagManagementStateStore, T 
result) {
+
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
new file mode 100644
index 000000000..72a59ee0e
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * The {@link DagProc} implementation for a {@link LaunchDagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
+  private final LaunchDagTask launchDagTask;
+  private final AtomicLong orchestrationDelayCounter;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask) {
+    this.launchDagTask = launchDagTask;
+    this.orchestrationDelayCounter = new AtomicLong(0);
+    ContextAwareGauge<Long> orchestrationDelayMetric = 
this.metricContext.newContextAwareGauge
+        (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
+    this.metricContext.register(orchestrationDelayMetric);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, 
EventSubmitter eventSubmitter)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  protected void commit(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
new file mode 100644
index 000000000..114864e64
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task on a Dag.
+ * Upon successful completion of the corresponding {@link 
DagProc#process(DagManagementStateStore)},
+ * {@link DagTask#conclude()} must be called.
+ */
+
+@Alpha
+public abstract class DagTask {
+  @Getter public final DagActionStore.DagAction dagAction;
+  private final MultiActiveLeaseArbiter.LeaseObtainedStatus 
leaseObtainedStatus;
+  @Getter protected final DagManager.DagId dagId;
+
+  public DagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  public abstract <T> T host(DagTaskVisitor<T> visitor);
+
+  /**
+   * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be 
done in this method.
+   * Returns true if concluding dag task finished successfully otherwise false.
+   */
+  // todo call it from the right place
+  public abstract boolean conclude() throws IOException;
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
new file mode 100644
index 000000000..5093eaf3e
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+
+
+/**
+ * A {@link DagTask} responsible for handling launch tasks.
+ */
+
+public class LaunchDagTask extends DagTask {
+  public LaunchDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+  }
+
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+
+  @Override
+  public boolean conclude() {
+    // todo - release lease
+    return true;
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a6f11e08f..7260e45d2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -17,6 +17,13 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -24,16 +31,12 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
@@ -59,13 +62,13 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
 
   // Metrics
-  private ContextAwareMeter killsInvoked;
-  private ContextAwareMeter resumesInvoked;
+  protected ContextAwareMeter killsInvoked;
+  protected ContextAwareMeter resumesInvoked;
   private ContextAwareMeter successfulLaunchSubmissions;
   private ContextAwareMeter failedLaunchSubmissions;
   private ContextAwareMeter successfulLaunchSubmissionsOnStartup;
   private ContextAwareMeter failedLaunchSubmissionsOnStartup;
-  private ContextAwareMeter unexpectedErrors;
+  protected ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
   private ContextAwareMeter duplicateMessagesMeter;
   private ContextAwareMeter heartbeatMessagesMeter;
@@ -230,7 +233,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
    * For a given dagAction, calls the appropriate method in the DagManager to 
carry out the desired action.
    * @param isStartup true if called for dagAction loaded directly from store 
upon startup, false otherwise
    */
-  private void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
+  protected void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
     log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction 
change ({}) received for flow: {}",
         dagAction.getFlowActionType(), dagAction);
     if 
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.RESUME)) {
@@ -342,7 +345,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
 
   @Data
   @AllArgsConstructor
-  private static class LaunchSubmissionMetricProxy {
+  protected static class LaunchSubmissionMetricProxy {
     private ContextAwareMeter successMeter;
     private ContextAwareMeter failureMeter;
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
new file mode 100644
index 000000000..6bfc92045
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A {@link DagActionStoreChangeMonitor} that should be used when {@link 
org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED}
+ * is set.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+  protected ContextAwareMeter unexpectedLaunchEventErrors;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
+  public DagManagementDagActionStoreChangeMonitor(Config config, int 
numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore 
dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // DagManager is only needed in the `handleDagAction` method of its parent 
class and not needed in this class,
+    // so we are passing a null value for DagManager to its parent class.
+    super("", config, null, numThreads, flowCatalog, orchestrator, 
dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  /**
+   * This implementation passes on the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
+   * {@link DagManagement} instead of finding a {@link 
org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to {@link 
Orchestrator}.
+   */
+  @Override
+  protected void handleDagAction(DagActionStore.DagAction dagAction, boolean 
isStartup) {
+    log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction 
change ({}) received for flow: {}",
+        dagAction.getFlowActionType(), dagAction);
+    LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? 
ON_STARTUP : POST_STARTUP;
+    try {
+      // todo - add actions for other types of dag actions
+      if 
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.LAUNCH)) {
+        // If multi-active scheduler is NOT turned on we should not receive 
these types of events
+        if (!this.isMultiActiveSchedulerEnabled) {
+          this.unexpectedLaunchEventErrors.mark();
+          throw new RuntimeException(String.format("Received LAUNCH dagAction 
while not in multi-active scheduler "
+              + "mode for flowAction: %s", dagAction));
+        }
+        dagManagement.addDagAction(dagAction);
+      } else {
+        log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagAction.getFlowActionType());
+        this.unexpectedErrors.mark();
+      }
+    } catch (IOException e) {
+      log.warn("Failed to addDagAction for flowId {} due to exception {}", 
dagAction.getFlowId(), e.getMessage());
+      launchSubmissionMetricProxy.markFailure();
+    }
+  }
+
+  @Override
+  protected void createMetrics() {
+    super.createMetrics();
+    // Dag Action specific metrics
+    this.unexpectedLaunchEventErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
new file mode 100644
index 000000000..fbf6bc336
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A factory implementation that returns a {@link 
DagManagementDagActionStoreChangeMonitor} instance.
+ */
+@Slf4j
+public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {
+  static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
+
+  private final Config config;
+  private final FlowCatalog flowCatalog;
+  private final Orchestrator orchestrator;
+  private final DagActionStore dagActionStore;
+  private final boolean isMultiActiveSchedulerEnabled;
+  private final DagManagement dagManagement;
+
+  @Inject
+  public DagProcEngineEnabledDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
+      Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement 
dagManagement,
+      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
+    this.config = Objects.requireNonNull(config);
+    this.flowCatalog = flowCatalog;
+    this.orchestrator = orchestrator;
+    this.dagActionStore = dagActionStore;
+    this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    this.dagManagement = dagManagement;
+  }
+
+  private DagManagementDagActionStoreChangeMonitor 
createDagActionStoreMonitor()
+    throws ReflectiveOperationException {
+    Config dagActionStoreChangeConfig = 
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+    log.info("DagActionStore will be initialized with config {}", 
dagActionStoreChangeConfig);
+
+    int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
+
+    return new 
DagManagementDagActionStoreChangeMonitor(dagActionStoreChangeConfig,
+        numThreads, flowCatalog, orchestrator, dagActionStore, 
isMultiActiveSchedulerEnabled, this.dagManagement);
+  }
+
+  @Override
+  public DagActionStoreChangeMonitor get() {
+    try {
+      DagActionStoreChangeMonitor changeMonitor = 
createDagActionStoreMonitor();
+      changeMonitor.initializeMonitor();
+      return changeMonitor;
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to initialize DagActionStoreMonitor 
due to ", e);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
new file mode 100644
index 000000000..c9998f046
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+public class DagManagementTaskStreamImplTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty());
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  // This tests addition and removal of dag actions from the dag task stream;
+  // when we have different dag procs in future, we can test dag processing 
and exception handling
+  @Test
+  public void addRemoveDagActions() throws IOException {
+    dagManagementTaskStream.addDagAction(
+        new DagActionStore.DagAction("fg", "fn", "12345", 
DagActionStore.FlowActionType.LAUNCH));
+    DagTask dagTask = dagManagementTaskStream.next();
+    Assert.assertTrue(dagTask instanceof LaunchDagTask);
+    DagProc dagProc = dagTask.host(this.dagProcFactory);
+    Assert.assertNotNull(dagProc);
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
new file mode 100644
index 000000000..11a74d7db
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcessingEngine dagProcessingEngine;
+  DagTaskStream dagTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream =
+        new DagManagementTaskStreamImpl(config, Optional.empty());
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+    this.dagTaskStream = spy(new MockedDagTaskStream());
+    this.dagProcessingEngine =
+        new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory, 
dagManagementStateStore);
+  }
+
+  static class MockedDagTaskStream implements DagTaskStream {
+    public static final int MAX_NUM_OF_TASKS = 1000;
+    public static final int FAILING_DAGS_FREQUENCY = 10;
+    volatile int i=0;
+
+    @Override
+    public boolean hasNext() {
+      return true;
+    }
+
+    @Override
+    public synchronized DagTask next() throws NoSuchElementException {
+      i++;
+      if (i > MAX_NUM_OF_TASKS) {
+        throw new RuntimeException("Simulating an exception to stop the 
thread!");
+      }
+      if (i % FAILING_DAGS_FREQUENCY == 0 ) {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), true);
+      } else {
+        return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-" 
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), false);
+      }
+    }
+  }
+
+  static class MockedDagTask extends DagTask {
+    private final boolean isBad;
+
+    public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
+      super(dagAction, null);
+      this.isBad = isBad;
+    }
+
+    @Override
+    public <T> T host(DagTaskVisitor<T> visitor) {
+      return (T) new MockedDagProc(this.isBad);
+    }
+
+    @Override
+    public boolean conclude() {
+      return false;
+    }
+  }
+
+  static class MockedDagProc extends DagProc<Void, Void> {
+    private final boolean isBad;
+    public MockedDagProc(boolean isBad) {
+      this.isBad = isBad;
+    }
+
+    @Override
+    protected Void initialize(DagManagementStateStore dagManagementStateStore) 
{
+      return null;
+    }
+
+    @Override
+    protected Void act(DagManagementStateStore dagManagementStateStore, Void 
state) {
+      if (this.isBad) {
+        throw new RuntimeException("Simulating an exception!");
+      }
+      return null;
+    }
+
+    @Override
+    protected void sendNotification(Void result, EventSubmitter 
eventSubmitter) {
+    }
+
+    @Override
+    protected void commit(DagManagementStateStore dagManagementStateStore, 
Void result) {
+    }
+  }
+
+  // This test verifies that all the dag tasks entered into the dag task stream 
are retrieved by the dag proc engine threads
+  @Test
+  public void dagProcessingTest() throws InterruptedException, 
TimeoutException {
+    // MAX_NUM_OF_TASKS dag tasks are returned, and then each thread 
additionally calls the stream once more, where it would wait indefinitely.
+    // In this unit test it does not wait indefinitely, though, because the 
mocked task stream throws an exception from the
+    // (MAX_NUM_OF_TASKS + 1)-th call onward.
+    int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + 
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
+    int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / 
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
+
+    AssertWithBackoff.assertTrue(input -> 
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == 
expectedNumOfInvocations,
+        10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " 
number of times",
+        log, 1, 1000L);
+
+    
Assert.assertEquals(DagManagementTaskStreamImpl.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
  expectedExceptions);
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 4e1b914d1..0a8e421df 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -47,7 +47,7 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
  */
 public class MostlyMySqlDagManagementStateStoreTest {
 
-  private DagManagementStateStore dagManagementStateStore;
+  private MostlyMySqlDagManagementStateStore dagManagementStateStore;
   private static final String TEST_USER = "testUser";
   private static final String TEST_PASSWORD = "testPassword";
   private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
@@ -75,7 +75,9 @@ public class MostlyMySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, topologySpecMap);
+    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementStateStore.start();
   }
 
   @Test
@@ -128,8 +130,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
     @Override
     protected StateStore<State> createStateStore(Config config) {
       try {
-
-        String jdbcUrl = 
MostlyMySqlDagManagementStateStoreTest.testMetastoreDatabase.getJdbcUrl();
+        String jdbcUrl = 
config.getString(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
         HikariDataSource dataSource = new HikariDataSource();
 
         
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);

Reply via email to