phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1737127941


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -168,21 +160,6 @@ protected void shutDown() throws Exception {
     this.listeners.close();
   }
 
-  /***************************************************
-   /* Catalog listeners                              *
-   /**************************************************/
-
-  protected void notifyAllListeners() {
-    try {
-      Iterator<URI> uriIterator = getSpecURIs();
-      while (uriIterator.hasNext()) {
-        this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
-      }
-    } catch (IOException e) {
-      log.error("Cannot retrieve specs from catalog:", e);
-    }
-  }
-

Review Comment:
   wondering about this...
   there's still an `addListener` method. is there no longer any need for `notifyAllListeners`?



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -205,10 +163,10 @@ public class ServiceConfigKeys {
   public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
 
   public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
-  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
   public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
   public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
-  public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+  public static final String JOB_START_SLA_TIME = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  public static final String JOB_START_SLA_UNITS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  public static final long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);

Review Comment:
   three thoughts here:
   
   1. previously the property included the `.dagManager` segment:
   ```
     // Default job start SLA time if configured, measured in minutes. Default is 10 minutes
     // todo - rename "sla" -> "deadline", and move them to DagProcUtils
     public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
   ```
   should we continue to accept that legacy prop name so existing flow defs remain b/w compat?
   
   2. as we introduce a new prop name, let's take the opportunity to align naming and semantics by calling it `start.deadline`, rather than SLA
   
   3. given that `.dagProcessingEngine` is as much an impl. detail as `.dagManager`, it may not belong in the customer-facing name.
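   For point 1, a minimal sketch of honoring the legacy name, assuming the config can be read as a plain `Map<String, String>` (the real code uses Typesafe `Config`): look up the new key first and fall back to the deprecated one. `LegacyKeyFallback` and `resolve` are hypothetical names, not existing Gobblin APIs.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Sketch: resolve a setting under its new key, falling back to a deprecated legacy key
 * so existing flow definitions keep working. Class and method names are hypothetical.
 */
final class LegacyKeyFallback {
  private LegacyKeyFallback() {}

  /** Prefers newKey; otherwise honors legacyKey; otherwise returns empty. */
  static Optional<String> resolve(Map<String, String> config, String newKey, String legacyKey) {
    if (config.containsKey(newKey)) {
      return Optional.ofNullable(config.get(newKey));
    }
    if (config.containsKey(legacyKey)) {
      // A real implementation would likely log a deprecation warning here.
      return Optional.ofNullable(config.get(legacyKey));
    }
    return Optional.empty();
  }
}
```

   With Typesafe `Config`, the same shape would check `config.hasPath(...)` for each key before reading it.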



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -74,7 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   // to these data structures.
   @Getter
   @Setter
-  protected final Map<URI, TopologySpec> topologySpecMap;
+  protected Map<URI, TopologySpec> topologySpecMap;

Review Comment:
   I don't see why this can't still be `final`. is a derived class assigning to it?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -55,7 +55,7 @@
 
 @Slf4j
 public class DagManagerMetrics {

Review Comment:
   rename to `DagMetrics` or leave as-is?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -299,4 +303,43 @@ public int hashCode() {
   public String toString() {
     return this.getNodes().stream().map(node -> node.getValue().toString()).collect(Collectors.toList()).toString();
   }
+
+  public enum FlowState {
+    FAILED(-1),
+    RUNNING(0),
+    SUCCESSFUL(1);
+
+    public final int value;
+
+    FlowState(int value) {
+      this.value = value;
+    }
+  }
+
+  @Getter
+  @EqualsAndHashCode
+  public static class DagId {
+    String flowGroup;
+    String flowName;
+    long flowExecutionId;
+
+    public DagId(String flowGroup, String flowName, long flowExecutionId) {
+      this.flowGroup = flowGroup;
+      this.flowName = flowName;
+      this.flowExecutionId = flowExecutionId;
+    }

Review Comment:
   `@AllArgsConstructor`? actually, if you can make the three fields `final`, `@Data` should take care of many of these things for you
   
   (not clear if that's possible, since the fields aren't `private`... even if not `@Data` and/or `final`, can we make them `private`?)
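   A sketch of the end state being suggested, assuming the three fields can become `private final`. The hand-written boilerplate below is roughly what Lombok's `@Data` would generate for such a class (a constructor for the final fields, getters, `equals`/`hashCode`, `toString`); Lombok's `@Value` goes one step further and makes the fields `private final` itself. This standalone `DagId` is illustrative, not the nested `Dag.DagId` from the PR.

```java
import java.util.Objects;

/** Illustrative stand-in: roughly the boilerplate @Data would produce for private final fields. */
final class DagId {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  DagId(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  public String getFlowGroup() { return flowGroup; }
  public String getFlowName() { return flowName; }
  public long getFlowExecutionId() { return flowExecutionId; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagId)) return false;
    DagId other = (DagId) o;
    // All three fields participate, matching what @EqualsAndHashCode would compare.
    return flowExecutionId == other.flowExecutionId
        && Objects.equals(flowGroup, other.flowGroup)
        && Objects.equals(flowName, other.flowName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }

  @Override
  public String toString() {
    return "DagId(flowGroup=" + flowGroup + ", flowName=" + flowName
        + ", flowExecutionId=" + flowExecutionId + ")";
  }
}
```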



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -37,9 +38,9 @@
  */
 @Slf4j
 abstract public class AbstractUserQuotaManager implements UserQuotaManager {
-  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
-  public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
-  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String PER_USER_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perUserQuota";
+  public static final String PER_FLOWGROUP_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perFlowGroupQuota";
+  public static final String USER_JOB_QUOTA_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "defaultJobQuota";

Review Comment:
   again, should we honor the old prop names for b/w compat?
   
   also, when we devise a new name to deprecate the old one, let's avoid embedding impl-specific identifiers in what should otherwise be a user-level config option w/ clear and steady semantics



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -115,6 +109,31 @@ protected void shutDown()
     this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
   }
 
+  /**
+   * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
+   * <ul>
+   *   <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+   *   <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
+   *   of the job are successful.</li>
+   * </ul>
+   */
+  public enum FailureOption {
+    FINISH_RUNNING("FINISH_RUNNING"),
+    CANCEL("CANCEL"),
+    FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+    private final String failureOption;
+
+    FailureOption(final String failureOption) {
+      this.failureOption = failureOption;
+    }
+
+    @Override
+    public String toString() {
+      return this.failureOption;
+    }
+  }

Review Comment:
   somehow I thought that with just:
   ```
   public enum FailureOption {
     FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;
   }
   ```
   `.toString()` would automatically print the enum's name, no?
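   A quick check of that assumption: `Enum.toString()` returns the constant's name unless overridden, so the plain enum below yields the same strings as the version with a backing field. `DEFAULT_FLOW_FAILURE_OPTION` already uses `.name()`, which is `final` on `Enum` and unaffected either way.

```java
/** Plain enum: no backing String field or toString() override needed. */
enum FailureOption {
  FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;
  // Enum.toString() defaults to name(), and valueOf(name) round-trips it.
}
```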



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
   Map<URI, TopologySpec> topologySpecMap;
   private final Config config;
   public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
-  public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  public static final String DAG_STATESTORE_CLASS_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "dagStateStoreClass";

Review Comment:
   will this cause all existing configs to be silently ignored?
   
   (presuming there's a default... if not, I expect it would instead likely fail fast)
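   One way to keep old settings from being silently ignored, sketched under the assumption that the config is available as a map of key to value: scan for keys still under the retired prefix and log or reject them at startup. `LegacyPrefixCheck` is a hypothetical helper; the prefix to check would be the value of the former `DagManager.DAG_MANAGER_PREFIX`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical startup check for config keys that no longer take effect. */
final class LegacyPrefixCheck {
  private LegacyPrefixCheck() {}

  /** Returns (sorted) every key that still starts with the retired prefix. */
  static Set<String> keysWithPrefix(Map<String, ?> config, String legacyPrefix) {
    Set<String> found = new TreeSet<>();
    for (String key : config.keySet()) {
      if (key.startsWith(legacyPrefix)) {
        found.add(key);
      }
    }
    return found;
  }
}
```

   A caller could warn on a non-empty result, or throw to fail fast when no default exists.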



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -57,31 +57,25 @@
 @Singleton
 public class DagProcessingEngine extends AbstractIdleService {
 
-  @Getter private final Optional<DagTaskStream> dagTaskStream;
-  @Getter Optional<DagManagementStateStore> dagManagementStateStore;
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
   private final Config config;
-  private final Optional<DagProcFactory> dagProcFactory;
+  private final DagProcFactory dagProcFactory;
   private ScheduledExecutorService scheduledExecutorPool;
   private final DagProcessingEngineMetrics dagProcEngineMetrics;
   private static final Integer TERMINATION_TIMEOUT = 30;
   public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
   @Getter static long defaultJobStartSlaTimeMillis;
+  public static final String DEFAULT_FLOW_FAILURE_OPTION = DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name();

Review Comment:
   we don't need the `DagProcessingEngine.` qualifier inside that same class, do we?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,8 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
   }
 
   public void cleanup() {
-    // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
-    if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
+    if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) {
       // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
       // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
       RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());

Review Comment:
   this code may need reworking (since aspects recorded in the comments seem to be changing), but at the least, update the remaining comment and also consider whether to rename `getMetricsFilterForDagManager()`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -27,17 +27,15 @@
 
 
 /**
- * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. In case of a leadership
- * change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager}
- * loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution.
+ * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s.
  */
 @Alpha

Review Comment:
   shall we remove?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -84,8 +81,4 @@ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException
    */
   @Deprecated
   Set<String> getDagIds() throws IOException;
-
-  default boolean existsDag(DagManager.DagId dagId) throws IOException {
-    throw new UnsupportedOperationException("containsDag not implemented");
-  }

Review Comment:
   interesting to see this go away... is the thinking that one would merely `getDag` rather than check for existence?
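   If existence checks do remain useful, one option is to derive them from `getDag` via a default method. This is sketched with a hypothetical `SimpleDagStore` interface that assumes `getDag` returns `null` for an absent id; that contract is not verified against the real `DagStateStore`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store where existence is derived from lookup rather than declared separately. */
interface SimpleDagStore<K, V> {
  /** Returns the stored value, or null when absent (an assumption of this sketch). */
  V getDag(K dagId) throws IOException;

  default boolean existsDag(K dagId) throws IOException {
    return getDag(dagId) != null;
  }
}

/** In-memory implementation used only to exercise the default method. */
final class InMemorySimpleDagStore<K, V> implements SimpleDagStore<K, V> {
  private final Map<K, V> dags = new HashMap<>();

  void put(K dagId, V dag) {
    dags.put(dagId, dag);
  }

  @Override
  public V getDag(K dagId) {
    return dags.get(dagId);
  }
}
```

   The trade-off: on a MySQL-backed store this deserializes a whole DAG just to test presence, which a dedicated existence query could avoid.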



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java:
##########
@@ -136,11 +135,6 @@ public void cleanUp(String dagId)
     this.totalDagCount.dec();

Review Comment:
   is this in-memory metric still valid in a multi-leader ensemble?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -26,9 +26,9 @@
 
 /**
  * Manages the statically configured user quotas for the proxy user in user.to.proxy configuration, the API requester(s)
- * and the flow group.
- * It is used by the {@link DagManager} to ensure that the number of currently running jobs do not exceed the quota, if
- * the quota is exceeded, the execution will fail without running on the underlying executor.
+ * and the flow group. It is used by the {@link org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure

Review Comment:
   specifically, is it the `LaunchDagProc`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -63,7 +63,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
     }
 
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeToCheckDeadline.getLeft().get();
-    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    long timeOutForJobStart = DagUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());

Review Comment:
   let's use the class-rename as an opportunity to update the method too: `getJobStartDeadline`
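   If any callers outside this PR still use the old name, the rename could keep a deprecated delegate for a release. A minimal sketch with a simplified signature (the real methods take a `Dag.DagNode<JobExecutionPlan>`); `DeadlineUtilsSketch` and its defaulting logic are hypothetical.

```java
/** Hypothetical utility showing a rename that keeps a deprecated alias. */
final class DeadlineUtilsSketch {
  private DeadlineUtilsSketch() {}

  /** New name: returns the configured job start deadline in millis, else the default. */
  static long getJobStartDeadline(Long configuredMillis, long defaultMillis) {
    return configuredMillis != null ? configuredMillis : defaultMillis;
  }

  /** @deprecated use {@link #getJobStartDeadline(Long, long)} instead. */
  @Deprecated
  static long getJobStartSla(Long configuredMillis, long defaultMillis) {
    // Old name simply delegates, so behavior cannot drift between the two.
    return getJobStartDeadline(configuredMillis, defaultMillis);
  }
}
```

   Since `DagManagerUtils` is itself being renamed to `DagUtils`, in-tree callers change anyway, so such a delegate would only matter for external code.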



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########


Review Comment:
   I do believe they should be separate... and since they're such closely named classes now, consider augmenting the javadoc to note that these utils contain functionality for *processing* a DAG, whereas `DagUtils` is for *interrogating* a DAG. the javadoc for each might reference the other (circularity should be ok for javadoc)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart, the LaunchDagProc has to perform validations before executing any launch actions the previous LaunchDagProc

Review Comment:
   the LaunchDP was unable to complete? more like, "that are not yet fully processed".



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -48,8 +48,8 @@ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforce
   protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
       DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
     Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
-    long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
-    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+    long flowFinishDeadline = DagUtils.getFlowSLA(dagNode);

Review Comment:
   let's use the class-rename as an opportunity to update the method too: `getFlowFinishDeadline`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -67,64 +61,51 @@
  */
 @Slf4j
 public class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> {

Review Comment:
   I realize we no longer need both `DagActionStoreCM` and `DagManagementDagActionStoreCM`, but it's much clearer to read AND less error prone to keep the latter class as-is, rather than porting what's different about it into what had been its base class.
   
   I'm not against eventually consolidating those into something like this, but doing so in a PR w/ >110 files carries unnecessary risk that we may have introduced a small error in the porting.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -204,7 +204,9 @@ protected void createMetrics() {
     this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
     this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
     this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
-    this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
-    this.getMetricContext().register(this.produceToConsumeDelayMillis);
+   // Reports delay from all partitions in one gauge
+   ContextAwareGauge<Long> produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(

Review Comment:
   all the other metrics have an instance member.  why use a local for this one?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
 
 
 /**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
  * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on

Review Comment:
   extra "out" in "process out dag"... but this could simply be:
   > creates {@link DagAction}s, which the {@link DPE} is responsible for processing.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -45,8 +48,8 @@ public void setUp() {
   // Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter
   @Test
   public void testExceedsQuotaOnStartup() throws Exception {
-    List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user", ConfigFactory.empty());
-    // Ensure that the current attempt is 1, normally done by DagManager
+    List<Dag<JobExecutionPlan>> dags = DagTestUtils.buildDagList(2, "user", ConfigFactory.empty());
+    // Ensure that the current attempt is 1, normally done by DagProcs

Review Comment:
   which one, launch?  reeval?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -79,50 +78,47 @@ public class GitConfigMonitorTest {
   private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2);
   private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3);
 
-  private RefSpec masterRefSpec = new RefSpec("master");
+  private final RefSpec masterRefSpec = new RefSpec("master");
   private FlowCatalog flowCatalog;
-  private SpecCatalogListener mockListener;
-  private Config config;
   private GitConfigMonitor gitConfigMonitor;
 
 
   @BeforeClass
   public void setup() throws Exception {
-    cleanUpDir(TEST_DIR);
+    cleanUpDir();
 
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, 
FS.DETECTED);
-    this.remoteRepo = fileKey.open(false);
-    this.remoteRepo.create(true);
+    Repository remoteRepo = fileKey.open(false);
+    remoteRepo.create(true);
 
-    this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+    this.gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
 
     // push an empty commit as a base for detecting changes
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
-    this.config = ConfigBuilder.create()
+    Config config = ConfigBuilder.create()
         .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
-            this.remoteRepo.getDirectory().getAbsolutePath())
-        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
-        .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
-        .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
-        .build();
+            remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR,
+            TEST_DIR + "/jobConfig").addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+        .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5).build();
 
     this.flowCatalog = new FlowCatalog(config);
 
-    this.mockListener = mock(SpecCatalogListener.class);
-    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
-    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse<>(""));
 
     this.flowCatalog.addListener(mockListener);
     this.flowCatalog.startAsync().awaitRunning();
-    this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
+    this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
+    this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
     this.gitConfigMonitor.setActive(true);
   }
 
-  private void cleanUpDir(String dir) {
-    File specStoreDir = new File(dir);
+  private void cleanUpDir() {
+    File specStoreDir = new File(GitConfigMonitorTest.TEST_DIR);

Review Comment:
   NBD, but I personally favor keeping the parameter and hard-coding the argument at the call site, rather than dropping the parameter to hard-code the value in the impl. the former is not only clearer to read, but more future-proof
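   A sketch of the parameterized form being favored, with the call site passing the directory explicitly (`cleanUpDir(TEST_DIR)`). The recursive delete via `java.nio.file.Files.walk` is an assumption of this sketch; the test's actual method body is not shown in this hunk.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

final class TestDirs {
  private TestDirs() {}

  /** Recursively deletes {@code dir} if it exists; the caller decides which directory. */
  static void cleanUpDir(String dir) {
    Path root = Paths.get(dir);
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse path order visits children before their parents, so directories are empty when deleted.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```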



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -78,32 +76,19 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
   private final Config config;
   @Getter private final EventSubmitter eventSubmitter;
   protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
-  protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
-  private final boolean isMultiActiveExecutionEnabled;
+  protected DagActionReminderScheduler dagActionReminderScheduler;
   private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
   private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = new LinkedBlockingQueue<>();
   private final DagManagementStateStore dagManagementStateStore;
   private final DagProcessingEngineMetrics dagProcEngineMetrics;
 
   @Inject
-  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore,
-      @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,

Review Comment:
   don't we still need this `@Named` qualifier so that, at init/config time, we can distinguish between the two different instances of the MALA (`MultiActiveLeaseArbiter`)?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java:
##########
@@ -81,7 +83,7 @@ public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) th
           addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
           addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
       if (i > 0) {
-        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0"));

Review Comment:
   semantics have changed here. if we're confident we always want "job0", should we remove the `if (i > 0)` (or change it to `if (i == 1)`)?
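   To make the semantic change concrete, assuming job names are `"job" + i` as the removed line implies: the old line built a linear chain, while the new one builds a fan-out from `job0`. The two coincide for DAGs of up to two jobs and diverge from three jobs on. `DependencyShapes` is a hypothetical illustration, not part of `DagTestUtils`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of the dependency edges produced before and after the change. */
final class DependencyShapes {
  private DependencyShapes() {}

  /** Old behavior: job i depends on job (i - 1), i.e. a linear chain. */
  static List<String> chainDependencies(int numJobs) {
    List<String> edges = new ArrayList<>();
    for (int i = 1; i < numJobs; i++) {
      edges.add("job" + i + " -> job" + (i - 1));
    }
    return edges;
  }

  /** New behavior: every job after job0 depends on job0, i.e. a fan-out. */
  static List<String> fanOutDependencies(int numJobs) {
    List<String> edges = new ArrayList<>();
    for (int i = 1; i < numJobs; i++) {
      edges.add("job" + i + " -> job0");
    }
    return edges;
  }
}
```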



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -241,14 +237,9 @@ public void testForcedPushConfig() throws IOException, GitAPIException, URISynta
 
     Collection<Spec> specs = this.flowCatalog.getSpecs();
 
-    Assert.assertTrue(specs.size() == 2);
+    Assert.assertEquals(specs.size(), 2);
     List<Spec> specList = Lists.newArrayList(specs);
-    specList.sort(new Comparator<Spec>() {
-      @Override
-      public int compare(Spec o1, Spec o2) {
-        return o1.getUri().compareTo(o2.getUri());
-      }
-    });
+    specList.sort(Comparator.comparing(Spec::getUri));

Review Comment:
   great improvement!
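   For reference, `Comparator.comparing(keyExtractor)` orders elements by the extracted key's natural order, so it matches the removed anonymous `Comparator` as long as the key (`URI` here) is `Comparable`. A standalone sketch with a hypothetical `Item` stand-in for `Spec`:

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortByUri {
  private SortByUri() {}

  /** Hypothetical stand-in for Spec: only the URI matters for ordering. */
  static final class Item {
    private final URI uri;

    Item(URI uri) {
      this.uri = uri;
    }

    URI getUri() {
      return uri;
    }
  }

  /** Returns a copy sorted by URI; URI implements Comparable<URI>, so comparing() applies. */
  static List<Item> sortedByUri(List<Item> items) {
    List<Item> copy = new ArrayList<>(items);
    copy.sort(Comparator.comparing(Item::getUri));
    return copy;
  }
}
```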



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -329,22 +207,15 @@ public void remove(Spec spec, Properties headers) throws IOException {
     if (spec instanceof FlowSpec) {
       String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
       String flowName = FlowSpec.Utils.getFlowName(uri);
-      if (this.flowLaunchHandler.isPresent()) {
-        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
-        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
-
-        for (long flowExecutionId : flowExecutionIds) {
-        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
-            DagActionStore.DagActionType.KILL);
-        DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(killDagAction, false,
-            System.currentTimeMillis());
-        flowLaunchHandler.get().handleFlowKillTriggerEvent(new Properties(), leaseParams);
-        }
-      } else {
-        //Send the dag to the DagManager to stop it.
-        //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-        _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-        this.dagManager.stopDag(uri);
+      List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+      _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+      for (long flowExecutionId : flowExecutionIds) {
+      DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,

Review Comment:
   indentation



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########


Review Comment:
   is this the right class?  isn't this more of a "JobSpecTest"?



-- 
This is an automated message from the Apache Git Service.