umustafi commented on code in PR #3825:
URL: https://github.com/apache/gobblin/pull/3825#discussion_r1427430802


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager manages dags in memory and various mappings.

Review Comment:
   The job of NewDagManager is only to add new work ("tasks") to the
dagTaskStream. Let's update the class description to reflect that.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -138,7 +146,8 @@ public <V> void onRetry(Attempt<V> attempt) {
             }
           }
         }));
-        this.eventProducer = observabilityEventProducer;
+    this.eventProducer = observabilityEventProducer;
+    this.dagProcessingEngine = dagProcessingEngine;

Review Comment:
   Why are we interacting with `dagProcessingEngine` here? We should be passing
this work to `NewDagManager` instead.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -393,11 +397,23 @@ public void remove(Spec spec, Properties headers) throws 
IOException {
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
     if (spec instanceof FlowSpec) {
+      URI specUri = spec.getUri();
       //Send the dag to the DagManager to stop it.
       //Also send it to the SpecProducer to do any cleanup tasks on 
SpecExecutor.
       if (this.dagManager.isPresent()) {
-        _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
-        this.dagManager.get().stopDag(spec.getUri());
+        _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
specUri);
+        this.dagManager.get().stopDag(specUri);
+      }
+      if (this.dagProcessingEngine.isPresent()) {

Review Comment:
   I don't think we should be interacting with `dagProcessingEngine` directly;
we want to use the `DagManagement` interface (`NewDagManager.handleCancel/Kill...`)
to abstract away the addition of a task.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager manages dags in memory and various mappings.

Review Comment:
   This should be a lightweight class that only adds new tasks. Do we want all
this logic for status retrieval and cleanup after a task finishes? We need to
decide who polls for the job status after a job completes, and who then cleans up
or submits the next job.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager manages dags in memory and various mappings.
+ */
+@Slf4j
+public class NewDagManager implements DagManagement {
+  public static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
+  public static final Integer DEFAULT_NUM_THREADS = 3;
+  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
+  private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  private static final String FAILED_DAG_RETENTION_TIME_UNIT = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
+  private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
+  private static final String FAILED_DAG_RETENTION_TIME = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
+  private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+  // Re-emit the final flow status if not detected within 5 minutes
+  public static final String FAILED_DAG_POLLING_INTERVAL = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
+  public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+  // Default job start SLA time if configured, measured in minutes. Default is 
10 minutes
+  private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+  private final Config config;
+  private final Integer retentionPollingInterval;
+
+  public Set<String> getFailedDagIds() {
+    return this.failedDagIds;
+  }
+
+  public DagStateStore getFailedDagStateStore() {
+    return this.failedDagStateStore;
+  }
+
+  public DagStateStore getDagStateStore() {
+    return this.dagStateStore;
+  }
+
+  public JobStatusRetriever getJobStatusRetriever() {
+    return this.jobStatusRetriever;
+  }
+
+  public UserQuotaManager getQuotaManager() {
+    return this.quotaManager;
+  }
+
+  public Optional<Timer> getJobStatusPolledTimer() {
+    return this.jobStatusPolledTimer;
+  }
+
+  public Optional<EventSubmitter> getEventSubmitter() {
+    return this.eventSubmitter;
+  }
+
+  public DagManagerMetrics getDagManagerMetrics() {
+    return this.dagManagerMetrics;
+  }
+
+  public AtomicLong getOrchestrationDelay() {
+    return this.orchestrationDelay;
+  }
+
+  public DagProcessingEngine getDagProcessingEngine() {
+    return this.dagProcessingEngine;
+  }
+
+  public Optional<DagActionStore> getDagActionStore() {
+    return this.dagActionStore;
+  }
+
+  /**
+   * Action to be performed on a {@link Dag}, in case of a job failure. 
Currently, we allow 2 modes:
+   * <ul>
+   *   <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+   *   <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to 
finish, as long as all the dependencies
+   *   of the job are successful.</li>
+   * </ul>
+   */
+  public enum FailureOption {
+    FINISH_RUNNING("FINISH_RUNNING"),
+    CANCEL("CANCEL"),
+    FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+    private final String failureOption;
+
+    FailureOption(final String failureOption) {
+      this.failureOption = failureOption;
+    }
+
+    @Override
+    public String toString() {
+      return this.failureOption;
+    }
+  }
+
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private Set<String> failedDagIds;
+  private final DagStateStore failedDagStateStore;
+
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  private final boolean instrumentationEnabled;
+  @Getter private DagStateStore dagStateStore;
+  private int houseKeepingThreadInitialDelay = 
INITIAL_HOUSEKEEPING_THREAD_DELAY;
+  private final Integer numThreads;
+  protected final Long defaultJobStartSlaTimeMillis;
+  private final JobStatusRetriever jobStatusRetriever;
+  private final UserQuotaManager quotaManager;
+  private final SpecCompiler specCompiler;
+  private final boolean isFlowConcurrencyEnabled;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
+  private final Optional<Timer> jobStatusPolledTimer;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final long failedDagRetentionTime;
+  private final DagManagerMetrics dagManagerMetrics;
+  private final AtomicLong orchestrationDelay = new AtomicLong(0);
+  private final DagProcessingEngine dagProcessingEngine;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject(optional=true)
+  DagManagementStateStore dagManagementStateStore;
+
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+
+  protected final EventBus eventBus;
+  private final FlowCatalog flowCatalog;
+
+  public NewDagManager(Config config, JobStatusRetriever jobStatusRetriever,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
FlowStatusGenerator flowStatusGenerator,
+      FlowCatalog flowCatalog, Optional<DagActionStore> dagActionStore, 
boolean instrumentationEnabled,
+      DagProcessingEngine dagProcessingEngine, DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+    this.config = config;
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.dagActionStore = dagActionStore;
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.retentionPollingInterval = ConfigUtils.getInt(config, 
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
+    this.instrumentationEnabled = instrumentationEnabled;
+    this.eventBus = KafkaJobStatusMonitor.getEventBus();
+    this.eventBus.register(this);
+    this.dagProcessingEngine = dagProcessingEngine;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext;
+    if (instrumentationEnabled) {
+      metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+      this.jobStatusPolledTimer = 
Optional.of(metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
+      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    } else {
+      this.jobStatusPolledTimer = Optional.absent();
+      this.eventSubmitter = Optional.absent();
+    }
+    this.dagManagerMetrics = new DagManagerMetrics();
+    TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+    this.jobStatusRetriever = jobStatusRetriever;
+    this.specCompiler = 
GobblinConstructorUtils.invokeConstructor(SpecCompiler.class, 
ConfigUtils.getString(config,
+        ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
+        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config);
+    this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+    this.quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+        config);
+    this.flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
+        quotaManager, eventSubmitter, flowStatusGenerator, 
isFlowConcurrencyEnabled);
+    TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
+    this.failedDagRetentionTime = 
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, 
DEFAULT_FAILED_DAG_RETENTION_TIME));
+    KillDagThread killDagThread = new 
KillDagThread(defaultJobStartSlaTimeMillis, this);
+    this.scheduledExecutorPool.scheduleAtFixedRate(killDagThread, 100L, 60L, 
TimeUnit.SECONDS);
+    this.flowCatalog = flowCatalog;
+    this.failedDagStateStore =
+        createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+            topologySpecMap);
+    setActive();
+  }
+
+  public synchronized void setActive() throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    DagStateStore failedDagStateStore =
+        createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+            topologySpecMap);
+    this.failedDagIds = 
Collections.synchronizedSet(failedDagStateStore.getDagIds());
+    this.dagManagerMetrics.activate();
+    UserQuotaManager quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
+    quotaManager.init(dagStateStore.getDags());
+    DagManager.FailedDagRetentionThread
+        failedDagRetentionThread = new 
DagManager.FailedDagRetentionThread(failedDagStateStore, failedDagIds, 
failedDagRetentionTime);
+    this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 
0, retentionPollingInterval, TimeUnit.MINUTES);
+    loadDagFromDagStateStore();
+    ScheduledExecutorService houseKeepingThreadPool = 
Executors.newSingleThreadScheduledExecutor();
+    for (int delay = houseKeepingThreadInitialDelay; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+      houseKeepingThreadPool.schedule(() -> {
+        try {
+          loadDagFromDagStateStore();
+        } catch (Exception e ) {
+          log.error("failed to sync dag state store due to ", e);
+        }}, delay, TimeUnit.MINUTES);
+    }
+    if (dagActionStore.isPresent()) {
+      Collection<DagActionStore.DagAction> dagActions = 
dagActionStore.get().getDagActions();
+      for (DagActionStore.DagAction action : dagActions) {

Review Comment:
   This logic was moved by a recent PR (https://github.com/apache/gobblin/pull/3841);
you'll see that if you rebase. All of these actions are now handled in the
DagActionStore monitor, and we should call the DagManagement interface to handle them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to