[https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=908455&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908455]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Mar/24 02:30
            Start Date: 06/Mar/24 02:30
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1513704254


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task on a Dag.
+ * Upon successful completion of the corresponding {@link DagProc#process(DagManagementStateStore)},
+ * {@link DagTask#conclude()} must be called.
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+  @Getter public DagActionStore.DagAction dagAction;
+  private MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+  @Getter DagManager.DagId dagId;

Review Comment:
   `protected`?  `final`?
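
A minimal sketch of what the `protected`/`final` suggestion could look like, assuming the fields are only ever assigned in the constructor. `SketchDagAction`, `SketchDagId`, and `SketchDagTask` are hypothetical stand-ins rather than the Gobblin classes, and the `group_name_executionId` id format is an assumption, not necessarily what `DagManagerUtils.generateDagId` produces.

```java
import java.util.Objects;

// Hypothetical stand-in for DagActionStore.DagAction (not the Gobblin class).
final class SketchDagAction {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;

  SketchDagAction(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

// Hypothetical stand-in for DagManager.DagId; the "group_name_id" format is an assumption.
final class SketchDagId {
  private final String value;

  SketchDagId(SketchDagAction action) {
    this.value = action.flowGroup + "_" + action.flowName + "_" + action.flowExecutionId;
  }

  @Override
  public String toString() {
    return value;
  }
}

abstract class SketchDagTask {
  // protected: readable by subclasses (e.g. a launch task); final: assigned exactly once.
  protected final SketchDagAction dagAction;
  protected final SketchDagId dagId;

  protected SketchDagTask(SketchDagAction dagAction) {
    this.dagAction = Objects.requireNonNull(dagAction, "dagAction");
    this.dagId = new SketchDagId(dagAction);
  }

  public SketchDagAction getDagAction() {
    return dagAction;
  }

  public SketchDagId getDagId() {
    return dagId;
  }
}
```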



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -54,21 +59,50 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new ConcurrentHashMap<>();
   private final Map<DagManager.DagId, Long> dagToDeadline = new ConcurrentHashMap<>();
-  private final DagStateStore dagStateStore;
-  private final DagStateStore failedDagStateStore;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+  private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
+  Map<URI, TopologySpec> topologySpecMap;
+  private final Config config;
   private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  FlowCatalog flowCatalog;
 
-  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
-    this.dagStateStore = createDagStateStore(config, topologySpecMap);
-    this.failedDagStateStore = createDagStateStore(
-        ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+  @Inject
+  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog) throws IOException {
     this.quotaManager = new MysqlUserQuotaManager(config);
-    this.quotaManager.init(getDags());
+    this.config = config;
+    this.flowCatalog = flowCatalog;
+   }
+
+  @Override
+  // It should be called after topology spec map is set
+  public synchronized void start() throws IOException {

Review Comment:
   if the reason to have a separate `start()` is that the topologySpecMap must be set, why not combine this method w/ `setTopologySpecMap` so it takes a TSM param?
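
A sketch of the reviewer's alternative, under the assumption that `start()` exists only so it runs after the topology spec map is available: passing the map as a parameter makes the ordering impossible to get wrong. `SketchStateStore` is hypothetical, `String` stands in for `TopologySpec`, and the comment marks where the real store would build its `DagStateStore`s and initialize quotas.

```java
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: start() takes the topology spec map, so it cannot run before the map is set.
// String stands in for TopologySpec to keep the example self-contained.
class SketchStateStore {
  private Map<URI, String> topologySpecMap = Collections.emptyMap();
  private boolean started = false;

  public synchronized void start(Map<URI, String> topologySpecMap) {
    Objects.requireNonNull(topologySpecMap, "topologySpecMap");
    if (started) {
      return; // idempotent: repeated calls keep the first configuration
    }
    this.topologySpecMap = Collections.unmodifiableMap(new HashMap<>(topologySpecMap));
    // Here the real store would create its dag state stores and initialize quotas.
    started = true;
  }

  public synchronized boolean isStarted() {
    return started;
  }

  public synchronized int topologyCount() {
    return topologySpecMap.size();
  }
}
```

Whether a second call should be a no-op (as here) or throw `IllegalStateException` is a judgment call; the no-op keeps repeated activation harmless.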



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,28 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+public interface DagTaskStream extends Iterator<DagTask> {
+  boolean hasNext();
+  DagTask next();
+}

Review Comment:
   let's add javadoc
   
   also the body could be empty, as these methods already come in from `Iterator<DagTask>`
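
A sketch of the suggested shape, with javadoc and an empty body since `hasNext()` and `next()` are inherited from `Iterator`. `SketchTask`, `SketchTaskStream`, and `QueueTaskStream` are hypothetical; the queue-backed implementation exists only to exercise the interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for DagTask.
final class SketchTask {
  final String id;

  SketchTask(String id) {
    this.id = id;
  }
}

/**
 * A stream of tasks awaiting processing.
 * hasNext() and next() are inherited from Iterator, so they need no redeclaration.
 */
interface SketchTaskStream extends Iterator<SketchTask> {
}

// Non-blocking, queue-backed implementation used only to exercise the interface.
final class QueueTaskStream implements SketchTaskStream {
  private final Deque<SketchTask> tasks = new ArrayDeque<>();

  void add(SketchTask task) {
    tasks.addLast(task);
  }

  @Override
  public boolean hasNext() {
    return !tasks.isEmpty();
  }

  @Override
  public SketchTask next() {
    SketchTask task = tasks.pollFirst();
    if (task == null) {
      throw new NoSuchElementException("no task available"); // per the Iterator contract
    }
    return task;
  }
}
```

`Iterator#next()` is specified to throw `NoSuchElementException` when no element remains, whereas `DagManagementTaskStreamImpl#next()` in this PR blocks on `take()`; if that blocking behavior is intended, the new javadoc is a natural place to state it.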



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :

Review Comment:
   rename



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,56 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
+ * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class);
+  protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
+      metricContext, "org.apache.gobblin.service").build();
+
+  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    T result = act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore, result);   // todo - retry
+  }
+
+  protected abstract S initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
+
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S state) throws IOException;
+
+  // todo - commit the modified dags to the persistent store, maybe not required for InMem dagManagementStateStore
+  protected abstract void commit(DagManagementStateStore dagManagementStateStore, T result);

Review Comment:
   I'm not sure where the `commit` came in.  I'd recommend that whatever we're doing against the state store in `act` be committed within it.  [I recall](https://github.com/apache/gobblin/pull/3776/files#diff-f77e59443bfc1a4d7d559486eb0ad62222bce017c0d7b547940dad417907606dR45) the original third `DagProc` method being `sendNotification` (which produces the GTE for the `KafkaJobStatusMonitor`, where needed).
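
A sketch of the three-step shape the comment recalls, assuming state-store writes are committed inside `act` and the third step only emits a notification. `SketchStore`, `SketchDagProc`, and `RecordingProc` are hypothetical stand-ins, not the PR's `DagProc` or `DagManagementStateStore`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DagManagementStateStore.
interface SketchStore {
  void put(String key, String value) throws IOException;
}

abstract class SketchDagProc<S, T> {
  // Template method: state-store writes happen inside act(); the third step only notifies.
  public final void process(SketchStore store) throws IOException {
    S state = initialize(store);
    T result = act(store, state);
    sendNotification(result);
  }

  protected abstract S initialize(SketchStore store) throws IOException;

  protected abstract T act(SketchStore store, S state) throws IOException;

  // e.g. emit the event that a downstream job-status monitor consumes
  protected abstract void sendNotification(T result) throws IOException;
}

// Toy subclass that records each step so the ordering can be checked.
final class RecordingProc extends SketchDagProc<String, String> {
  final List<String> steps = new ArrayList<>();

  @Override
  protected String initialize(SketchStore store) {
    steps.add("initialize");
    return "dag-1";
  }

  @Override
  protected String act(SketchStore store, String dagId) throws IOException {
    steps.add("act");
    store.put(dagId, "LAUNCHED"); // persisted as part of act(), no separate commit()
    return dagId;
  }

  @Override
  protected void sendNotification(String dagId) {
    steps.add("notify:" + dagId);
  }
}
```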



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -36,6 +40,26 @@
  */
 @Alpha
 public interface DagManagementStateStore {
+
+  /**
+   * Does any initial setup work. It should usually be called after the initialization.
+   */
+  default void start() throws IOException {
+    initQuota(getDags());
+  }
+
+  /**
+   * Returns a {@link FlowSpec} for the given URI.
+   * @throws SpecNotFoundException if the spec is not found
+   */
+  FlowSpec getSpecs(URI uri) throws SpecNotFoundException;

Review Comment:
   why name this `getSpecs` rather than `getSpec`?  truly, most precise would be `getFlowSpec`.
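
A sketch of the singular naming, assuming the method returns exactly one spec or throws. `SketchSpecNotFoundException`, `SketchFlowSpecSource`, and `InMemoryFlowSpecSource` are hypothetical, and `String` stands in for `FlowSpec`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Gobblin's SpecNotFoundException.
class SketchSpecNotFoundException extends Exception {
  SketchSpecNotFoundException(URI uri) {
    super("No flow spec found for " + uri);
  }
}

interface SketchFlowSpecSource {
  /**
   * Returns the single flow spec registered under {@code uri}.
   * @throws SketchSpecNotFoundException if no spec is registered for {@code uri}
   */
  String getFlowSpec(URI uri) throws SketchSpecNotFoundException;
}

// In-memory implementation; String stands in for FlowSpec.
final class InMemoryFlowSpecSource implements SketchFlowSpecSource {
  private final Map<URI, String> specs = new HashMap<>();

  void add(URI uri, String spec) {
    specs.put(uri, spec);
  }

  @Override
  public String getFlowSpec(URI uri) throws SketchSpecNotFoundException {
    String spec = specs.get(uri);
    if (spec == null) {
      throw new SketchSpecNotFoundException(uri);
    }
    return spec;
  }
}
```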



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * NewDagManager has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManagementTaskStreamImpl already {}, skipping further actions.", (!active) ? "inactive" : "active");
+    }
+    this.isActive = active;
+    try {
+      if (this.isActive) {
+        log.info("Activating DagManagementTaskStreamImpl.");
+        //Initializing state store for persisting Dags.
+        this.dagManagementStateStore.start();
+        dagManagerMetrics.activate();
+      } else { //Mark the DagManager inactive.
+        log.info("Inactivating the DagManagementTaskStreamImpl. Shutting down all DagManager threads");
+        dagManagerMetrics.cleanup();
+      }
+    } catch (IOException e) {
+        log.error("Exception encountered when activating the DagManagementTaskStreamImpl", e);
+        throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dagAction{}", dagAction);
+    if (!isActive) {
+      log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+          dagAction);
+      return;
+    }
+
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+    String dagIdString = dagId.toString();
+    if (this.dagManagementStateStore.containsDag(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+      return;
+    }
+
+    // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+    // to avoid duplicate executions upon leadership change
+    if (this.dagActionStore.isPresent()) {
+      this.dagActionStore.get().deleteDagAction(dagAction);
+    }
+
+    if (!this.dagActionQueue.offer(dagAction)) {
+      throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask<DagProc> next() {
+    try {
+      DagActionStore.DagAction dagAction = this.dagActionQueue.take();  //`take` blocks till element is not available
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      // can it return null? is this iterator allowed to return null?
+      return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));
+      //}
+    } catch (Throwable t) {

Review Comment:
   catching `Throwable` is pretty extreme, given it would swallow OOM and other low-level `Error`s.  what specifically are the kinds of exceptions you're on the lookout for?
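
One way to narrow the catch, sketched under the assumption that the only checked exception expected here is the `InterruptedException` from `BlockingQueue#take()`. The class name and the `task-for-` string standing in for `createDagTask(...)` are hypothetical. Restoring the interrupt flag lets the engine thread's loop notice it is being shut down, and `Error`s such as `OutOfMemoryError` propagate instead of being swallowed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: catch only what take() can throw, instead of Throwable.
// Errors such as OutOfMemoryError are deliberately left to propagate.
final class NarrowCatchTaskStream {
  private final BlockingQueue<String> dagActionQueue = new LinkedBlockingQueue<>();

  void add(String dagAction) {
    dagActionQueue.add(dagAction);
  }

  String next() {
    try {
      String dagAction = dagActionQueue.take(); // blocks until an element is available
      return "task-for-" + dagAction; // stands in for createDagTask(...)
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag so the caller can observe it
      throw new IllegalStateException("Interrupted while waiting for a dag action", e);
    }
  }
}
```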



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+    // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+    // to avoid duplicate executions upon leadership change
+    if (this.dagActionStore.isPresent()) {
+      this.dagActionStore.get().deleteDagAction(dagAction);
+    }

Review Comment:
   naively, it seems that with multiple active participants, they'd all try to delete the same dag action from the shared store.
   
   or are you relying on some trick, such as `dagActionStore.isPresent()` being true only for active participants?
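
If several participants can reach this line, one way to make the race harmless is for the delete to report whether this caller actually removed the action, and to proceed only on `true`. The in-memory store below is a hypothetical stand-in, not Gobblin's `DagActionStore`; for a JDBC-backed store, the analogous signal would be the row count returned by `Statement#executeUpdate` for the `DELETE`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a shared dag action store.
final class SharedDagActionStore {
  private final ConcurrentMap<String, Boolean> actions = new ConcurrentHashMap<>();

  void add(String dagActionId) {
    actions.put(dagActionId, Boolean.TRUE);
  }

  // Returns true only for the one caller whose remove() actually found the entry.
  boolean deleteIfPresent(String dagActionId) {
    return actions.remove(dagActionId) != null;
  }

  // Simulates several participants racing to delete the same action; returns how many "won".
  static int race(SharedDagActionStore store, String dagActionId, int participants) {
    AtomicInteger wins = new AtomicInteger();
    Thread[] threads = new Thread[participants];
    for (int i = 0; i < participants; i++) {
      threads[i] = new Thread(() -> {
        if (store.deleteIfPresent(dagActionId)) {
          wins.incrementAndGet();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return -1; // signals an incomplete race
      }
    }
    return wins.get();
  }
}
```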



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a time-limited lease conferring the exclusive
+ * right to perform the work of the task.
+ * The {@link DagProcFactory} transforms each {@link DagTask} into a specific, concrete {@link DagProc}, which
+ * encapsulates all processing inside {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore) {
+    Integer numThreads = ConfigUtils.getInt
+        (config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY, ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  @AllArgsConstructor
+  @VisibleForTesting
+  static class DagProcEngineThread implements Runnable {
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask<DagProc> dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          log.warn("Received a null dag task, ignoring.");
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+          dagTask.conclude();
+        } catch (Throwable t) {
+          log.error("DagProcEngineThread encountered error ", t);

Review Comment:
   let's include an identifier in the log message, such as `dagTask.getDagId()`
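
With the class's Lombok `@Slf4j` logger, the suggestion might read `log.error("DagProcEngineThread encountered error for dagId {}", dagTask.getDagId(), t);`, since SLF4J treats a trailing `Throwable` argument as the exception to log rather than as a placeholder value. The runnable sketch below shows the same idea with `java.util.logging` so it needs no SLF4J dependency; `DagProcLoopSketch`, `processOne`, and the message wording are hypothetical, and it catches `RuntimeException` rather than `Throwable`, in line with the earlier comment about swallowing `Error`s.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch using java.util.logging so it runs without SLF4J on the classpath.
final class DagProcLoopSketch {
  private static final Logger LOG = Logger.getLogger(DagProcLoopSketch.class.getName());

  static String failureMessage(String dagId) {
    return "DagProcEngineThread encountered error while processing dag " + dagId;
  }

  // Runs one unit of work; on failure, logs the dag identifier together with the stack trace.
  static boolean processOne(String dagId, Runnable work) {
    try {
      work.run();
      return true;
    } catch (RuntimeException e) {
      LOG.log(Level.SEVERE, failureMessage(dagId), e);
      return false;
    }
  }
}
```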



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+@Alpha
+public abstract class DagTask<T> {
+  @Getter public DagActionStore.DagAction dagAction;
+  private MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+  @Getter DagManager.DagId dagId;
+
+  public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  public abstract T host(DagTaskVisitor<T> visitor);
+
+  /**
+   * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be done in this method.
+   * Returns true if concluding dag task finished successfully otherwise false.
+   */
+  // todo call it from the right place
+  public abstract boolean conclude() throws IOException;

Review Comment:
   if you already have `leaseObtainedStatus` on hand, why can't you implement this here?  AFAICT, it could even be `final` here.
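
A sketch of implementing `conclude()` once in the base class, assuming the lease handle exposes some completion call. `SketchLease#complete()` is a hypothetical stand-in for whatever `MultiActiveLeaseArbiter.LeaseObtainedStatus` actually offers, and the class names are illustrative only.

```java
import java.io.IOException;

// Hypothetical lease handle standing in for MultiActiveLeaseArbiter.LeaseObtainedStatus.
interface SketchLease {
  // Marks the leased work as complete; returns false if the lease had already expired.
  boolean complete() throws IOException;
}

abstract class SketchConcludableTask {
  private final SketchLease lease;

  protected SketchConcludableTask(SketchLease lease) {
    this.lease = lease;
  }

  // Implemented once in the base class, since every task already holds its lease.
  public final boolean conclude() throws IOException {
    return lease.complete();
  }
}

// Subclasses supply only task-specific behavior; conclude() is inherited unchanged.
final class SketchLaunchTask extends SketchConcludableTask {
  SketchLaunchTask(SketchLease lease) {
    super(lease);
  }
}
```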



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+
+@Slf4j

Review Comment:
   I don't see any logging here, so I suggest not adding the annotation.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManagementTaskStreamImpl already {}, skipping further actions.", (!active) ? "inactive" : "active");
+    }
+    this.isActive = active;
+    try {
+      if (this.isActive) {
+        log.info("Activating DagManagementTaskStreamImpl.");
+        //Initializing state store for persisting Dags.
+        this.dagManagementStateStore.start();
+        dagManagerMetrics.activate();
+      } else { //Mark the DagManager inactive.
+        log.info("Inactivating the DagManagementTaskStreamImpl. Shutting down all DagManager threads");
+        dagManagerMetrics.cleanup();
+      }
+    } catch (IOException e) {
+        log.error("Exception encountered when activating the DagManagementTaskStreamImpl", e);
+        throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dagAction{}", dagAction);
+    if (!isActive) {
+      log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+          dagAction);
+      return;
+    }
+
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+    String dagIdString = dagId.toString();
+    if (this.dagManagementStateStore.containsDag(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+      return;
+    }
+
+    // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+    // to avoid duplicate executions upon leadership change
+    if (this.dagActionStore.isPresent()) {
+      this.dagActionStore.get().deleteDagAction(dagAction);
+    }
+
+    if (!this.dagActionQueue.offer(dagAction)) {
+      throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();

Review Comment:
   this should be an infinite stream, hence always `true` (even if it would 
require blocking since `dagActionQueue` doesn't yet have any elements).
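A minimal sketch of such an unbounded stream, assuming the stream can be modeled as a `java.util.Iterator` over a `LinkedBlockingQueue`. Here `hasNext()` always returns `true`, and `next()` blocks in `take()` until an element arrives. `BlockingTaskStream` and `add(...)` are hypothetical names, not the PR's API.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Unbounded stream sketch: hasNext() never returns false, and next() waits
 * for the next element instead of returning null.
 */
class BlockingTaskStream<E> implements Iterator<E> {
  private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();

  void add(E element) {
    // offer() only fails when the queue is at capacity (Integer.MAX_VALUE by default here).
    if (!queue.offer(element)) {
      throw new IllegalStateException("Could not enqueue " + element);
    }
  }

  @Override
  public boolean hasNext() {
    return true; // the stream never ends; consumers block in next() instead
  }

  @Override
  public E next() {
    try {
      return queue.take(); // blocks while the queue is empty
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      throw new IllegalStateException("Interrupted while waiting for the next task", e);
    }
  }
}
```

With this shape, a consumer loop like `DagProcEngineThread.run()` would no longer need the `null` check after `next()`.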



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+    String dagIdString = dagId.toString();
+    if (this.dagManagementStateStore.containsDag(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+      return;

Review Comment:
   I'm unclear on this check--what are we guarding against?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,49 @@
+@Slf4j
+public class LaunchDagTask extends DagTask<LaunchDagProc> {
+  public LaunchDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+  }
+
+
+  @Override
+  public LaunchDagProc host(DagTaskVisitor<LaunchDagProc> visitor) {
+    return visitor.meet(this);
+  }
+
+  @Override
+  public boolean conclude() {
+    // todo - release lease
+    return true;
+  }

Review Comment:
   As suggested, let's implement this in the base class. Or do you anticipate that one or more of the derived classes would need a custom impl of its own?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import javax.naming.OperationNotSupportedException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+  public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) throws OperationNotSupportedException {
+    // todo - how to add dag action for a dag node? should we create a dag node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+    throw new OperationNotSupportedException();
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+        } catch (Throwable t) {
+          log.error("DagProcEngineThread encountered error ", t);
+        }
+        // todo mark lease success and releases it
+        //dagTaskStream.complete(dagTask);
+      }

Review Comment:
   performance-wise I'm not too concerned, as the reminder queue increase would 
only be by the value of `ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY`.
   
   the most important consideration is to confirm how the participant doing the 
processing is certain not to "leak" tasks upon processing failure.
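One way to make the no-leak guarantee explicit is sketched below under stated assumptions. The lease is released (`conclude()`) only after `process()` succeeds. On failure, the task is handed to a retry path instead of being dropped. `LeasedTask`, `NonLeakingWorker` and the `retries` queue are hypothetical. In the real service the re-delivery path might instead be lease expiry or a reminder, which this sketch does not model.

```java
import java.util.concurrent.BlockingQueue;

/** Hypothetical task abstraction: process() does the work, conclude() releases the lease. */
interface LeasedTask {
  void process() throws Exception;

  void conclude();
}

/**
 * Worker loop that never silently drops a task: each task taken from the queue is
 * either concluded after successful processing, or handed to a retry queue.
 */
class NonLeakingWorker implements Runnable {
  private final BlockingQueue<LeasedTask> tasks;
  private final BlockingQueue<LeasedTask> retries; // hypothetical re-delivery path

  NonLeakingWorker(BlockingQueue<LeasedTask> tasks, BlockingQueue<LeasedTask> retries) {
    this.tasks = tasks;
    this.retries = retries;
  }

  /** Handles one task; returns true if it was concluded, false if it was queued for retry. */
  boolean processOne(LeasedTask task) {
    try {
      task.process();
    } catch (Exception e) {
      // Do not conclude (the lease is not marked complete); re-queue so the task is not lost.
      if (!retries.offer(task)) {
        throw new IllegalStateException("Retry queue full; task would be lost", e);
      }
      return false;
    }
    task.conclude(); // release the lease only after processing succeeded
    return true;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        processOne(tasks.take()); // blocks until a task is available
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag so the loop exits on shutdown
      }
    }
  }
}
```

The sketch catches `Exception` rather than `Throwable`. An `Error` therefore still ends the worker thread without concluding or re-queuing the task, so a lease-expiry style backstop would still be needed for that case.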



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<T> {
+  T meet(LaunchDagTask launchDagTask);
+}

Review Comment:
   please still add javadoc!
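For reference, the sketch below shows the kind of javadoc that could go on the visitor, using simplified stand-ins. `TaskVisitor`, `Task`, `LaunchTask` and `ProcNameVisitor` are hypothetical. `KillTask` is also hypothetical, since the PR defines only a launch task. The visitor here returns a processor name instead of a `DagProc`.

```java
/**
 * Visitor over the concrete DAG task types. Each task's host(visitor) calls back the
 * meet(...) overload for its own runtime type (double dispatch), so a visitor such as
 * a processor factory can map every task type to its handler without instanceof checks.
 * Adding a task type means adding an overload here, so the compiler flags every
 * visitor that does not handle it yet.
 *
 * @param <T> the result of visiting a task, e.g. the processor that will handle it
 */
interface TaskVisitor<T> {
  /** Called by {@link LaunchTask#host}; returns the result for a launch task. */
  T meet(LaunchTask launchTask);

  /** Called by {@link KillTask#host}; returns the result for a kill task (hypothetical type). */
  T meet(KillTask killTask);
}

abstract class Task<T> {
  /** Dispatches to the visitor overload matching this task's concrete type. */
  abstract T host(TaskVisitor<T> visitor);
}

class LaunchTask extends Task<String> {
  @Override
  String host(TaskVisitor<String> visitor) {
    return visitor.meet(this); // `this` is a LaunchTask, so meet(LaunchTask) is chosen
  }
}

class KillTask extends Task<String> {
  @Override
  String host(TaskVisitor<String> visitor) {
    return visitor.meet(this); // `this` is a KillTask, so meet(KillTask) is chosen
  }
}

/** Example visitor that names the processor each task type would be routed to. */
class ProcNameVisitor implements TaskVisitor<String> {
  @Override
  public String meet(LaunchTask launchTask) {
    return "LaunchProc";
  }

  @Override
  public String meet(KillTask killTask) {
    return "KillProc";
  }
}
```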





Issue Time Tracking
-------------------

    Worklog Id:     (was: 908455)
    Time Spent: 27h 20m  (was: 27h 10m)

> Refactor code to move current in-memory references to new design for REST calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 27h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
