umustafi commented on a change in pull request #3268:
URL: https://github.com/apache/gobblin/pull/3268#discussion_r621514135



##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -365,38 +365,42 @@ public void remove(Spec spec, Properties headers) throws 
IOException {
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
     if (spec instanceof FlowSpec) {
+      //Send the dag to the DagManager to stop it.
+      //Also send it to the SpecProducer to do any cleanup tasks on 
SpecExecutor.
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
         _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
         this.dagManager.get().stopDag(spec.getUri());
-      } else {
-        // If DagManager is not enabled, we need to recompile the flow to find 
the spec producer,
-        // If compilation results is different, it remove request can go to 
some different spec producer
-        Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
-
-        if (jobExecutionPlanDag.isEmpty()) {
-          _log.warn("Cannot determine an executor to delete Spec: " + spec);
-          return;
-        }
-
-        // Delete all compiled JobSpecs on their respective Executor
-        for (Dag.DagNode<JobExecutionPlan> dagNode: 
jobExecutionPlanDag.getNodes()) {
-          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-          Spec jobSpec = jobExecutionPlan.getJobSpec();
-          try {
-            SpecProducer<Spec> producer = 
jobExecutionPlan.getSpecExecutor().getProducer().get();
-            _log.info(String.format("Going to delete JobSpec: %s on Executor: 
%s", jobSpec, producer));
-            producer.deleteSpec(jobSpec.getUri(), headers);
-          } catch (Exception e) {
-            _log.error(String.format("Could not delete JobSpec: %s for flow: 
%s", jobSpec, spec), e);
-          }
-        }
       }
+      // We need to recompile the flow to find the spec producer,
+      // If compilation results is different, it remove request can go to some 
different spec producer

Review comment:
       small nitpick here "it's"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to