umustafi commented on code in PR #3996:
URL: https://github.com/apache/gobblin/pull/3996#discussion_r1671397418
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -140,31 +140,39 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- return Optional.absent();
- }
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-
- if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
-
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
- return Optional.fromNullable(jobExecutionPlanDag);
- } else {
- log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
- + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
- SharedFlowMetricsSingleton.CompiledState.SKIPPED);
-
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
- if (!flowSpec.isScheduled()) {
- // For ad-hoc flow, we might already increase quota, we need to
decrease here
- for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
- quotaManager.releaseQuota(dagNode);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlan = null;
+
+ try {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
+ if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
+ optionalJobExecutionPlan = Optional.fromNullable(jobExecutionPlanDag);
+ } else {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ } finally {
+ if (optionalJobExecutionPlan == null ||
!optionalJobExecutionPlan.isPresent()) {
Review Comment:
Updated this code path to make it clearer. I realized the quota manager
does not need to be updated when compilation fails, so this is much
simpler now.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -140,31 +140,39 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- return Optional.absent();
- }
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-
- if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
-
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
- return Optional.fromNullable(jobExecutionPlanDag);
- } else {
- log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
- + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
- SharedFlowMetricsSingleton.CompiledState.SKIPPED);
-
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
- if (!flowSpec.isScheduled()) {
- // For ad-hoc flow, we might already increase quota, we need to
decrease here
- for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
- quotaManager.releaseQuota(dagNode);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlan = null;
+
+ try {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
+ if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
+ optionalJobExecutionPlan = Optional.fromNullable(jobExecutionPlanDag);
+ } else {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ } finally {
+ if (optionalJobExecutionPlan == null ||
!optionalJobExecutionPlan.isPresent()) {
Review Comment:
Updated this code path to make it clearer. I realized the quota manager
does not need to be updated when compilation fails, so this is much
simpler now.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -321,8 +321,10 @@ protected void
submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
launchSubmissionMetricProxy.markFailure();
return;
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
- launchSubmissionMetricProxy.markFailure();
+ log.info("Spec not found for flowId {} due to deletion by active
dagManager host due to exception {}",
+ flowId, e.getMessage());
+ // TODO: mark this failure if there are other valid cases of this
exception
+ // launchSubmissionMetricProxy.markFailure();
Review Comment:
It's speculative. I think there could be a case where a
SpecNotFoundException occurs for a reason _other_ than deletion by the active DM,
if something goes wrong at a different point in the system (e.g., a mistaken
deletion that does NOT come from the deleteFlowSpecIfAdhoc code path). It's hard
to say definitively that all of these errors are caused by an active DM deleting
the spec, but I would expect them all to be.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -321,8 +321,10 @@ protected void
submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
launchSubmissionMetricProxy.markFailure();
return;
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
- launchSubmissionMetricProxy.markFailure();
+ log.info("Spec not found for flowId {} due to deletion by active
dagManager host due to exception {}",
+ flowId, e.getMessage());
+ // TODO: mark this failure if there are other valid cases of this
exception
+ // launchSubmissionMetricProxy.markFailure();
Review Comment:
It's speculative. I think there could be a case where a
SpecNotFoundException occurs for a reason _other_ than deletion by the active DM,
if something goes wrong at a different point in the system (e.g., a mistaken
deletion that does NOT come from the deleteFlowSpecIfAdhoc code path). It's hard
to say definitively that all of these errors are caused by an active DM deleting
the spec, but I would expect them all to be.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]