phet commented on code in PR #4031: URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744527436
########## gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java: ########## @@ -1071,13 +1071,13 @@ public class ConfigurationKeys { * Configuration properties related to Flows */ public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately"; - public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time"; - public static final String GOBBLIN_FLOW_SLA_TIME_UNIT = "gobblin.flow.sla.timeunit"; - public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT = TimeUnit.MINUTES.name(); - public static final String GOBBLIN_JOB_START_SLA_TIME = "gobblin.job.start.sla.time"; - public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT = "gobblin.job.start.sla.timeunit"; - public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L; - public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT = TimeUnit.MINUTES.name(); + public static final String GOBBLIN_FLOW_FINSIH_DEADLINE_TIME = "gobblin.flow.deadline.time"; + public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT = "gobblin.flow.deadline.timeunit"; Review Comment: looks like you renamed the variable name, but not the property (to `gobblin.flow.finish.deadline.*`) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java: ########## @@ -678,8 +457,8 @@ public static void main(String[] args) throws Exception { @SuppressWarnings("DLS_DEAD_LOCAL_STORE") private static void testGobblinService(GobblinServiceManager gobblinServiceManager) { - FlowConfigClient client = - new FlowConfigClient(String.format("http://localhost:%s/", gobblinServiceManager.restliServer.getPort())); + try (FlowConfigV2Client client = Review Comment: is the indentation off here - and also in the ``` catch (RemoteInvocationException | IOException e) { ``` ? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java: ########## @@ -1083,110 +281,46 @@ public void testQuotasRetryFlow() throws URISyntaxException, IOException { .thenReturn(jobStatusIteratorFlow1_0) .thenReturn(jobStatusIteratorFlow1_1) .thenReturn(jobStatusIteratorFlow1_2); - // Dag1 is running - this._dagManagerThread.run(); + SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters(); Assert.assertEquals(allCounters.get(MetricRegistry.name( ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, "user")).getCount(), 1); - // Dag1 fails and is orchestrated again - this._dagManagerThread.run(); - // Dag1 is running again - this._dagManagerThread.run(); + Assert.assertEquals(allCounters.get(MetricRegistry.name( ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, "user")).getCount(), 1); - // Dag1 is marked as complete, should be able to run the next Dag without hitting the quota limit - this._dagManagerThread.run(); - this.queue.offer(dagList.get(1)); - this._dagManagerThread.run(); - this._dagManagerThread.run(); // cleanup Assert.assertEquals(allCounters.get(MetricRegistry.name( ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, "user")).getCount(), 0); - Assert.assertEquals(this.dags.size(), 0); - Assert.assertEquals(this.jobToDag.size(), 0); - Assert.assertEquals(this.dagToJobs.size(), 0); } - @Test (dependsOnMethods = "testQuotasRetryFlow") - public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException, IOException { - - Long flowId = System.currentTimeMillis(); - Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId, "FINISH_RUNNING", 1, "proxyUser", - ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, false).build()); //Add a dag to the queue of dags - this.queue.offer(adhocDag); - - Iterator<JobStatus> jobStatusIteratorFlow0_0 = - getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE)); - Iterator<JobStatus> jobStatusIteratorFlow0_1 = - getMockFlowStatus("flow" + flowId, "group" + flowId, flowId, String.valueOf(ExecutionStatus.COMPLETE)); - Iterator<JobStatus> jobStatusIteratorFlow1_0 = - getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE)); - Iterator<JobStatus> jobStatusIteratorFlow1_1 = - getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1, String.valueOf(ExecutionStatus.COMPLETE)); - - Mockito.when(_jobStatusRetriever - .getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId), Mockito.eq("group" + flowId), Mockito.anyLong(), - Mockito.anyString(), Mockito.anyString())) - .thenReturn(jobStatusIteratorFlow0_0) - .thenReturn(jobStatusIteratorFlow0_1); - Mockito.when(_jobStatusRetriever - .getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)), Mockito.eq("group" + (flowId+1)), Mockito.anyLong(), - Mockito.anyString(), Mockito.anyString())) - .thenReturn(jobStatusIteratorFlow1_0) - .thenReturn(jobStatusIteratorFlow1_1); - - String flowStateGaugeName0 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId, - "flow"+flowId, ServiceMetricNames.RUNNING_STATUS); - Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0)); - - Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1), flowId+1, "FINISH_RUNNING", 1, "proxyUser", - ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, true).build()); - this.queue.offer(scheduledDag); - this._dagManagerThread.run(); - String flowStateGaugeName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+(flowId+1), - "flow"+(flowId+1), ServiceMetricNames.RUNNING_STATUS); - - Assert.assertNotNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName1)); - - // cleanup - this._dagManagerThread.run(); - // should be successful since it should be cleaned up with status complete - Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(), DagManager.FlowState.SUCCESSFUL.value); - Assert.assertEquals(this.dags.size(), 0); - Assert.assertEquals(this.jobToDag.size(), 0); - Assert.assertEquals(this.dagToJobs.size(), 0); - } - - @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc") - public void testJobSlaKilledMetrics() throws URISyntaxException, IOException { + @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc", enabled = false) + // todo re-write for dag proc + public void testJobSlaKilledMetrics() throws URISyntaxException { Review Comment: `testJobStartDeadlineKilledMetrics()`? ########## gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java: ########## @@ -203,12 +175,11 @@ public class ServiceConfigKeys { public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20; public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class"; + public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix."; public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine."; - public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled"; public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads"; public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions"; - public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3; - public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled"; + public static final long DEFAULT_FLOW_DEADLINE_MILLIS = TimeUnit.HOURS.toMillis(24); Review Comment: `DEFAULT_FLOW_FINISH_...` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java: ########## @@ -433,79 +310,22 @@ private void configureServices(){ registerServicesInLauncher(); - // Register Scheduler to listen to changes in Flows - // In warm standby mode, instead of scheduler we will add orchestrator as listener - if(configuration.isWarmStandbyEnabled()) { - this.flowCatalog.addListener(this.orchestrator); - } else if (configuration.isSchedulerEnabled()) { - this.flowCatalog.addListener(this.scheduler); - } - } - - private void ensureInjected() { - if (v2ResourceHandler == null) { - throw new IllegalStateException("GobblinServiceManager should be constructed through Guice dependency injection " - + "or through a static factory method"); - } + // Register orchestrator to listen to changes in Flows + this.flowCatalog.addListener(this.orchestrator); } @Override public void start() throws ApplicationException { LOGGER.info("[Init] Starting the Gobblin Service Manager"); - ensureInjected(); - configureServices(); - - if (this.helixManager.isPresent()) { - connectHelixManager(); - } - - this.eventBus.register(this); this.serviceLauncher.start(); - if (this.helixManager.isPresent()) { - // Subscribe to leadership changes - this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange); - - - // Update for first time since there might be no notification - if (helixManager.get().isLeader()) { - if (configuration.isSchedulerEnabled()) { - LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler."); - this.scheduler.setActive(true); - } - - if (configuration.isGitConfigMonitorEnabled()) { - this.gitConfigMonitor.setActive(true); - } - - if (helixLeaderGauges.isPresent()) { - helixLeaderGauges.get().setState(LeaderState.MASTER); - } + LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler."); Review Comment: nit: "is running in multi-active mode..." ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java: ########## @@ -413,16 +292,14 @@ private void registerServicesInLauncher(){ this.serviceLauncher.addService(restliServer); } - if (this.configuration.isWarmStandbyEnabled()) { - this.serviceLauncher.addService(specStoreChangeMonitor); - this.serviceLauncher.addService(dagActionStoreChangeMonitor); - } + this.serviceLauncher.addService(specStoreChangeMonitor); + this.serviceLauncher.addService(_dagManagementDagActionStoreChangeMonitor); } private void configureServices(){ if (configuration.isRestLIServerEnabled()) { this.restliServer = EmbeddedRestliServer.builder() - .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class)) + .resources(Lists.newArrayList(FlowConfigsV2Resource.class, FlowConfigsV2Resource.class)) Review Comment: did you mean to name this same resource twice? ########## gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java: ########## @@ -290,15 +323,15 @@ public void cleanUp() throws Exception { */ @Test public void testGetClass() { - Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class) instanceof FlowCompilationValidationHelper); + Assert.assertNotNull(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class)); // Optionally bound config - Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class) instanceof FlowCatalog); + Assert.assertNotNull(GobblinServiceManager.getClass(FlowCatalog.class)); } /** * To test an existing flow in a spec store does not get deleted just because it is not compilable during service restarts */ - @Test + @Test (enabled = false) Review Comment: please add a comment on why disabling ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java: ########## @@ -943,30 +162,19 @@ public void testDagManagerQuotaExceeded() throws URISyntaxException, IOException .thenReturn(jobStatusIteratorFlow1_0) .thenReturn(jobStatusIteratorFlow1_1); - this._dagManagerThread.run(); // dag will not be processed due to exceeding the quota, will log a message and exit out without adding it to dags - this.queue.offer(dagList.get(1)); - this._dagManagerThread.run(); SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters(); Assert.assertEquals(allCounters.get(MetricRegistry.name( ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, "user")).getCount(), 1); - - this._dagManagerThread.run(); // cleanup - - Assert.assertEquals(this.dags.size(), 0); - Assert.assertEquals(this.jobToDag.size(), 0); - Assert.assertEquals(this.dagToJobs.size(), 0); } - @Test (dependsOnMethods = "testDagManagerQuotaExceeded") - public void testQuotaDecrement() throws URISyntaxException, IOException { - - List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user", ConfigFactory.empty()); + @Test (dependsOnMethods = "testDagManagerQuotaExceeded", enabled = false) + // todo re-write for dag proc + public void testQuotaDecrement() throws URISyntaxException { + List<Dag<JobExecutionPlan>> dagList = DagTestUtils.buildDagList(3, "user", ConfigFactory.empty()); Review Comment: 1. do these tests for the quota manager still belong in the `DagManagerTest` class? 2. is there a more direct way to write these tests? I'm not seeing which method of the quota mgr is invoked. also the test assertions are based upon counters. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java: ########## @@ -534,17 +354,9 @@ public void start() throws ApplicationException { //Activate the SpecCompiler, after the topologyCatalog has been initialized. this.orchestrator.getSpecCompiler().setActive(true); - //Activate the DagManager service, after the topologyCatalog has been initialized. - if (!this.configuration.isDagProcessingEngineEnabled() && (!this.helixManager.isPresent() || this.helixManager.get().isLeader())){ - this.dagManager.setActive(true); - this.eventBus.register(this.dagManager); - } - - // Activate both monitors last as they're dependent on the SpecCompiler, Scheduler, and DagManager being active - if (configuration.isWarmStandbyEnabled()) { - this.specStoreChangeMonitor.setActive(); - this.dagActionStoreChangeMonitor.setActive(); - } + // Activate both monitors last as they're dependent on the SpecCompiler and Scheduler being active + this.specStoreChangeMonitor.setActive(); + this._dagManagementDagActionStoreChangeMonitor.setActive(); Review Comment: nit: why does the second one have a leading `_` in its name? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java: ########## @@ -156,14 +143,14 @@ public DagTask next() { private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams leaseParams) throws SchedulerException, IOException { - long timeOutForJobStart = DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag( - leaseParams.getDagAction().getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartSlaTimeMillis()); + long timeOutForJobStart = DagUtils.getJobStartDeadline(this.dagManagementStateStore.getDag( + leaseParams.getDagAction().getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis()); Review Comment: should this handle "invalid format" / `ConfigException`, like is done for the flow finish deadline (below)? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java: ########## @@ -609,15 +388,15 @@ public Collection<StandardMetrics> getStandardMetricsCollection() { return ImmutableList.of(this.metrics); } - private class Metrics extends StandardMetrics { + private static class Metrics extends StandardMetrics { public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange"; - private ContextAwareHistogram serviceLeadershipChange; public Metrics(final MetricContext metricContext, Config config) { int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); - this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); - this.contextAwareMetrics.add(this.serviceLeadershipChange); + ContextAwareHistogram serviceLeadershipChange = + metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); + this.contextAwareMetrics.add(serviceLeadershipChange); Review Comment: does this metric still make sense? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, flowName); - /* Only compile and pass directly to `DagManager` when multi-active scheduler NOT enabled; otherwise - recompilation to occur later, once `DagActionStoreChangeMonitor` subsequently delegates this - `DagActionType.LAUNCH` - */ - if (flowLaunchHandler.isPresent()) { - DagActionStore.DagAction launchDagAction = DagActionStore.DagAction.forFlow( - flowGroup, - flowName, - FlowUtils.getOrCreateFlowExecutionId(flowSpec), - DagActionStore.DagActionType.LAUNCH); - DagActionStore.LeaseParams - leaseObject = new DagActionStore.LeaseParams(launchDagAction, isReminderEvent, - triggerTimestampMillis); - // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment - flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, leaseObject, flowSpec.isScheduled()); - _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]", - launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis); - } else { - try { - TimingEvent flowCompilationTimer = - new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED); - Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); - Optional<Dag<JobExecutionPlan>> compiledDagOptional = - this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, - flowName, flowMetadata); - - if (!compiledDagOptional.isPresent()) { - Instrumented.markMeter(this.flowOrchestrationFailedMeter); - return; - } - Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get(); - if (compiledDag.isEmpty()) { - FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, - flowMetadata); - Instrumented.markMeter(this.flowOrchestrationFailedMeter); - sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, - SharedFlowMetricsSingleton.CompiledState.FAILED); - _log.warn("Cannot determine an executor to run on for Spec: " + spec); - return; - } - sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec, - SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL); - - FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag); - flowCompilationTimer.stop(flowMetadata); - - // Depending on if DagManager is present, handle execution - submitFlowToDagManager(flowSpec, compiledDag); - } finally { - /* Remove adhoc flow spec from the flow catalog, regardless of whether the flow was successfully validated - and permitted to exec (concurrently) - */ - this.dagManager.removeFlowSpecIfAdhoc(flowSpec); - } - } + DagActionStore.DagAction launchDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams + leaseObject = new DagActionStore.LeaseParams(launchDagAction, isReminderEvent, + triggerTimestampMillis); Review Comment: nit: don't leave the type on a line by itself, but also put the `leaseObject` variable name there too ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -85,64 +77,39 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { protected final Logger _log; protected final SpecCompiler specCompiler; protected final TopologyCatalog topologyCatalog; - protected final DagManager dagManager; private final JobStatusRetriever jobStatusRetriever; protected final MetricContext metricContext; protected final EventSubmitter eventSubmitter; - private final boolean isFlowConcurrencyEnabled; @Getter private Meter flowOrchestrationSuccessFulMeter; @Getter private Meter flowOrchestrationFailedMeter; @Getter private Timer flowOrchestrationTimer; - private Counter flowFailedForwardToDagManagerCounter; - @Setter - private FlowStatusGenerator flowStatusGenerator; - - private UserQuotaManager quotaManager; - private final FlowCompilationValidationHelper flowCompilationValidationHelper; - private Optional<FlowLaunchHandler> flowLaunchHandler; - private Optional<FlowCatalog> flowCatalog; + private final FlowLaunchHandler flowLaunchHandler; @Getter private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton; @Inject - public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager dagManager, - Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, Optional<FlowLaunchHandler> flowLaunchHandler, - SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog, - Optional<DagManagementStateStore> dagManagementStateStore, + public Orchestrator(Config config, TopologyCatalog topologyCatalog, Optional<Logger> log, FlowLaunchHandler flowLaunchHandler, + SharedFlowMetricsSingleton sharedFlowMetricsSingleton, DagManagementStateStore dagManagementStateStore, FlowCompilationValidationHelper flowCompilationValidationHelper, JobStatusRetriever jobStatusRetriever) throws IOException { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); this.topologyCatalog = topologyCatalog; - this.dagManager = dagManager; - this.flowStatusGenerator = flowStatusGenerator; this.flowLaunchHandler = flowLaunchHandler; this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton; - this.flowCatalog = flowCatalog; this.jobStatusRetriever = jobStatusRetriever; - this.flowCompilationValidationHelper = flowCompilationValidationHelper; this.specCompiler = flowCompilationValidationHelper.getSpecCompiler(); - //At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager. - this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); - if (dagManagementStateStore.isPresent()) { - ((MySqlDagManagementStateStore) dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); - } + //At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManagementStateStore. + ((MySqlDagManagementStateStore) dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap()); Review Comment: seeing this cast makes me wonder: a. does `setTopologySpecMap` belong in the `DagManagementStateStore` interface? b. OR, perhaps even better, should it be a ctor param to the `MysqlDagManagementStateStore`? I realize there are timing interdependencies to coordinate, but couldn't those be handled before `MysqlDagManagementStateStore::start` is to be called? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java: ########## @@ -50,16 +50,15 @@ /** - * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching + * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary - * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to - * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on - * restart or leadership change the DagManager has to perform validations before executing any launch actions the - * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between - * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful, - * requiring all stateful pieces to be passed as input from the caller upon instantiating the helper. - * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development, so we do - * not attempt major reorganization as abstractions may change. + * validations, and creates {@link org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s which the + * {@link org.apache.gobblin.service.modules.orchestration.DagProcessingEngine} is responsible for processing. + * However, with launch executions now being stored in the DagActionStateStore, on restart, the LaunchDagProc has to + * perform validations before executing any launch actions the previous LaunchDagProc was unable to complete. + * Rather than duplicating the code or introducing a circular dependency between the LaunchDagProc and Orchestrator, + * this class is utilized to store the common functionality. It is stateful, requiring all stateful pieces to be passed + * as input from the caller upon instantiating the helper. Review Comment: actually, should it be, "It is **stateless**, requiring all stateful pieces to be passed as ..."? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -344,10 +309,10 @@ public void createFlowSpec() throws Throwable { } // Make sure FlowCatalog has the added Flow - Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition"); + Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec after addition"); // Orchestrator is a no-op listener for any new FlowSpecs - Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should contain 0 " - + "Spec after addition"); + Assert.assertEquals(sei.getProducer().get().listSpecs().get().size(), 0, + "SpecProducer should contain 0 " + "Spec after addition"); Review Comment: nit: they're all on the same line, so no need to append the two halves w/ `+` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java: ########## @@ -158,7 +138,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata * e.g. There are multi-datacenter deployment of GaaS Cluster. Intra-datacenter fail-over could be handled by * leadership change mechanism, while inter-datacenter fail-over would be handled by DR handling mechanism. */ - private boolean isNominatedDRHandler; + private final boolean isNominatedDRHandler; Review Comment: I'm not familiar w/ multi-datacenter deployment... does that play well w/ multi-active? ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java: ########## @@ -18,58 +18,232 @@ package org.apache.gobblin.service.monitoring; import java.io.IOException; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.jetbrains.annotations.NotNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.metrics.ContextAwareGauge; +import org.apache.gobblin.metrics.ContextAwareMeter; +import org.apache.gobblin.runtime.kafka.HighLevelConsumer; +import org.apache.gobblin.runtime.metrics.RuntimeMetrics; import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler; import org.apache.gobblin.service.modules.orchestration.DagActionStore; import org.apache.gobblin.service.modules.orchestration.DagManagement; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; -import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; /** - * A {@link DagActionStoreChangeMonitor} that should be used {@link org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED} - * is set. + * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received + * from its corresponding consumer client. This monitor responds to requests to resume or delete a flow and acts as a + * connector between the API and execution layers of GaaS. */ @Slf4j -public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { - private final DagManagement dagManagement; - @VisibleForTesting @Getter - private final DagActionReminderScheduler dagActionReminderScheduler; +public class DagManagementDagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> { Review Comment: my impression from our discussion [here](https://github.com/apache/gobblin/pull/4031#discussion_r1737677287) was to defer on refactoring this class any more than necessary (e.g. to leave it with its same base class for now) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org