phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744527436


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
    * Configuration properties related to Flows
    */
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
-  public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
-  public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit";
-  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = TimeUnit.MINUTES.name();
-  public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time";
-  public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit";
-  public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
-  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = TimeUnit.MINUTES.name();
+  public static final String GOBBLIN_FLOW_FINSIH_DEADLINE_TIME = "gobblin.flow.deadline.time";
+  public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT = "gobblin.flow.deadline.timeunit";

Review Comment:
   looks like you renamed the constants, but not the property keys they hold
   (which should become `gobblin.flow.finish.deadline.*`)
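A minimal sketch of keeping the constant names and the property keys in sync, assuming the `gobblin.flow.finish.deadline.*` keys suggested above and using the `FINISH` spelling for both constants. The class, the `resolveDeadlineMillis` helper, the `DEFAULT_DEADLINE_MILLIS` fallback, and the `MINUTES` default unit are hypothetical; the real code reads Typesafe `Config`, not `java.util.Properties`.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: constant names and the keys they hold both use "finish",
// assuming the `gobblin.flow.finish.deadline.*` keys suggested in the review.
final class FlowFinishDeadlineKeys {
  static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME = "gobblin.flow.finish.deadline.time";
  static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT = "gobblin.flow.finish.deadline.timeunit";
  // Hypothetical fallback, mirroring the 24-hour default this PR adds to ServiceConfigKeys.
  static final long DEFAULT_DEADLINE_MILLIS = TimeUnit.HOURS.toMillis(24);

  private FlowFinishDeadlineKeys() {
  }

  /** Returns the configured flow-finish deadline in millis, or the fallback when no time is set. */
  static long resolveDeadlineMillis(Properties props) {
    String time = props.getProperty(GOBBLIN_FLOW_FINISH_DEADLINE_TIME);
    if (time == null) {
      return DEFAULT_DEADLINE_MILLIS;
    }
    // Assumption: the unit defaults to MINUTES, as the removed SLA keys did.
    TimeUnit unit = TimeUnit.valueOf(
        props.getProperty(GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, TimeUnit.MINUTES.name()));
    return unit.toMillis(Long.parseLong(time.trim()));
  }
}
```

With matching names, searching for either the constant or the property string leads to the same definition.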



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -678,8 +457,8 @@ public static void main(String[] args) throws Exception {
   @SuppressWarnings("DLS_DEAD_LOCAL_STORE")
   private static void testGobblinService(GobblinServiceManager gobblinServiceManager) {
 
-    FlowConfigClient client =
-        new FlowConfigClient(String.format("http://localhost:%s/", gobblinServiceManager.restliServer.getPort()));
+    try (FlowConfigV2Client client =

Review Comment:
   is the indentation off here? It also looks off in the
   ```
   catch (RemoteInvocationException | IOException e) {
   ```
   block.
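For reference, a sketch of the layout the comment seems to ask for, following the 2-space indentation with 4-space continuation used elsewhere in this diff. `StubFlowConfigClient` and `StubRemoteInvocationException` are hypothetical stand-ins for `FlowConfigV2Client` and `RemoteInvocationException` so the example compiles with the JDK alone.

```java
import java.io.IOException;

// Hypothetical stand-in for Rest.li's RemoteInvocationException.
class StubRemoteInvocationException extends Exception {
  StubRemoteInvocationException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for FlowConfigV2Client; only close() matters for try-with-resources.
class StubFlowConfigClient implements AutoCloseable {
  private final boolean failRemoteCall;

  StubFlowConfigClient(boolean failRemoteCall) {
    this.failRemoteCall = failRemoteCall;
  }

  String getFlowConfig(String flowName) throws StubRemoteInvocationException, IOException {
    if (failRemoteCall) {
      throw new StubRemoteInvocationException("remote call failed");
    }
    return "config-for-" + flowName;
  }

  @Override
  public void close() {
    // Nothing to release in this stub.
  }
}

class TryWithResourcesLayout {
  // `try` and `catch` line up with the method body; the resource initializer
  // gets a 4-space continuation indent, and the body is indented 2 more spaces.
  static String fetch(boolean failRemoteCall) {
    try (StubFlowConfigClient client =
        new StubFlowConfigClient(failRemoteCall)) {
      return client.getFlowConfig("testFlow");
    } catch (StubRemoteInvocationException | IOException e) {
      return "error: " + e.getMessage();
    }
  }
}
```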



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -1083,110 +281,46 @@ public void testQuotasRetryFlow() throws URISyntaxException, IOException {
         .thenReturn(jobStatusIteratorFlow1_0)
         .thenReturn(jobStatusIteratorFlow1_1)
         .thenReturn(jobStatusIteratorFlow1_2);
-    // Dag1 is running
-    this._dagManagerThread.run();
+
     SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-    // Dag1 fails and is orchestrated again
-    this._dagManagerThread.run();
-    // Dag1 is running again
-    this._dagManagerThread.run();
+
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-    // Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit
-    this._dagManagerThread.run();
 
-    this.queue.offer(dagList.get(1));
-    this._dagManagerThread.run();
-    this._dagManagerThread.run(); // cleanup
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 0);
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
-  @Test (dependsOnMethods = "testQuotasRetryFlow")
-  public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException, IOException {
-
-    Long flowId = System.currentTimeMillis();
-    Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId, "FINISH_RUNNING", 1, "proxyUser",
-        ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, false).build());    //Add a dag to the queue of dags
-    this.queue.offer(adhocDag);
-
-    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
-        getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
-        getMockFlowStatus("flow" + flowId, "group" + flowId, flowId, String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
-        getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
-        getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, String.valueOf(ExecutionStatus.COMPLETE));
-
-    Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId), Mockito.eq("group" + flowId), Mockito.anyLong(),
-            Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIteratorFlow0_0)
-        .thenReturn(jobStatusIteratorFlow0_1);
-    Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)), Mockito.eq("group" + (flowId+1)), Mockito.anyLong(),
-            Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIteratorFlow1_0)
-        .thenReturn(jobStatusIteratorFlow1_1);
-
-    String flowStateGaugeName0 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId,
-        "flow"+flowId, ServiceMetricNames.RUNNING_STATUS);
-    Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0));
-
-    Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1), flowId+1, "FINISH_RUNNING", 1, "proxyUser",
-        ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, true).build());
-    this.queue.offer(scheduledDag);
-    this._dagManagerThread.run();
-    String flowStateGaugeName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+(flowId+1),
-        "flow"+(flowId+1), ServiceMetricNames.RUNNING_STATUS);
-
-    Assert.assertNotNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName1));
-
-    // cleanup
-    this._dagManagerThread.run();
-    // should be successful since it should be cleaned up with status complete
-    Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(), DagManager.FlowState.SUCCESSFUL.value);
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
-  }
-
-  @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
-  public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
+  @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc", enabled = false)
+  // todo re-write for dag proc
+  public void testJobSlaKilledMetrics() throws URISyntaxException {

Review Comment:
   `testJobStartDeadlineKilledMetrics()`?



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +175,11 @@ public class ServiceConfigKeys {
   public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
 
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
+  public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
 
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
-  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
-  public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+  public static final long DEFAULT_FLOW_DEADLINE_MILLIS = TimeUnit.HOURS.toMillis(24);

Review Comment:
   `DEFAULT_FLOW_FINISH_...`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -433,79 +310,22 @@ private void configureServices(){
 
     registerServicesInLauncher();
 
-    // Register Scheduler to listen to changes in Flows
-    // In warm standby mode, instead of scheduler we will add orchestrator as listener
-    if(configuration.isWarmStandbyEnabled()) {
-      this.flowCatalog.addListener(this.orchestrator);
-    } else if (configuration.isSchedulerEnabled()) {
-      this.flowCatalog.addListener(this.scheduler);
-    }
-  }
-
-  private void ensureInjected() {
-    if (v2ResourceHandler == null) {
-      throw new IllegalStateException("GobblinServiceManager should be constructed through Guice dependency injection "
-          + "or through a static factory method");
-    }
+    // Register orchestrator to listen to changes in Flows
+    this.flowCatalog.addListener(this.orchestrator);
   }
 
   @Override
   public void start() throws ApplicationException {
     LOGGER.info("[Init] Starting the Gobblin Service Manager");
 
-    ensureInjected();
-
     configureServices();
-
-    if (this.helixManager.isPresent()) {
-      connectHelixManager();
-    }
-
-    this.eventBus.register(this);
     this.serviceLauncher.start();
 
-    if (this.helixManager.isPresent()) {
-      // Subscribe to leadership changes
-      this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange);
-
-
-      // Update for first time since there might be no notification
-      if (helixManager.get().isLeader()) {
-        if (configuration.isSchedulerEnabled()) {
-          LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
-          this.scheduler.setActive(true);
-        }
-
-        if (configuration.isGitConfigMonitorEnabled()) {
-          this.gitConfigMonitor.setActive(true);
-        }
-
-        if (helixLeaderGauges.isPresent()) {
-          helixLeaderGauges.get().setState(LeaderState.MASTER);
-        }
+    LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");

Review Comment:
   nit: "is running in multi-active mode..."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -413,16 +292,14 @@ private void registerServicesInLauncher(){
       this.serviceLauncher.addService(restliServer);
     }
 
-    if (this.configuration.isWarmStandbyEnabled()) {
-      this.serviceLauncher.addService(specStoreChangeMonitor);
-      this.serviceLauncher.addService(dagActionStoreChangeMonitor);
-    }
+    this.serviceLauncher.addService(specStoreChangeMonitor);
+    this.serviceLauncher.addService(_dagManagementDagActionStoreChangeMonitor);
   }
 
   private void configureServices(){
     if (configuration.isRestLIServerEnabled()) {
       this.restliServer = EmbeddedRestliServer.builder()
-          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
+          .resources(Lists.newArrayList(FlowConfigsV2Resource.class, FlowConfigsV2Resource.class))

Review Comment:
   did you mean to name this same resource twice?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -290,15 +323,15 @@ public void cleanUp() throws Exception {
    */
   @Test
   public void testGetClass() {
-    Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class) instanceof FlowCompilationValidationHelper);
+    Assert.assertNotNull(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class));
     // Optionally bound config
-    Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class) instanceof FlowCatalog);
+    Assert.assertNotNull(GobblinServiceManager.getClass(FlowCatalog.class));
   }
 
   /**
    * To test an existing flow in a spec store does not get deleted just because it is not compilable during service restarts
    */
-  @Test
+  @Test (enabled = false)

Review Comment:
   please add a comment explaining why this test is disabled



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -943,30 +162,19 @@ public void testDagManagerQuotaExceeded() throws URISyntaxException, IOException
         .thenReturn(jobStatusIteratorFlow1_0)
         .thenReturn(jobStatusIteratorFlow1_1);
 
-    this._dagManagerThread.run();
     // dag will not be processed due to exceeding the quota, will log a message and exit out without adding it to dags
-    this.queue.offer(dagList.get(1));
-    this._dagManagerThread.run();
     SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-
-    this._dagManagerThread.run(); // cleanup
-
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
-  @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
-  public void testQuotaDecrement() throws URISyntaxException, IOException {
-
-    List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user", ConfigFactory.empty());
+  @Test (dependsOnMethods = "testDagManagerQuotaExceeded", enabled = false)
+  // todo re-write for dag proc
+  public void testQuotaDecrement() throws URISyntaxException {
+    List<Dag<JobExecutionPlan>> dagList = DagTestUtils.buildDagList(3, "user", ConfigFactory.empty());

Review Comment:
   1. do these tests for the quota manager still belong in the `DagManagerTest` class?
   2. is there a more direct way to write these tests? I'm not seeing which method of the
   quota mgr is invoked. Also, the test assertions are based upon counters, so they only
   check the quota mgr indirectly.
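One possible shape for a more direct test, as a sketch only: call the quota manager itself and assert on what each call returns, rather than on service-level metric counters. `SimpleUserQuotaManager` and its `tryAcquire` / `release` / `runningCount` methods are hypothetical and are not the API of Gobblin's quota manager.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory quota manager; method names are illustrative only.
class SimpleUserQuotaManager {
  private final int perUserQuota;
  private final Map<String, Integer> running = new HashMap<>();

  SimpleUserQuotaManager(int perUserQuota) {
    this.perUserQuota = perUserQuota;
  }

  /** Reserves a slot for the user; returns false if the user is already at quota. */
  boolean tryAcquire(String user) {
    int current = running.getOrDefault(user, 0);
    if (current >= perUserQuota) {
      return false;
    }
    running.put(user, current + 1);
    return true;
  }

  /** Frees one slot; the entry is removed once the count would reach zero. */
  void release(String user) {
    running.computeIfPresent(user, (u, n) -> n > 1 ? n - 1 : null);
  }

  int runningCount(String user) {
    return running.getOrDefault(user, 0);
  }
}

// Each check names the quota-manager call it exercises.
class QuotaManagerDirectTest {
  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  static void quotaExceededThenReleased() {
    SimpleUserQuotaManager quotaManager = new SimpleUserQuotaManager(1);
    check(quotaManager.tryAcquire("user"), "first dag should fit within quota");
    check(!quotaManager.tryAcquire("user"), "second concurrent dag should exceed quota");
    quotaManager.release("user");
    check(quotaManager.runningCount("user") == 0, "release should decrement the count");
    check(quotaManager.tryAcquire("user"), "a dag should fit again after release");
  }
}
```

A test written this way no longer depends on `DagManagerTest` fixtures, which also bears on point 1.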



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -534,17 +354,9 @@ public void start() throws ApplicationException {
     //Activate the SpecCompiler, after the topologyCatalog has been initialized.
     this.orchestrator.getSpecCompiler().setActive(true);
 
-    //Activate the DagManager service, after the topologyCatalog has been initialized.
-    if (!this.configuration.isDagProcessingEngineEnabled() && (!this.helixManager.isPresent() || this.helixManager.get().isLeader())){
-      this.dagManager.setActive(true);
-      this.eventBus.register(this.dagManager);
-    }
-
-    // Activate both monitors last as they're dependent on the SpecCompiler, Scheduler, and DagManager being active
-    if (configuration.isWarmStandbyEnabled()) {
-      this.specStoreChangeMonitor.setActive();
-      this.dagActionStoreChangeMonitor.setActive();
-    }
+    // Activate both monitors last as they're dependent on the SpecCompiler and Scheduler being active
+    this.specStoreChangeMonitor.setActive();
+    this._dagManagementDagActionStoreChangeMonitor.setActive();

Review Comment:
   nit: why does the second one have a leading `_` in its name?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -156,14 +143,14 @@ public DagTask next() {
 
   private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams leaseParams)
       throws SchedulerException, IOException {
-    long timeOutForJobStart = DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
-        leaseParams.getDagAction().getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    long timeOutForJobStart = DagUtils.getJobStartDeadline(this.dagManagementStateStore.getDag(
+        leaseParams.getDagAction().getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());

Review Comment:
   should this handle an "invalid format" / `ConfigException` case, as is done for the
   flow finish deadline (below)?
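A sketch of one way to read this suggestion: treat a malformed value like a missing one and fall back to the default. Here a `Map<String, String>` and `IllegalArgumentException` stand in for Typesafe `Config` and `ConfigException`; `JobStartDeadlineResolver`, its keys, and the `MINUTES` default unit are hypothetical. The flow finish deadline handling the comment refers to is not shown in this excerpt.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical sketch: resolve a job-start deadline, falling back to a default
// when the value is missing or malformed.
final class JobStartDeadlineResolver {
  private static final Logger LOG = Logger.getLogger(JobStartDeadlineResolver.class.getName());
  static final String TIME_KEY = "job.start.deadline.time";     // hypothetical key
  static final String UNIT_KEY = "job.start.deadline.timeunit"; // hypothetical key

  private JobStartDeadlineResolver() {
  }

  static long resolveMillis(Map<String, String> jobConfig, long defaultMillis) {
    String time = jobConfig.get(TIME_KEY);
    if (time == null) {
      return defaultMillis;
    }
    try {
      TimeUnit unit = TimeUnit.valueOf(jobConfig.getOrDefault(UNIT_KEY, TimeUnit.MINUTES.name()));
      return unit.toMillis(Long.parseLong(time.trim()));
    } catch (IllegalArgumentException e) {
      // Covers NumberFormatException (a subclass) and an unknown TimeUnit name.
      LOG.warning("Invalid job start deadline config, using default: " + e.getMessage());
      return defaultMillis;
    }
  }
}
```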



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -609,15 +388,15 @@ public Collection<StandardMetrics> getStandardMetricsCollection() {
     return ImmutableList.of(this.metrics);
   }
 
-  private class Metrics extends StandardMetrics {
+  private static class Metrics extends StandardMetrics {
     public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
-    private ContextAwareHistogram serviceLeadershipChange;
 
     public Metrics(final MetricContext metricContext, Config config) {
       int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
           ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
-      this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
-      this.contextAwareMetrics.add(this.serviceLeadershipChange);
+      ContextAwareHistogram serviceLeadershipChange =
+          metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(serviceLeadershipChange);

Review Comment:
   does this metric still make sense?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, flowName);
-      /* Only compile and pass directly to `DagManager` when multi-active scheduler NOT enabled; otherwise
-      recompilation to occur later, once `DagActionStoreChangeMonitor` subsequently delegates this
-      `DagActionType.LAUNCH`
-       */
-      if (flowLaunchHandler.isPresent()) {
-        DagActionStore.DagAction launchDagAction = DagActionStore.DagAction.forFlow(
-            flowGroup,
-            flowName,
-            FlowUtils.getOrCreateFlowExecutionId(flowSpec),
-            DagActionStore.DagActionType.LAUNCH);
-        DagActionStore.LeaseParams
-            leaseObject = new DagActionStore.LeaseParams(launchDagAction, isReminderEvent,
-            triggerTimestampMillis);
-        // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
-        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, leaseObject, flowSpec.isScheduled());
-        _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
-            launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
-      } else {
-        try {
-          TimingEvent flowCompilationTimer =
-              new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
-          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-              this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
-                  flowName, flowMetadata);
-
-          if (!compiledDagOptional.isPresent()) {
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            return;
-          }
-          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-          if (compiledDag.isEmpty()) {
-            FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
-                flowMetadata);
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-                SharedFlowMetricsSingleton.CompiledState.FAILED);
-            _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-            return;
-          }
-          sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
-          FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
-          flowCompilationTimer.stop(flowMetadata);
-
-          // Depending on if DagManager is present, handle execution
-          submitFlowToDagManager(flowSpec, compiledDag);
-        } finally {
-          /* Remove adhoc flow spec from the flow catalog, regardless of whether the flow was successfully validated
-          and permitted to exec (concurrently)
-           */
-          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
-        }
-      }
+      DagActionStore.DagAction launchDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams
+          leaseObject = new DagActionStore.LeaseParams(launchDagAction, isReminderEvent,
+          triggerTimestampMillis);

Review Comment:
   nit: don't leave the type on a line by itself; put the `leaseObject` variable name
   on that line too



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -85,64 +77,39 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
   protected final TopologyCatalog topologyCatalog;
-  protected final DagManager dagManager;
   private final JobStatusRetriever jobStatusRetriever;
 
   protected final MetricContext metricContext;
 
   protected final EventSubmitter eventSubmitter;
-  private final boolean isFlowConcurrencyEnabled;
   @Getter
   private Meter flowOrchestrationSuccessFulMeter;
   @Getter
   private Meter flowOrchestrationFailedMeter;
   @Getter
   private Timer flowOrchestrationTimer;
-  private Counter flowFailedForwardToDagManagerCounter;
-  @Setter
-  private FlowStatusGenerator flowStatusGenerator;
-
-  private UserQuotaManager quotaManager;
-  private final FlowCompilationValidationHelper flowCompilationValidationHelper;
-  private Optional<FlowLaunchHandler> flowLaunchHandler;
-  private Optional<FlowCatalog> flowCatalog;
+  private final FlowLaunchHandler flowLaunchHandler;
   @Getter
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
   @Inject
-  public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager dagManager,
-      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, Optional<FlowLaunchHandler> flowLaunchHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog,
-      Optional<DagManagementStateStore> dagManagementStateStore,
+  public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional<Logger> log, FlowLaunchHandler flowLaunchHandler,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, DagManagementStateStore dagManagementStateStore,
       FlowCompilationValidationHelper flowCompilationValidationHelper, JobStatusRetriever jobStatusRetriever) throws IOException {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.topologyCatalog = topologyCatalog;
-    this.dagManager = dagManager;
-    this.flowStatusGenerator = flowStatusGenerator;
     this.flowLaunchHandler = flowLaunchHandler;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
-    this.flowCatalog = flowCatalog;
     this.jobStatusRetriever = jobStatusRetriever;
-    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
     this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
-    //At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager.
-    this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
-    if (dagManagementStateStore.isPresent()) {
-      ((MySqlDagManagementStateStore) dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
-    }
+    //At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManagementStateStore.
+    ((MySqlDagManagementStateStore) dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());

Review Comment:
   seeing this cast makes me wonder:
   a. does `setTopologySpecMap` belong in the `DagManagementStateStore` interface?
   b. OR, perhaps even better, should it be a ctor param to the `MySqlDagManagementStateStore`?
   I realize there are timing interdependencies to coordinate, but couldn't those be handled
   before `MySqlDagManagementStateStore::start` is called?
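A minimal sketch of option (b), assuming the timing dependency can be handled by passing a `Supplier` to the constructor and resolving it in `start()`. `StateStoreSketch`, `MySqlStateStoreSketch`, and the `String` spec values are hypothetical simplifications of `DagManagementStateStore`, `MySqlDagManagementStateStore`, and the real topology spec map; the Guice wiring is not modeled.

```java
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified interface exposing only what this sketch needs.
interface StateStoreSketch {
  void start();

  Map<URI, String> getTopologySpecMap();
}

class MySqlStateStoreSketch implements StateStoreSketch {
  private final Supplier<Map<URI, String>> topologySpecMapSupplier;
  private Map<URI, String> topologySpecMap; // set in start()

  // Option (b): the dependency arrives through the constructor, so callers never downcast.
  MySqlStateStoreSketch(Supplier<Map<URI, String>> topologySpecMapSupplier) {
    this.topologySpecMapSupplier = topologySpecMapSupplier;
  }

  @Override
  public void start() {
    // Reading the supplier here, not in the constructor, defers access until start(),
    // after the spec compiler has had a chance to populate the map.
    this.topologySpecMap = Collections.unmodifiableMap(topologySpecMapSupplier.get());
  }

  @Override
  public Map<URI, String> getTopologySpecMap() {
    if (topologySpecMap == null) {
      throw new IllegalStateException("start() has not been called yet");
    }
    return topologySpecMap; // read-only view; later additions to the underlying map stay visible
  }
}
```

Option (a) would instead add `setTopologySpecMap` to the interface; that removes the cast but keeps a setter-based, order-dependent initialization.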



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,15 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
- * requiring all stateful pieces to be passed as input from the caller upon instantiating the helper.
- * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development, so we do
- * not attempt major reorganization as abstractions may change.
+ * validations, and creates {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s which the
+ * {@link org.apache.gobblin.service.modules.orchestration.DagProcessingEngine} is responsible for processing.
+ * However, with launch executions now being stored in the DagActionStateStore, on restart, the LaunchDagProc has to
+ * perform validations before executing any launch actions the previous LaunchDagProc was unable to complete.
+ * Rather than duplicating the code or introducing a circular dependency between the LaunchDagProc and Orchestrator,
+ * this class is utilized to store the common functionality. It is stateful, requiring all stateful pieces to be passed
+ * as input from the caller upon instantiating the helper.

Review Comment:
   actually, should it be, "It is **stateless**, requiring all stateful pieces 
to be passed as ..."?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -344,10 +309,10 @@ public void createFlowSpec() throws Throwable {
     }
 
     // Make sure FlowCatalog has the added Flow
-    Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
+    Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec after addition");
     // Orchestrator is a no-op listener for any new FlowSpecs
-    Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should contain 0 "
-        + "Spec after addition");
+    Assert.assertEquals(sei.getProducer().get().listSpecs().get().size(), 0,
+        "SpecProducer should contain 0 " + "Spec after addition");

Review Comment:
   nit: both halves are on the same line, so there's no need to join them w/ `+`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -158,7 +138,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    * e.g. There are multi-datacenter deployment of GaaS Cluster. Intra-datacenter fail-over could be handled by
    * leadership change mechanism, while inter-datacenter fail-over would be handled by DR handling mechanism.
    */
-  private boolean isNominatedDRHandler;
+  private final boolean isNominatedDRHandler;

Review Comment:
   I'm not familiar w/ multi-datacenter deployment... does that play well w/ 
multi-active?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -18,58 +18,232 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.jetbrains.annotations.NotNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
 
 
 /**
- * A {@link DagActionStoreChangeMonitor} that should be used {@link org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED}
- * is set.
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ * connector between the API and execution layers of GaaS.
  */
 @Slf4j
-public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
-  private final DagManagement dagManagement;
-  @VisibleForTesting @Getter
-  private final DagActionReminderScheduler dagActionReminderScheduler;
+public class DagManagementDagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> {

Review Comment:
   my impression from our discussion [here](https://github.com/apache/gobblin/pull/4031#discussion_r1737677287)
   was to hold off on refactoring this class beyond what's necessary (e.g. keep its current
   base class for now)



-- 
This is an automated message from the Apache Git Service.