[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=883014&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-883014
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/23 23:09
            Start Date: 02/Oct/23 23:09
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343192925


##########
gobblin-rest-service/gobblin-rest-api/src/main/snapshot/org.apache.gobblin.rest.jobExecutions.snapshot.json:
##########


Review Comment:
   Is this meant to be a part of ur change? was this done or included by 
mistake?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);

Review Comment:
   are these dagStart or dagKill Deadlines? We have both configurable so may 
want to create separate functions for each. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;

Review Comment:
   `millis` to differentiate from `micros`, also add the time unit to polling 
interval



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -171,6 +176,12 @@ public void configure(Binder binder) {
     if (serviceConfig.isMultiActiveSchedulerEnabled()) {
       
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
       binder.bind(FlowTriggerHandler.class);
+      if(serviceConfig.isDagProcessingEngineEnabled()) {
+        
binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
+        binder.bind(DagProcFactory.class).in(Singleton.class);
+        binder.bind(DagProcessingEngine.class).in(Singleton.class);
+        binder.bind(DagTaskStream.class).in(Singleton.class);

Review Comment:
   I believe you need to create an `OptionalBinder` like I've done above 
`OptionalBinder.newOptionalBinder(binder,MultiActiveLeaseArbiter.class);` as 
each of these classes are instantiated only in certain cases. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }

Review Comment:
   let's move this check to above conditional 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+

Review Comment:
   extra newline. is it launched any other way?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");

Review Comment:
   let's update this message to be more specific. `Received flowActionType that 
is not handled by createDagTask`. Instead of return null we should either 
return an `optional` or throw an exception and catch the exception/check for 
`optional.isPresent`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();

Review Comment:
   shall we rename to `dagIdToRunningJob`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+  private Optional<DagActionStore> dagActionStore;
+
+  @Override
+  public synchronized void deleteJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.jobToDag.remove(dagNode);
+    this.dagToJobs.get(dagId).remove(dagNode);
+    this.dagToSLA.remove(dagId);
+  }
+
+
+  @Override
+  public synchronized void addJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    Dag<JobExecutionPlan> dag = this.dagIdToDags.get(dagId);
+    this.jobToDag.put(dagNode, dag);
+    if (this.dagToJobs.containsKey(dagId)) {
+      this.dagToJobs.get(dagId).add(dagNode);
+    } else {
+      LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
Lists.newLinkedList();
+      dagNodeList.add(dagNode);
+      this.dagToJobs.put(dagId, dagNodeList);
+    }
+  }
+
+  @Override
+  public synchronized boolean hasRunningJobs(String dagId) {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }
+
+  @Override
+  public synchronized void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException {
+    if (this.dagActionStore.isPresent()) {
+      this.dagActionStore.get().deleteDagAction(
+          new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, flowActionType));
+    }
+  }
+
+  @Override
+  public void addDagSLA(String dagId, Long flowSla) {
+    this.dagToSLA.putIfAbsent(dagId, flowSla);
+  }
+
+  @Override
+  public Long getDagSLA(String dagId) {
+    if(this.dagToSLA.containsKey(dagId)) {
+      return this.dagToSLA.get(dagId);
+    }
+    return null;
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) {
+    if(this.dagIdToDags.containsKey(dagId)) {
+      return this.dagIdToDags.get(dagId);
+    }
+    return null;

Review Comment:
   we should return `Optional` not null, hard to remember to do null check



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Enforce cancel on the job if the job has been "orphaned". A job is 
orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED 
state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, 
JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    //TODO: timestamps needs to be fetched from database instead of using 
System.currentTimeMillis()
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
System.currentTimeMillis());
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Enforce 
cancellation of the job ...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);

Review Comment:
   this doesn't actually kill or restart the job it looks like right, we just 
return a boolean. Let's rename the function then to say `is...DeadlineExceeded` 
for this function and above one. Let's have a separate function that enforces 
it and does the action. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up 
{@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED

Review Comment:
   nit: `ie:` or `including:`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));

Review Comment:
   the conditional above should be checked first and this should be the else 
right?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, 
maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, 
maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, 
delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);
+    }
+  }
+
+  protected final S initializeWithRetries(DagManagementStateStore 
dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws 
IOException {
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      try {
+        return this.initialize(dagManagementStateStore);
+      } catch (MaybeRetryableException e) {
+        if (retryCount < maxRetryCount - 1) { // Don't wait before the last 
retry
+          waitBeforeRetry(delayRetryMillis);
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    throw new RuntimeException("Max retry attempts reached. Cannot initialize 
Dag");
+  }
+
+  protected final R actWithRetries(S state, DagManagementStateStore 
dagManagementStateStore, int maxRetryCount, long delayRetryMillis) {

Review Comment:
   should we have a generic method that deals with retries and takes a function 
as parameter? Seems like you are repeating code between these two functions and 
want to handle them the same way



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from 
to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+
+  @Getter
+  private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingDeque<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @Override
+  public boolean hasNext() {
+    return true;
+  }
+
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagActionStore.DagAction dagAction = dagActionQueue.peek();
+    try {
+      Preconditions.checkArgument(dagAction != null, "No Dag Action found in 
the queue");
+      Properties jobProps = getJobProperties(dagAction);
+      Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus> 
leaseObtainedStatus =
+          flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis()).toJavaUtil();
+      if(leaseObtainedStatus.isPresent()) {
+        DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+        return Optional.of(dagTask);
+      }
+    } catch (Exception ex) {
+      //TODO: need to handle exceptions gracefully
+      throw new RuntimeException(ex);
+    }
+    return Optional.empty();
+  }
+
+  public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  public DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  public DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long  
triggerTimeStamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(String flowGroup, String flowName, String 
flowExecutionId, long  triggerTimeStamp) throws IOException, 
InterruptedException {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(String flowGroup, String flowName, String 
flowExecutionId, long  produceTimestamp) throws IOException {
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup, 
flowName, flowExecutionId);
+    
if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString()))
 {
+      log.info("Invalid dag since not present in map. Hence cannot cancel it");
+      return;
+    }
+    DagActionStore.DagAction killAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
+      if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will try to cancel the job when SLA is reached.
+   *
+   * @param node dag node of the job
+   * @return true if the job is killed because it reached sla
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to distribute the responsibility outside of this class
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla;
+    if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+      flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+    } else {
+      try {
+        flowSla = DagManagerUtils.getFlowSLA(node);
+      } catch (ConfigException e) {
+        log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid 
format, using default SLA of {}",
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+            DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+        flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+      }
+      this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+    }
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      
this.dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      
this.dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow 
killed due to exceeding SLA of " + flowSla + " ms");
+
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Cancel the job if the job has been "orphaned". A job is orphaned if has 
been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED 
state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, 
JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    //TODO: need to distribute the responsibility outside of this class
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    //TODO: initialize default job sla in millis via configs
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
System.currentTimeMillis());
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing 
the job now...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      String dagId = DagManagerUtils.generateDagId(node).toString();
+      
dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow 
killed because no update received for " + timeOutForJobStart + " ms after 
orchestration");
+      return true;
+    } else {
+      return false;
+    }
+
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+
+  protected JobStatus retrieveJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) 
{

Review Comment:
   plus 1 we can move these to the `DagManagementStateStore`, we can call those 
methods if needed



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);
+
+  public Dag<JobExecutionPlan> getDag(String dagId);
+
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) 
throws IOException;
+
+  public boolean addFailedDagId(String dagId);

Review Comment:
   does this add to failedDagId store?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();

Review Comment:
   any exceptions possible to result in this method and one above?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;

Review Comment:
   `final` as well



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;

Review Comment:
   these can also be declared here or in configuration files above as is normal 
with other configurations but I'm less particular about it



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.

Review Comment:
   this sentence is a bit hard to read `Maintains a stream of ... that ... it 
polls when ready for more work to process`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));

Review Comment:
   good descriptive message. Let's add one detail exceeded the _completion SLA_ 
as opposed to start



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}

Review Comment:
   can shorten this since we have the "@return true" below and just have `via 
...` joining with sentence above



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";

Review Comment:
   let's put this in the `ServiceConfigKeys` or `ConfigurationKeys`. Also agree 
with kip's comment above that there should be a `dag_processing_prefix` used 
before them all



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.

Review Comment:
   remove `on`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Enforce cancel on the job if the job has been "orphaned". A job is 
orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.

Review Comment:
   specific amount of time determined by config?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+  private Optional<DagActionStore> dagActionStore;
+
+  @Override
+  public synchronized void deleteJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   may throw exception if the `dagNode` does not exist



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/AdvanceDagProc.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} dealing which advancing to the next 
node in the {@link Dag}.

Review Comment:
   `dealing WITH`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();

Review Comment:
   `dagIdToSLA/Deadline`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up 
{@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED
+ *
+ */
+@Slf4j
+@Alpha
+public class CleanUpDagProc extends DagProc {
+
+  private CleanUpDagTask cleanUpDagTask;
+
+  public CleanUpDagProc(CleanUpDagTask cleanUpDagTask) {
+    this.cleanUpDagTask = cleanUpDagTask;
+  }
+
+  @Override
+  protected Object initialize(DagManagementStateStore dagManagementStateStore) 
throws MaybeRetryableException, IOException {
+    throw new UnsupportedOperationException("Not supported");
+

Review Comment:
   extra space



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given 
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private JobStatusRetriever jobStatusRetriever;
+  private FlowStatusGenerator flowStatusGenerator;
+  private UserQuotaManager quotaManager;
+  private SpecCompiler specCompiler;
+  private FlowCatalog flowCatalog;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private Config config;
+  private Optional<EventSubmitter> eventSubmitter;
+  private boolean instrumentationEnabled;
+

Review Comment:
   since we have attempted to initialize with Guice since previous revision 
let's also add the `@Inject` annotation to the constructor 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long 
delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a 
reminder trigger to check back again.

Review Comment:
   let's update this in the next iteration



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states

Review Comment:
   add more description like `for a Dag, allows one to update or extract its 
job state or ...` giving some high level descriptions 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config 
config, DagActionStore d
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    // instantiating using default ctor; subsequent PR will handle 
instantiating with multi-args ctor
+//    this.dagTaskStream = new DagTaskStream();

Review Comment:
   yea we should use guice to bring in it in, see the 
`DagActionStoreChangeMonitorFactory`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));

Review Comment:
   or are we trying a "dummy kill in the new refactor" if so let's add a 
comment that the change below doesn't carry out the action. it's still in 
testing. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via 
REST client for adhoc flows.
+   * The eventTimestamp for adhoc flows will always be 0 (zero), while -1 
would indicate failures.
+   * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+   * Future implementations will cover launch of flows through the scheduler 
too!

Review Comment:
   Agree with Kip's suggestion here to make it read like an interface 
   "Handles launching of ... via ..." 
   @param followed by requirements
   @throws if input doesn't match





Issue Time Tracking
-------------------

    Worklog Id:     (was: 883014)
    Time Spent: 13.5h  (was: 13h 20m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 13.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to