umustafi commented on code in PR #3996:
URL: https://github.com/apache/gobblin/pull/3996#discussion_r1671397418


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -140,31 +140,39 @@ public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Conf
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
String.valueOf(this.isFlowConcurrencyEnabled)));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
-    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
-      return Optional.absent();
-    }
-    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-
-    if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
-        
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
-      return Optional.fromNullable(jobExecutionPlanDag);
-    } else {
-      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
-          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
-      
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
-          SharedFlowMetricsSingleton.CompiledState.SKIPPED);
-      
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
-      if (!flowSpec.isScheduled()) {
-        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
-        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
-          quotaManager.releaseQuota(dagNode);
+    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlan = null;
+
+    try {
+      if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+        optionalJobExecutionPlan = Optional.absent();
+      }
+      addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
+      if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
+          
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
+        optionalJobExecutionPlan = Optional.fromNullable(jobExecutionPlanDag);
+      } else {
+        log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+            + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+        optionalJobExecutionPlan = Optional.absent();
+      }
+    } finally {
+      if (optionalJobExecutionPlan == null || 
!optionalJobExecutionPlan.isPresent()) {

Review Comment:
   Updated this code path to make it clearer. I realized the quota manager 
does not need to be updated when compilation fails, so the logic is much 
simpler now.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -140,31 +140,39 @@ public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Conf
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
String.valueOf(this.isFlowConcurrencyEnabled)));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
-    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
-      return Optional.absent();
-    }
-    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-
-    if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
-        
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
-      return Optional.fromNullable(jobExecutionPlanDag);
-    } else {
-      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
-          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
-      
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
-          SharedFlowMetricsSingleton.CompiledState.SKIPPED);
-      
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
-      if (!flowSpec.isScheduled()) {
-        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
-        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
-          quotaManager.releaseQuota(dagNode);
+    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlan = null;
+
+    try {
+      if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+        optionalJobExecutionPlan = Optional.absent();
+      }
+      addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
+      if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, 
allowConcurrentExecution,
+          
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
 {
+        optionalJobExecutionPlan = Optional.fromNullable(jobExecutionPlanDag);
+      } else {
+        log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+            + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+        optionalJobExecutionPlan = Optional.absent();
+      }
+    } finally {
+      if (optionalJobExecutionPlan == null || 
!optionalJobExecutionPlan.isPresent()) {

Review Comment:
   Updated this code path to make it clearer. I realized the quota manager 
does not need to be updated when compilation fails, so the logic is much 
simpler now.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -321,8 +321,10 @@ protected void 
submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
       launchSubmissionMetricProxy.markFailure();
       return;
     } catch (SpecNotFoundException e) {
-      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
-      launchSubmissionMetricProxy.markFailure();
+      log.info("Spec not found for flowId {} due to deletion by active 
dagManager host due to exception {}",
+          flowId, e.getMessage());
+      // TODO: mark this failure if there are other valid cases of this 
exception
+      // launchSubmissionMetricProxy.markFailure();

Review Comment:
   It's speculative; I think there could be a case where a 
SpecNotFoundException occurs for a reason _other_ than deletion by the active DM, 
if something goes wrong elsewhere in the system (e.g. a mistaken deletion that does NOT 
come from the deleteFlowSpecIfAdhoc code path). It's hard to say definitively that all 
of these errors are caused by an active DM deleting the spec, but I would expect them 
all to be. 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -321,8 +321,10 @@ protected void 
submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
       launchSubmissionMetricProxy.markFailure();
       return;
     } catch (SpecNotFoundException e) {
-      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
-      launchSubmissionMetricProxy.markFailure();
+      log.info("Spec not found for flowId {} due to deletion by active 
dagManager host due to exception {}",
+          flowId, e.getMessage());
+      // TODO: mark this failure if there are other valid cases of this 
exception
+      // launchSubmissionMetricProxy.markFailure();

Review Comment:
   It's speculative; I think there could be a case where a 
SpecNotFoundException occurs for a reason _other_ than deletion by the active DM, 
if something goes wrong elsewhere in the system (e.g. a mistaken deletion that does NOT 
come from the deleteFlowSpecIfAdhoc code path). It's hard to say definitively that all 
of these errors are caused by an active DM deleting the spec, but I would expect them 
all to be. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to