[ 
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933266
 ]

ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Sep/24 23:00
            Start Date: 04/Sep/24 23:00
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744527436


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
    * Configuration properties related to Flows
    */
   public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
-  public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
-  public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = 
"gobblin.flow.sla.timeunit";
-  public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
-  public static final String GOBBLIN_JOB_START_SLA_TIME = 
"gobblin.job.start.sla.time";
-  public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = 
"gobblin.job.start.sla.timeunit";
-  public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
-  public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = 
TimeUnit.MINUTES.name();
+  public static final String GOBBLIN_FLOW_FINSIH_DEADLINE_TIME = 
"gobblin.flow.deadline.time";
+  public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT = 
"gobblin.flow.deadline.timeunit";

Review Comment:
   looks like you renamed the variable, but not the property key (to 
`gobblin.flow.finish.deadline.*`)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -678,8 +457,8 @@ public static void main(String[] args) throws Exception {
   @SuppressWarnings("DLS_DEAD_LOCAL_STORE")
   private static void testGobblinService(GobblinServiceManager 
gobblinServiceManager) {
 
-    FlowConfigClient client =
-        new FlowConfigClient(String.format("http://localhost:%s/", 
gobblinServiceManager.restliServer.getPort()));
+    try (FlowConfigV2Client client =

Review Comment:
   is the indentation off here - and also in the 
   ```
   catch (RemoteInvocationException | IOException e) {
   ```
   ?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -1083,110 +281,46 @@ public void testQuotasRetryFlow() throws 
URISyntaxException, IOException {
         .thenReturn(jobStatusIteratorFlow1_0)
         .thenReturn(jobStatusIteratorFlow1_1)
         .thenReturn(jobStatusIteratorFlow1_2);
-    // Dag1 is running
-    this._dagManagerThread.run();
+
     SortedMap<String, Counter> allCounters = 
metricContext.getParent().get().getCounters();
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-    // Dag1 fails and is orchestrated again
-    this._dagManagerThread.run();
-    // Dag1 is running again
-    this._dagManagerThread.run();
+
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-    // Dag1 is marked as complete, should be able to run the next Dag without 
hitting the quota limit
-    this._dagManagerThread.run();
 
-    this.queue.offer(dagList.get(1));
-    this._dagManagerThread.run();
-    this._dagManagerThread.run(); // cleanup
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 0);
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
-  @Test (dependsOnMethods = "testQuotasRetryFlow")
-  public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException, 
IOException {
-
-    Long flowId = System.currentTimeMillis();
-    Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId, 
"FINISH_RUNNING", 1, "proxyUser",
-        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
 false).build());    //Add a dag to the queue of dags
-    this.queue.offer(adhocDag);
-
-    Iterator<JobStatus> jobStatusIteratorFlow0_0 =
-        getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0", 
"group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow0_1 =
-        getMockFlowStatus("flow" + flowId, "group" + flowId, flowId, 
String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow1_0 =
-        getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, 
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
-    Iterator<JobStatus> jobStatusIteratorFlow1_1 =
-        getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, 
String.valueOf(ExecutionStatus.COMPLETE));
-
-    Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId), 
Mockito.eq("group" + flowId), Mockito.anyLong(),
-            Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIteratorFlow0_0)
-        .thenReturn(jobStatusIteratorFlow0_1);
-    Mockito.when(_jobStatusRetriever
-        .getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)), 
Mockito.eq("group" + (flowId+1)), Mockito.anyLong(),
-            Mockito.anyString(), Mockito.anyString()))
-        .thenReturn(jobStatusIteratorFlow1_0)
-        .thenReturn(jobStatusIteratorFlow1_1);
-
-    String flowStateGaugeName0 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId,
-        "flow"+flowId, ServiceMetricNames.RUNNING_STATUS);
-    
Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0));
-
-    Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1), 
flowId+1, "FINISH_RUNNING", 1, "proxyUser",
-        
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
 true).build());
-    this.queue.offer(scheduledDag);
-    this._dagManagerThread.run();
-    String flowStateGaugeName1 = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
"group"+(flowId+1),
-        "flow"+(flowId+1), ServiceMetricNames.RUNNING_STATUS);
-
-    
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName1));
-
-    // cleanup
-    this._dagManagerThread.run();
-    // should be successful since it should be cleaned up with status complete
-    
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(),
 DagManager.FlowState.SUCCESSFUL.value);
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
-  }
-
-  @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
-  public void testJobSlaKilledMetrics() throws URISyntaxException, IOException 
{
+  @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc", enabled = 
false)
+  // todo re-write for dag proc
+  public void testJobSlaKilledMetrics() throws URISyntaxException {

Review Comment:
   `testJobStartDeadlineKilledMetrics()`?



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +175,11 @@ public class ServiceConfigKeys {
   public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
 
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + 
"issueRepo.class";
+  public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
 
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
-  public static final String DAG_PROCESSING_ENGINE_ENABLED = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
-  public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = 
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+  public static final long DEFAULT_FLOW_DEADLINE_MILLIS = 
TimeUnit.HOURS.toMillis(24);

Review Comment:
   `DEFAULT_FLOW_FINISH_...`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -433,79 +310,22 @@ private void configureServices(){
 
     registerServicesInLauncher();
 
-    // Register Scheduler to listen to changes in Flows
-    // In warm standby mode, instead of scheduler we will add orchestrator as 
listener
-    if(configuration.isWarmStandbyEnabled()) {
-      this.flowCatalog.addListener(this.orchestrator);
-    } else if (configuration.isSchedulerEnabled()) {
-      this.flowCatalog.addListener(this.scheduler);
-    }
-  }
-
-  private void ensureInjected() {
-    if (v2ResourceHandler == null) {
-      throw new IllegalStateException("GobblinServiceManager should be 
constructed through Guice dependency injection "
-          + "or through a static factory method");
-    }
+    // Register orchestrator to listen to changes in Flows
+    this.flowCatalog.addListener(this.orchestrator);
   }
 
   @Override
   public void start() throws ApplicationException {
     LOGGER.info("[Init] Starting the Gobblin Service Manager");
 
-    ensureInjected();
-
     configureServices();
-
-    if (this.helixManager.isPresent()) {
-      connectHelixManager();
-    }
-
-    this.eventBus.register(this);
     this.serviceLauncher.start();
 
-    if (this.helixManager.isPresent()) {
-      // Subscribe to leadership changes
-      this.helixManager.get().addControllerListener((ControllerChangeListener) 
this::handleLeadershipChange);
-
-
-      // Update for first time since there might be no notification
-      if (helixManager.get().isLeader()) {
-        if (configuration.isSchedulerEnabled()) {
-          LOGGER.info("[Init] Gobblin Service is running in master instance 
mode, enabling Scheduler.");
-          this.scheduler.setActive(true);
-        }
-
-        if (configuration.isGitConfigMonitorEnabled()) {
-          this.gitConfigMonitor.setActive(true);
-        }
-
-        if (helixLeaderGauges.isPresent()) {
-          helixLeaderGauges.get().setState(LeaderState.MASTER);
-        }
+    LOGGER.info("[Init] Gobblin Service is running in master instance mode, 
enabling Scheduler.");

Review Comment:
   nit: "is running in multi-active mode..."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -413,16 +292,14 @@ private void registerServicesInLauncher(){
       this.serviceLauncher.addService(restliServer);
     }
 
-    if (this.configuration.isWarmStandbyEnabled()) {
-      this.serviceLauncher.addService(specStoreChangeMonitor);
-      this.serviceLauncher.addService(dagActionStoreChangeMonitor);
-    }
+    this.serviceLauncher.addService(specStoreChangeMonitor);
+    this.serviceLauncher.addService(_dagManagementDagActionStoreChangeMonitor);
   }
 
   private void configureServices(){
     if (configuration.isRestLIServerEnabled()) {
       this.restliServer = EmbeddedRestliServer.builder()
-          .resources(Lists.newArrayList(FlowConfigsResource.class, 
FlowConfigsV2Resource.class))
+          .resources(Lists.newArrayList(FlowConfigsV2Resource.class, 
FlowConfigsV2Resource.class))

Review Comment:
   did you mean to name this same resource twice?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -290,15 +323,15 @@ public void cleanUp() throws Exception {
    */
   @Test
   public void testGetClass() {
-    
Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
 instanceof FlowCompilationValidationHelper);
+    
Assert.assertNotNull(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class));
     // Optionally bound config
-    Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class) 
instanceof FlowCatalog);
+    Assert.assertNotNull(GobblinServiceManager.getClass(FlowCatalog.class));
   }
 
   /**
    * To test an existing flow in a spec store does not get deleted just 
because it is not compilable during service restarts
    */
-  @Test
+  @Test (enabled = false)

Review Comment:
   please add a comment on why disabling



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -943,30 +162,19 @@ public void testDagManagerQuotaExceeded() throws 
URISyntaxException, IOException
         .thenReturn(jobStatusIteratorFlow1_0)
         .thenReturn(jobStatusIteratorFlow1_1);
 
-    this._dagManagerThread.run();
     // dag will not be processed due to exceeding the quota, will log a 
message and exit out without adding it to dags
-    this.queue.offer(dagList.get(1));
-    this._dagManagerThread.run();
     SortedMap<String, Counter> allCounters = 
metricContext.getParent().get().getCounters();
     Assert.assertEquals(allCounters.get(MetricRegistry.name(
         ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
         ServiceMetricNames.SERVICE_USERS,
         "user")).getCount(), 1);
-
-    this._dagManagerThread.run(); // cleanup
-
-    Assert.assertEquals(this.dags.size(), 0);
-    Assert.assertEquals(this.jobToDag.size(), 0);
-    Assert.assertEquals(this.dagToJobs.size(), 0);
   }
 
-  @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
-  public void testQuotaDecrement() throws URISyntaxException, IOException {
-
-    List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user", 
ConfigFactory.empty());
+  @Test (dependsOnMethods = "testDagManagerQuotaExceeded", enabled = false)
+  // todo re-write for dag proc
+  public void testQuotaDecrement() throws URISyntaxException {
+    List<Dag<JobExecutionPlan>> dagList = DagTestUtils.buildDagList(3, "user", 
ConfigFactory.empty());

Review Comment:
   1. do these tests for the quota manager still belong in the `DagManagerTest` 
class?
   2. is there a more direct way to write these tests?  I'm not seeing which 
method of the quota manager is invoked.  also, the test assertions are based 
only on counters.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -534,17 +354,9 @@ public void start() throws ApplicationException {
     //Activate the SpecCompiler, after the topologyCatalog has been 
initialized.
     this.orchestrator.getSpecCompiler().setActive(true);
 
-    //Activate the DagManager service, after the topologyCatalog has been 
initialized.
-    if (!this.configuration.isDagProcessingEngineEnabled() && 
(!this.helixManager.isPresent() || this.helixManager.get().isLeader())){
-      this.dagManager.setActive(true);
-      this.eventBus.register(this.dagManager);
-    }
-
-    // Activate both monitors last as they're dependent on the SpecCompiler, 
Scheduler, and DagManager being active
-    if (configuration.isWarmStandbyEnabled()) {
-      this.specStoreChangeMonitor.setActive();
-      this.dagActionStoreChangeMonitor.setActive();
-    }
+    // Activate both monitors last as they're dependent on the SpecCompiler 
and Scheduler being active
+    this.specStoreChangeMonitor.setActive();
+    this._dagManagementDagActionStoreChangeMonitor.setActive();

Review Comment:
   nit: why does the second one have a leading `_` in its name?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -156,14 +143,14 @@ public DagTask next() {
 
   private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams 
leaseParams)
       throws SchedulerException, IOException {
-    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
-        leaseParams.getDagAction().getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    long timeOutForJobStart = 
DagUtils.getJobStartDeadline(this.dagManagementStateStore.getDag(
+        leaseParams.getDagAction().getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());

Review Comment:
   should this handle "invalid format" / `ConfigException`, as is done for 
the flow finish deadline (below)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -609,15 +388,15 @@ public Collection<StandardMetrics> 
getStandardMetricsCollection() {
     return ImmutableList.of(this.metrics);
   }
 
-  private class Metrics extends StandardMetrics {
+  private static class Metrics extends StandardMetrics {
     public static final String SERVICE_LEADERSHIP_CHANGE = 
"serviceLeadershipChange";
-    private ContextAwareHistogram serviceLeadershipChange;
 
     public Metrics(final MetricContext metricContext, Config config) {
       int timeWindowSizeInMinutes = ConfigUtils.getInt(config, 
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
           ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
-      this.serviceLeadershipChange = 
metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 
timeWindowSizeInMinutes, TimeUnit.MINUTES);
-      this.contextAwareMetrics.add(this.serviceLeadershipChange);
+      ContextAwareHistogram serviceLeadershipChange =
+          metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 
timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(serviceLeadershipChange);

Review Comment:
   does this metric still make sense?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
-      /* Only compile and pass directly to `DagManager` when multi-active 
scheduler NOT enabled; otherwise
-      recompilation to occur later, once `DagActionStoreChangeMonitor` 
subsequently delegates this
-      `DagActionType.LAUNCH`
-       */
-      if (flowLaunchHandler.isPresent()) {
-        DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(
-            flowGroup,
-            flowName,
-            FlowUtils.getOrCreateFlowExecutionId(flowSpec),
-            DagActionStore.DagActionType.LAUNCH);
-        DagActionStore.LeaseParams
-            leaseObject = new DagActionStore.LeaseParams(launchDagAction, 
isReminderEvent,
-            triggerTimestampMillis);
-        // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as 
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
-        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
leaseObject, flowSpec.isScheduled());
-        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
-            launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
-      } else {
-        try {
-          TimingEvent flowCompilationTimer =
-              new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
-          Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-              
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
-                  flowName, flowMetadata);
-
-          if (!compiledDagOptional.isPresent()) {
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            return;
-          }
-          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-          if (compiledDag.isEmpty()) {
-            
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec,
-                flowMetadata);
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-                SharedFlowMetricsSingleton.CompiledState.FAILED);
-            _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
-            return;
-          }
-          
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
-          
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
-          flowCompilationTimer.stop(flowMetadata);
-
-          // Depending on if DagManager is present, handle execution
-          submitFlowToDagManager(flowSpec, compiledDag);
-        } finally {
-          /* Remove adhoc flow spec from the flow catalog, regardless of 
whether the flow was successfully validated
-          and permitted to exec (concurrently)
-           */
-          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
-        }
-      }
+      DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), 
DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams
+          leaseObject = new DagActionStore.LeaseParams(launchDagAction, 
isReminderEvent,
+          triggerTimestampMillis);

Review Comment:
   nit: don't leave the type on a line by itself, but also put the 
`leaseObject` variable name there too



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -85,64 +77,39 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
   protected final TopologyCatalog topologyCatalog;
-  protected final DagManager dagManager;
   private final JobStatusRetriever jobStatusRetriever;
 
   protected final MetricContext metricContext;
 
   protected final EventSubmitter eventSubmitter;
-  private final boolean isFlowConcurrencyEnabled;
   @Getter
   private Meter flowOrchestrationSuccessFulMeter;
   @Getter
   private Meter flowOrchestrationFailedMeter;
   @Getter
   private Timer flowOrchestrationTimer;
-  private Counter flowFailedForwardToDagManagerCounter;
-  @Setter
-  private FlowStatusGenerator flowStatusGenerator;
-
-  private UserQuotaManager quotaManager;
-  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
-  private Optional<FlowLaunchHandler> flowLaunchHandler;
-  private Optional<FlowCatalog> flowCatalog;
+  private final FlowLaunchHandler flowLaunchHandler;
   @Getter
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
   @Inject
-  public Orchestrator(Config config, TopologyCatalog topologyCatalog, 
DagManager dagManager,
-      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, 
Optional<FlowLaunchHandler> flowLaunchHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog,
-      Optional<DagManagementStateStore> dagManagementStateStore,
+  public Orchestrator(Config config, TopologyCatalog topologyCatalog, 
Optional<Logger> log, FlowLaunchHandler flowLaunchHandler,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
DagManagementStateStore dagManagementStateStore,
       FlowCompilationValidationHelper flowCompilationValidationHelper, 
JobStatusRetriever jobStatusRetriever) throws IOException {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.topologyCatalog = topologyCatalog;
-    this.dagManager = dagManager;
-    this.flowStatusGenerator = flowStatusGenerator;
     this.flowLaunchHandler = flowLaunchHandler;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
-    this.flowCatalog = flowCatalog;
     this.jobStatusRetriever = jobStatusRetriever;
-    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
     this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
-    //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
-    this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
-    if (dagManagementStateStore.isPresent()) {
-      ((MySqlDagManagementStateStore) 
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
-    }
+    //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManagementStateStore.
+    ((MySqlDagManagementStateStore) 
dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());

Review Comment:
   seeing this cast makes me wonder:
   a. does `setTopologySpecMap` belong in the `DagManagementStateStore` 
interface?
   b. OR, perhaps even better, should it be a ctor param to the 
`MysqlDagManagementStateStore`?  I realize there are timing interdependencies 
to coordinate, but couldn't those be handled before 
`MysqlDagManagementStateStore::start` is to be called?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,15 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager 
and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the 
LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateful,
- * requiring all stateful pieces to be passed as input from the caller upon 
instantiating the helper.
- * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development, so we do
- * not attempt major reorganization as abstractions may change.
+ * validations, and creates {@link 
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s 
which the
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine} is 
responsible for processing.
+ * However, with launch executions now being stored in the 
DagActionStateStore, on restart, the LaunchDagProc has to
+ * perform validations before executing any launch actions the previous 
LaunchDagProc was unable to complete.
+ * Rather than duplicating the code or introducing a circular dependency 
between the LaunchDagProc and Orchestrator,
+ * this class is utilized to store the common functionality. It is stateful, 
requiring all stateful pieces to be passed
+ * as input from the caller upon instantiating the helper.

Review Comment:
   actually, should it be, "It is **stateless**, requiring all stateful pieces 
to be passed as ..."?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -344,10 +309,10 @@ public void createFlowSpec() throws Throwable {
     }
 
     // Make sure FlowCatalog has the added Flow
-    Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec 
after addition");
+    Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec 
after addition");
     // Orchestrator is a no-op listener for any new FlowSpecs
-    
Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 
0, "SpecProducer should contain 0 "
-        + "Spec after addition");
+    Assert.assertEquals(sei.getProducer().get().listSpecs().get().size(), 0,
+        "SpecProducer should contain 0 " + "Spec after addition");

Review Comment:
   nit: they're all on the same line, so no need to append the two halves w/ `+`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -158,7 +138,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
    * e.g. There are multi-datacenter deployment of GaaS Cluster. 
Intra-datacenter fail-over could be handled by
    * leadership change mechanism, while inter-datacenter fail-over would be 
handled by DR handling mechanism.
    */
-  private boolean isNominatedDRHandler;
+  private final boolean isNominatedDRHandler;

Review Comment:
   I'm not familiar w/ multi-datacenter deployment... does that play well w/ 
multi-active?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -18,58 +18,232 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.jetbrains.annotations.NotNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import 
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagement;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
 
 
 /**
- * A {@link DagActionStoreChangeMonitor} that should be used {@link 
org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED}
- * is set.
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
  */
 @Slf4j
-public class DagManagementDagActionStoreChangeMonitor extends 
DagActionStoreChangeMonitor {
-  private final DagManagement dagManagement;
-  @VisibleForTesting @Getter
-  private final DagActionReminderScheduler dagActionReminderScheduler;
+public class DagManagementDagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagActionStoreChangeEvent> {

Review Comment:
   my impression from our discussion 
[here](https://github.com/apache/gobblin/pull/4031#discussion_r1737677287) was 
that we would defer refactoring this class any more than necessary (e.g. leave 
it with its same base class for now)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 933266)
    Time Spent: 3h 40m  (was: 3.5h)

> remove obsolete code related to DagManager
> ------------------------------------------
>
>                 Key: GOBBLIN-2136
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to