phet commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1701582523


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);
+        DagActionStore.LeaseParams leaseObject = new DagActionStore.LeaseParams(killDagAction, false,

Review Comment:
   nit: `leaseParams` rather than `leaseObject`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########


Review Comment:
   Let's keep PRs as small and as cohesive as possible. Next time, I'd strongly encourage you to put this and related mods into a PR separate from something like the non-transient exceptions work.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception

Review Comment:
   Suggest logging here (although it would be better done within the predicate impl, i.e. the `Exception -> boolean` function).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -122,6 +134,7 @@ static class DagProcEngineThread implements Runnable {
     private final DagProcFactory dagProcFactory;
     private final DagManagementStateStore dagManagementStateStore;
     private final DagProcessingEngineMetrics dagProcEngineMetrics;
+    private final List<Class<? extends Exception>> nonRetryableExceptions;

Review Comment:
   Let's use a predicate as the interface, i.e. a function of `Exception -> boolean`. You could certainly implement one instance of that Strategy interface in terms of a `List<Class<? extends Exception>>`.
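A minimal sketch of that Strategy, assuming `java.util.function.Predicate<Exception>` serves as the interface. `ExceptionClassListPredicate` is a hypothetical name, not an existing Gobblin class. It matches an exception only by its own type (no cause-chain walk) and logs inside `test`, so callers need no extra logging of their own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.logging.Logger;

/**
 * Hypothetical Strategy implementation: an exception is treated as non-retryable
 * if it is an instance of any configured exception class.
 */
final class ExceptionClassListPredicate implements Predicate<Exception> {
  private static final Logger LOG = Logger.getLogger(ExceptionClassListPredicate.class.getName());

  private final List<Class<? extends Exception>> nonRetryableTypes;

  ExceptionClassListPredicate(List<Class<? extends Exception>> nonRetryableTypes) {
    // Defensive, immutable copy so the configured list cannot change underneath us.
    this.nonRetryableTypes = Collections.unmodifiableList(new ArrayList<>(nonRetryableTypes));
  }

  @Override
  public boolean test(Exception e) {
    for (Class<? extends Exception> type : nonRetryableTypes) {
      if (type.isInstance(e)) {
        // Logging lives inside the predicate, so every caller gets it for free.
        LOG.info(() -> "Treating " + e + " as non-retryable (matches " + type.getName() + ")");
        return true;
      }
    }
    return false;
  }
}
```

`DagProcEngineThread` could then hold a `Predicate<Exception>` field in place of the `List<Class<? extends Exception>>`, and a different strategy (for example, one that also inspects the cause chain) could be swapped in without touching the engine.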



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -426,7 +426,15 @@ public static long getExecutionIdFromTableName(String tableName) {
 
   protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
 
-  public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> typesList) {
-    return typesList.stream().anyMatch(e -> e.isInstance(exception));
+  public static boolean isThrowableInstanceOf(Throwable exception, List<Class<? extends Exception>> exceptionsList) {

Review Comment:
   This changes the semantics. Perhaps that's OK, but a more precise name along with some javadoc might help, e.g. `isThrowableOrCauseInstanceOf` or `isThrowableInstanceOfRecursive`.
   
   Also, this seems better suited to living within a utils class rather than here in KJSM, which was probably just the first place we realized we needed such functionality.
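A sketch of what the renamed helper and its javadoc could look like, assuming the new semantics are "the throwable itself or anything in its cause chain matches". `ThrowableTypeUtils` is a hypothetical utils class, and only standard JDK APIs are used; Guava's `Throwables.getCausalChain` is an alternative way to walk the chain if that dependency is already on the classpath.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical utility class; the name and location are illustrative only. */
final class ThrowableTypeUtils {
  private ThrowableTypeUtils() {}

  /**
   * Returns true if {@code throwable}, or any throwable in its cause chain, is an
   * instance of one of {@code types}. Unlike a top-level-only check, this also matches
   * wrapped exceptions, e.g. a RuntimeException whose cause is one of the listed types.
   * Returns false for a null {@code throwable}.
   */
  static boolean isThrowableOrCauseInstanceOf(Throwable throwable, List<Class<? extends Exception>> types) {
    // Identity-based "seen" set guards against pathological cyclic cause chains.
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
    for (Throwable t = throwable; t != null && seen.add(t); t = t.getCause()) {
      for (Class<? extends Exception> type : types) {
        if (type.isInstance(t)) {
          return true;
        }
      }
    }
    return false;
  }
}
```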



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+            dagTask.conclude();
+            dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();

Review Comment:
   Couldn't this live entirely encapsulated within `DagProc::process`? That would give a normal return, which means the DPE would next call `DagTask::conclude`.
   
   For that approach, initialize the `DagProcFactory` with the exception predicate.
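A simplified sketch of that approach, assuming a `Predicate<Exception>` injected through the factory. `SketchDagProc`, `SketchDagProcFactory`, and `ThrowingRunnable` are hypothetical stand-ins, not Gobblin's real `DagProc`/`DagProcFactory` signatures. A non-retryable failure is logged and swallowed so `process` returns normally (and the engine would go on to conclude the task), while any other exception propagates so the action can be retried.

```java
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for DagProc: only the error-handling shape is sketched. */
abstract class SketchDagProc {
  private static final Logger LOG = Logger.getLogger(SketchDagProc.class.getName());

  private final Predicate<Exception> isNonRetryable;

  SketchDagProc(Predicate<Exception> isNonRetryable) {
    this.isNonRetryable = isNonRetryable;
  }

  /** Work specific to each subclass. */
  protected abstract void act() throws Exception;

  /**
   * Returns normally on success or on a non-retryable failure; rethrows anything else
   * so that the caller leaves the lease unconcluded and the action is retried.
   */
  final void process() throws Exception {
    try {
      act();
    } catch (Exception e) {
      if (!isNonRetryable.test(e)) {
        throw e;
      }
      LOG.log(Level.WARNING, "Non-retryable failure; will not retry", e);
      // A metric such as a non-retryable-exception meter could be marked here.
    }
  }
}

/** Hypothetical factory that hands the same predicate to every proc it creates. */
final class SketchDagProcFactory {
  @FunctionalInterface
  interface ThrowingRunnable {
    void run() throws Exception;
  }

  private final Predicate<Exception> isNonRetryable;

  SketchDagProcFactory(Predicate<Exception> isNonRetryable) {
    this.isNonRetryable = isNonRetryable;
  }

  SketchDagProc create(ThrowingRunnable action) {
    return new SketchDagProc(isNonRetryable) {
      @Override
      protected void act() throws Exception {
        action.run();
      }
    };
  }
}
```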



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DeadlineEnforcementDagProc.java:
##########
@@ -68,7 +68,7 @@ protected boolean isDagStillPresent(Optional<Dag<JobExecutionPlan>> dag, DagMana
 
     if (!dagManagementStateStore.existsJobDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(),
         dagAction.getFlowExecutionId(), dagAction.getJobName(), dagAction.getDagActionType())) {
-      log.warn("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
+      log.info("Dag action {} is cleaned up from DMSS. No further action is required.", dagAction);
       return false;

Review Comment:
   Don't we already hold a lease by the time `act` is called? If so, this case would be exceedingly rare (and really no different from any other derived-class impl of `DagProc::act`). Hence, I'd suggest removing it. But if we really believe we need this, let's put the impl into the `DagProc` base class.
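If the check is kept, hoisting it into the base class could look roughly like the following template-method sketch. `GuardedDagProc` is hypothetical, and the `BooleanSupplier` stands in for a store lookup such as `existsJobDagAction`; subclasses such as a deadline-enforcement proc would then implement only `act`.

```java
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;

/** Hypothetical base class: the "still present?" guard is written once, here. */
abstract class GuardedDagProc {
  private static final Logger LOG = Logger.getLogger(GuardedDagProc.class.getName());

  /** Stand-in for a lookup such as DagManagementStateStore::existsJobDagAction. */
  private final BooleanSupplier dagActionStillExists;

  GuardedDagProc(BooleanSupplier dagActionStillExists) {
    this.dagActionStillExists = dagActionStillExists;
  }

  /** Subclass-specific work; only called when the dag action still exists. */
  protected abstract void act();

  /** Returns true if act() ran, false if the dag action was already cleaned up. */
  final boolean process() {
    if (!dagActionStillExists.getAsBoolean()) {
      LOG.info("Dag action already cleaned up; no further action required.");
      return false;
    }
    act();
    return true;
  }
}
```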



-- 
This is an automated message from the Apache Git Service.