phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591495305


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
-        TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-        Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-        Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-            
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
-                flowName, flowMetadata);
-
-        if (!compiledDagOptional.isPresent()) {
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-          return;
-        }
-        Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-        if (compiledDag.isEmpty()) {
-          
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec, flowMetadata);
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+        try {
+          TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+          Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+              
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
+                  flowName, flowMetadata);
+
+          if (!compiledDagOptional.isPresent()) {
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            return;
+          }
+          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+          if (compiledDag.isEmpty()) {
+            
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec,
+                flowMetadata);
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+                SharedFlowMetricsSingleton.CompiledState.FAILED);
+            _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
+            return;
+          }
           
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.FAILED);
-          _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
-          return;
-        }
-        sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-            SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
-        
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
-        flowCompilationTimer.stop(flowMetadata);
+          
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
+          flowCompilationTimer.stop(flowMetadata);
 
-        // Depending on if DagManager is present, handle execution
-        submitFlowToDagManager(flowSpec, compiledDag);
+          // Depending on if DagManager is present, handle execution
+          submitFlowToDagManager(flowSpec, compiledDag);
+        } finally {
+          // remove from the flow catalog, regardless of whether the flow was 
successfully validated and permitted to exec (concurrently)
+          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+        }

Review Comment:
   too bad the diff above doesn't clearly indicate it was solely an indentation 
change to add the `try ... finally` here.  the purpose of which is to ensure 
FlowCatalog cleanup, come whatever may



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to