phet commented on code in PR #4031: URL: https://github.com/apache/gobblin/pull/4031#discussion_r1737127941
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java: ########## @@ -168,21 +160,6 @@ protected void shutDown() throws Exception { this.listeners.close(); } - /*************************************************** - /* Catalog listeners * - /**************************************************/ - - protected void notifyAllListeners() { - try { - Iterator<URI> uriIterator = getSpecURIs(); - while (uriIterator.hasNext()) { - this.listeners.onAddSpec(getSpecWrapper(uriIterator.next())); - } - } catch (IOException e) { - log.error("Cannot retrieve specs from catalog:", e); - } - } - Review Comment: wondering on this... there's still an `addListener` method. is there no longer any need to `notifyListeners`? ########## gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java: ########## @@ -205,10 +163,10 @@ public class ServiceConfigKeys { public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class"; public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine."; - public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled"; public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads"; public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions"; - public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3; - public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled"; + public static final String JOB_START_SLA_TIME = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME; + public static final String JOB_START_SLA_UNITS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT; + public static final long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24); Review Comment: three thoughts here: 1. previously the property included the `.dagManager` segment: ``` // Default job start SLA time if configured, measured in minutes. Default is 10 minutes // todo - rename "sla" -> "deadline", and move them to DagProcUtils public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME; ``` should we continue to accept that legacy prop name so existing flow defs remain b/w compat? 2. as we introduce a new prop name, let's take the opportunity to align naming and semantics by calling it `start.deadline`, rather than SLA 3. given that `.dagProcessingEngine` is as much impl. detail as `.dagManager`, it may not belong in the customer-facing name. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java: ########## @@ -74,7 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { // to these data structures. @Getter @Setter - protected final Map<URI, TopologySpec> topologySpecMap; + protected Map<URI, TopologySpec> topologySpecMap; Review Comment: I didn't notice why this wouldn't still be `final`. is a derived class assigning to it? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java: ########## @@ -55,7 +55,7 @@ @Slf4j public class DagManagerMetrics { Review Comment: rename to `DagMetrics` or leave as-is? 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java: ########## @@ -299,4 +303,43 @@ public int hashCode() { public String toString() { return this.getNodes().stream().map(node -> node.getValue().toString()).collect(Collectors.toList()).toString(); } + + public enum FlowState { + FAILED(-1), + RUNNING(0), + SUCCESSFUL(1); + + public final int value; + + FlowState(int value) { + this.value = value; + } + } + + @Getter + @EqualsAndHashCode + public static class DagId { + String flowGroup; + String flowName; + long flowExecutionId; + + public DagId(String flowGroup, String flowName, long flowExecutionId) { + this.flowGroup = flowGroup; + this.flowName = flowName; + this.flowExecutionId = flowExecutionId; + } Review Comment: `@AllArgsCtor`? actually if you can make the three fields `final`, `@Data` should take care of many of these things for you (not clear if that's possible, since the fields aren't `private`... even if not `@Data` and/or `final`, can we make them `private`?) 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java: ########## @@ -37,9 +38,9 @@ */ @Slf4j abstract public class AbstractUserQuotaManager implements UserQuotaManager { - public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota"; - public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota"; - public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota"; + public static final String PER_USER_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perUserQuota"; + public static final String PER_FLOWGROUP_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perFlowGroupQuota"; + public static final String USER_JOB_QUOTA_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "defaultJobQuota"; Review Comment: again, should we honor the old prop names for b/w compat? also, when we devise a new name to deprecate the old one, let's avoid embedding impl-specific identifiers in what should otherwise be a user-level config option w/ clear and steady semantics ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java: ########## @@ -115,6 +109,31 @@ protected void shutDown() this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS); } + /** + * Action to be performed on a {@link Dag}, in case of a job failure. 
Currently, we allow 2 modes: + * <ul> + * <li> FINISH_RUNNING, which allows currently running jobs to finish.</li> + * <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies + * of the job are successful.</li> + * </ul> + */ + public enum FailureOption { + FINISH_RUNNING("FINISH_RUNNING"), + CANCEL("CANCEL"), + FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE"); + + private final String failureOption; + + FailureOption(final String failureOption) { + this.failureOption = failureOption; + } + + @Override + public String toString() { + return this.failureOption; + } + } Review Comment: somehow I thought that: ``` public enum FailureOption { FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE; } ``` that the `.toString()` would automatically print the enum's name, no? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java: ########## @@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore { Map<URI, TopologySpec> topologySpecMap; private final Config config; public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; - public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; + public static final String DAG_STATESTORE_CLASS_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "dagStateStoreClass"; Review Comment: will this render all present configs silently ignored? (presuming there's a default... 
if not, I expect it would instead likely fail fast) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java: ########## @@ -57,31 +57,25 @@ @Singleton public class DagProcessingEngine extends AbstractIdleService { - @Getter private final Optional<DagTaskStream> dagTaskStream; - @Getter Optional<DagManagementStateStore> dagManagementStateStore; + @Getter private final DagTaskStream dagTaskStream; + @Getter DagManagementStateStore dagManagementStateStore; private final Config config; - private final Optional<DagProcFactory> dagProcFactory; + private final DagProcFactory dagProcFactory; private ScheduledExecutorService scheduledExecutorPool; private final DagProcessingEngineMetrics dagProcEngineMetrics; private static final Integer TERMINATION_TIMEOUT = 30; public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis"; @Getter static long defaultJobStartSlaTimeMillis; + public static final String DEFAULT_FLOW_FAILURE_OPTION = DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(); Review Comment: we don't need to use `DagProcessingEngine.` qualifier inside that same class, do we? 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java: ########## @@ -271,8 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() { } public void cleanup() { - // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager - if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) { + if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) { // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton. // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement RootMetricContext.get().removeMatching(getMetricsFilterForDagManager()); Review Comment: this code may need reworking (since aspects recorded in the comments seem to be changing), but at the least, update the remaining comment and also consider whether to rename `getMetricsFilterForDagManager()` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java: ########## @@ -27,17 +27,15 @@ /** - * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. In case of a leadership - * change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager} - * loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution. + * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. */ @Alpha Review Comment: shall we remove? 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java: ########## @@ -84,8 +81,4 @@ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException */ @Deprecated Set<String> getDagIds() throws IOException; - - default boolean existsDag(DagManager.DagId dagId) throws IOException { - throw new UnsupportedOperationException("containsDag not implemented"); - } Review Comment: interesting to see this go away... is the thinking that one would merely `getDag` rather than check for existence? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java: ########## @@ -136,11 +135,6 @@ public void cleanUp(String dagId) this.totalDagCount.dec(); Review Comment: is this in-memory metric still valid in a multi-leader ensemble? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java: ########## @@ -26,9 +26,9 @@ /** * Manages the statically configured user quotas for the proxy user in user.to.proxy configuration, the API requester(s) - * and the flow group. - * It is used by the {@link DagManager} to ensure that the number of currently running jobs do not exceed the quota, if - * the quota is exceeded, the execution will fail without running on the underlying executor. + * and the flow group. It is used by the {@link org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure Review Comment: specifically, is it the `LaunchDagProc`? 
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java: ########## @@ -63,7 +63,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, } Dag.DagNode<JobExecutionPlan> dagNode = dagNodeToCheckDeadline.getLeft().get(); - long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis()); + long timeOutForJobStart = DagUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis()); Review Comment: let's use the class-rename as an opportunity to update the method too - `getJobStartDeadline` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java: ########## Review Comment: I do believe they should be separate... and since they're such closely named classes now, consider augmenting the javadoc that these utils contain functionality for *processing* a DAG, whereas `DagUtils` is for *interrogating* a DAG. the javadoc for each might reference the other (circularity should be ok for javadoc) ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java: ########## @@ -50,16 +50,14 @@ /** - * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching + * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary - * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to - * carry out any flow action requests. 
However, with launch executions now being stored in the DagActionStateStore, on - * restart or leadership change the DagManager has to perform validations before executing any launch actions the - * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between - * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful, + * validations, and creates DagActions. The DagProcessingEngine's responsibility is to + * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on + * restart, the LaunchDagProc has to perform validations before executing any launch actions the previous LaunchDagProc Review Comment: the LaunchDP was unable to complete? more like, "that are not yet fully processed". ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java: ########## @@ -48,8 +48,8 @@ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforce protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag, DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException { Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0); - long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode); - long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode); + long flowFinishDeadline = DagUtils.getFlowSLA(dagNode); Review Comment: let's use the class-rename as an opportunity to update the method too - `getFlowFinishDeadline` ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -67,64 +61,51 @@ */ @Slf4j public class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> { Review Comment: I realize we no longer have a need for both `DagActionStoreCM` and also 
`DagManagementDagActionStoreCM`, but it's much clearer to read AND less error prone to keep the latter class as-is, rather than porting over what's different about it here into what had been its base class. I'm not against eventually consolidating those into something like this, but doing so in a PR w/ >110 files carries unnecessary risk that we may have introduced a small error in the porting. ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java: ########## @@ -204,7 +204,9 @@ protected void createMetrics() { this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED); this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES); this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES); - this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue); - this.getMetricContext().register(this.produceToConsumeDelayMillis); + // Reports delay from all partitions in one gauge + ContextAwareGauge<Long> produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge( Review Comment: all the other metrics have an instance member. why use a local for this one? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java: ########## @@ -50,16 +50,14 @@ /** - * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching + * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching * executions of a flow spec. 
In the common case, the Orchestrator receives a flow to orchestrate, performs necessary - * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to - * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on - * restart or leadership change the DagManager has to perform validations before executing any launch actions the - * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between - * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful, + * validations, and creates DagActions. The DagProcessingEngine's responsibility is to + * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on Review Comment: extra "out" in "process out dag"... but this could simply be: > creates {@link DagAction}s, which the {@link DPE} is responsible for processing. ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java: ########## @@ -45,8 +48,8 @@ public void setUp() { // Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter @Test public void testExceedsQuotaOnStartup() throws Exception { - List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user", ConfigFactory.empty()); - // Ensure that the current attempt is 1, normally done by DagManager + List<Dag<JobExecutionPlan>> dags = DagTestUtils.buildDagList(2, "user", ConfigFactory.empty()); + // Ensure that the current attempt is 1, normally done by DagProcs Review Comment: which one, launch? reeval? 
########## gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java: ########## @@ -79,50 +78,47 @@ public class GitConfigMonitorTest { private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2); private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3); - private RefSpec masterRefSpec = new RefSpec("master"); + private final RefSpec masterRefSpec = new RefSpec("master"); private FlowCatalog flowCatalog; - private SpecCatalogListener mockListener; - private Config config; private GitConfigMonitor gitConfigMonitor; @BeforeClass public void setup() throws Exception { - cleanUpDir(TEST_DIR); + cleanUpDir(); // Create a bare repository RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED); - this.remoteRepo = fileKey.open(false); - this.remoteRepo.create(true); + Repository remoteRepo = fileKey.open(false); + remoteRepo.create(true); - this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); + this.gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); // push an empty commit as a base for detecting changes this.gitForPush.commit().setMessage("First commit").call(); this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); - this.config = ConfigBuilder.create() + Config config = ConfigBuilder.create() .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, - this.remoteRepo.getDirectory().getAbsolutePath()) - .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." 
+ ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig") - .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog") - .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5) - .build(); + remoteRepo.getDirectory().getAbsolutePath()) + .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, + TEST_DIR + "/jobConfig").addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog") + .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5).build(); this.flowCatalog = new FlowCatalog(config); - this.mockListener = mock(SpecCatalogListener.class); - when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS); - when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse("")); + SpecCatalogListener mockListener = mock(SpecCatalogListener.class); + when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS); + when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse<>("")); this.flowCatalog.addListener(mockListener); this.flowCatalog.startAsync().awaitRunning(); - this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog); + this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog); this.gitConfigMonitor.setActive(true); } - private void cleanUpDir(String dir) { - File specStoreDir = new File(dir); + private void cleanUpDir() { + File specStoreDir = new File(GitConfigMonitorTest.TEST_DIR); Review Comment: NBD, but I personally favor hard-coding params, rather than dropping them to hard-code the impl. 
the former is not only clearer to read, but more future-proof ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java: ########## @@ -78,32 +76,19 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream private final Config config; @Getter private final EventSubmitter eventSubmitter; protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter; - protected Optional<DagActionReminderScheduler> dagActionReminderScheduler; - private final boolean isMultiActiveExecutionEnabled; + protected DagActionReminderScheduler dagActionReminderScheduler; private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180; private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = new LinkedBlockingQueue<>(); private final DagManagementStateStore dagManagementStateStore; private final DagProcessingEngineMetrics dagProcEngineMetrics; @Inject - public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, - @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter, Review Comment: don't we need this name for init/config-time discernment between two different versions of the MALA? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java: ########## @@ -81,7 +83,7 @@ public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) th addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build(); if (i > 0) { - jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1))); + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); Review Comment: semantics have changed here. 
if we're confident we always want "job0", should we remove the `if (i > 0)` (or change it to `if (i == 1)`)? ########## gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java: ########## @@ -241,14 +237,9 @@ public void testForcedPushConfig() throws IOException, GitAPIException, URISynta Collection<Spec> specs = this.flowCatalog.getSpecs(); - Assert.assertTrue(specs.size() == 2); + Assert.assertEquals(specs.size(), 2); List<Spec> specList = Lists.newArrayList(specs); - specList.sort(new Comparator<Spec>() { - @Override - public int compare(Spec o1, Spec o2) { - return o1.getUri().compareTo(o2.getUri()); - } - }); + specList.sort(Comparator.comparing(Spec::getUri)); Review Comment: great improvement! ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -329,22 +207,15 @@ public void remove(Spec spec, Properties headers) throws IOException { if (spec instanceof FlowSpec) { String flowGroup = FlowSpec.Utils.getFlowGroup(uri); String flowName = FlowSpec.Utils.getFlowName(uri); - if (this.flowLaunchHandler.isPresent()) { - List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10); - _log.info("Found {} flows to cancel.", flowExecutionIds.size()); - - for (long flowExecutionId : flowExecutionIds) { - DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, - DagActionStore.DagActionType.KILL); - DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(killDagAction, false, - System.currentTimeMillis()); - flowLaunchHandler.get().handleFlowKillTriggerEvent(new Properties(), leaseParams); - } - } else { - //Send the dag to the DagManager to stop it. - //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor. 
- _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri()); - this.dagManager.stopDag(uri); + List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10); + _log.info("Found {} flows to cancel.", flowExecutionIds.size()); + + for (long flowExecutionId : flowExecutionIds) { + DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, Review Comment: indentation ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java: ########## Review Comment: is this the right class? isn't this more of a "JobSpecTest"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org