[
https://issues.apache.org/jira/browse/GOBBLIN-2062?focusedWorklogId=917930&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-917930
]
ASF GitHub Bot logged work on GOBBLIN-2062:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/24 20:28
Start Date: 06/May/24 20:28
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591495305
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
         _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
       } else {
-        TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-        Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
-        Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-            this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
-                flowName, flowMetadata);
-
-        if (!compiledDagOptional.isPresent()) {
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-          return;
-        }
-        Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-        if (compiledDag.isEmpty()) {
-          FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
-          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+        try {
+          TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+              this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
+                  flowName, flowMetadata);
+
+          if (!compiledDagOptional.isPresent()) {
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            return;
+          }
+          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+          if (compiledDag.isEmpty()) {
+            FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
+                flowMetadata);
+            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+            sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+                SharedFlowMetricsSingleton.CompiledState.FAILED);
+            _log.warn("Cannot determine an executor to run on for Spec: " + spec);
+            return;
+          }
           sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.FAILED);
-          _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-          return;
-        }
-        sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-            SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-        FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
-        flowCompilationTimer.stop(flowMetadata);
+          FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
+          flowCompilationTimer.stop(flowMetadata);
-        // Depending on if DagManager is present, handle execution
-        submitFlowToDagManager(flowSpec, compiledDag);
+          // Depending on if DagManager is present, handle execution
+          submitFlowToDagManager(flowSpec, compiledDag);
+        } finally {
+          // remove from the flow catalog, regardless of whether the flow was successfully validated and permitted to exec (concurrently)
+          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+        }
Review Comment:
Too bad the diff above doesn't clearly show that the change is solely one of
indentation, made to add the `try ... finally` here. Its purpose is to ensure
FlowCatalog cleanup, come what may.
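To illustrate why wrapping the body in `try ... finally` guarantees cleanup even on the early `return` paths, here is a minimal, self-contained sketch. It is not the Gobblin code: the class `AdhocCleanupSketch`, its `CATALOG` set, and the simplified `orchestrate` signature are hypothetical stand-ins for the FlowCatalog and `Orchestrator.orchestrate`.

```java
import java.util.HashSet;
import java.util.Set;

class AdhocCleanupSketch {
    // Hypothetical stand-in for the FlowCatalog: a plain set of flow keys.
    static final Set<String> CATALOG = new HashSet<>();

    // Hypothetical, simplified orchestrate(): returns true if the flow would be submitted.
    static boolean orchestrate(String flowKey, boolean adhoc, boolean compiles) {
        CATALOG.add(flowKey); // assumption: the flow is already in the catalog at this point
        try {
            if (!compiles) {
                return false; // early return, like the failed-compilation branches in the diff
            }
            return true; // the flow would be handed off for execution here
        } finally {
            // Runs on every exit path: early return, normal return, or exception.
            if (adhoc) {
                CATALOG.remove(flowKey);
            }
        }
    }

    public static void main(String[] args) {
        boolean submitted = orchestrate("group/adhocFlow", true, false);
        System.out.println("submitted=" + submitted
            + ", stillInCatalog=" + CATALOG.contains("group/adhocFlow"));
    }
}
```

The `finally` block executes before the method actually returns, so the adhoc entry is removed whether or not the flow compiled; a non-adhoc (scheduled) flow is left in place.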
Issue Time Tracking
-------------------
Worklog Id: (was: 917930)
Time Spent: 40m (was: 0.5h)
> adhoc flow failure due to concurrent execs must be removed from flow catalog
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-2062
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2062
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> The Orchestrator + DagManager MUST remove adhoc flows that violate the
> concurrent-execution constraint from the flow catalog. Otherwise, GaaS will
> keep returning '409 Conflict' to each subsequent attempt to create an adhoc
> flow with the same flowGroup+flowName. This happens even though the flow
> (which wrongly remains in the FlowCatalog) already has the status FAILED,
> which is a "final status".
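The failure mode described above can be sketched as follows. This is an illustration under stated assumptions, not GaaS code: the class `FlowConflictSketch`, its methods, and the use of 201 as the success status are hypothetical; only the '409 Conflict' response on a duplicate flowGroup+flowName comes from the issue description.

```java
import java.util.HashSet;
import java.util.Set;

class FlowConflictSketch {
    static final int CREATED = 201;  // assumption: status returned on successful creation
    static final int CONFLICT = 409; // from the issue: returned when the flow already exists

    // Hypothetical stand-in for the FlowCatalog, keyed by flowGroup + flowName.
    private final Set<String> catalog = new HashSet<>();

    private static String key(String flowGroup, String flowName) {
        return flowGroup + "/" + flowName;
    }

    // Creating a flow whose key is still in the catalog yields a conflict.
    int createFlow(String flowGroup, String flowName) {
        return catalog.add(key(flowGroup, flowName)) ? CREATED : CONFLICT;
    }

    // The cleanup the issue asks for: drop the adhoc flow once it reaches a final status.
    void removeFlow(String flowGroup, String flowName) {
        catalog.remove(key(flowGroup, flowName));
    }

    public static void main(String[] args) {
        FlowConflictSketch gaas = new FlowConflictSketch();
        System.out.println(gaas.createFlow("g", "f")); // first submission
        System.out.println(gaas.createFlow("g", "f")); // resubmission while the stale entry remains
        gaas.removeFlow("g", "f");                     // cleanup after the flow has FAILED
        System.out.println(gaas.createFlow("g", "f")); // resubmission after cleanup
    }
}
```

Without the `removeFlow` step, every later attempt with the same key keeps hitting the conflict branch, which is the behavior the issue reports.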
--
This message was sent by Atlassian Jira
(v8.20.10#820010)