umustafi commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700916336


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, 
Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on 
SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = 
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);

Review Comment:
   I see this is only for delete flowSpec then it makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to