[
https://issues.apache.org/jira/browse/GOBBLIN-2107?focusedWorklogId=925186&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-925186
]
ASF GitHub Bot logged work on GOBBLIN-2107:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jul/24 05:44
Start Date: 10/Jul/24 05:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3996:
URL: https://github.com/apache/gobblin/pull/3996#discussion_r1671460273
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -271,14 +275,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, InterruptedException {
- Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
- this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
- if (optionalJobExecutionPlanDag.isPresent()) {
- submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
- } else {
- _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ /**
+ * Method that accepts a flowSpec that it compiles before forwarding to the DagManagerfor execution. It's meant to be
+ * called by {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
+ * @param flowSpec
+ * @throws IOException
+ * @throws InterruptedException
Review Comment:
Suggest simplifying to:
```
* Compiles the provided {@link FlowSpec} into a {@link Dag<JobExecutionPlan>} it forwards to the {@link DagManager} for execution. It's meant to be
* called by {@link org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
```
Tip: avoid "Method that...", "Class that...", or "Code to..." in javadoc.
Since that could be said of *every* method/class, let's just take it as an
implied given.
Issue Time Tracking
-------------------
Worklog Id: (was: 925186)
Time Spent: 1.5h (was: 1h 20m)
> Delete adhoc flowSpecs from flowCatalog
> ----------------------------------------
>
> Key: GOBBLIN-2107
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2107
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Delete adhoc flowSpecs from the flowCatalog, even when job compilation fails,
> to avoid a buildup of adhoc flowSpecs in the catalog. In the non-MA
> (multi-active) scheduler case, this is done by adding the deletion in a
> finally block.
> # The previous PR, [https://github.com/apache/gobblin/pull/3944/files],
> introduced a bug where adhoc flows are not deleted when the multi-active
> scheduler is enabled. This PR re-adds deletion of the flowSpec to the
> dagManager, to be called by the active host in the MA scheduler case (there
> will only be one active host if dagManager is enabled; otherwise the code
> will use dagProcessingEngine).
> # It also ensures that the quota is released and a failed flow compilation
> event is sent by the FlowCompilationValidationHelper in all compilation
> failure cases. This was handled incorrectly before.
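For illustration only, the finally-block deletion described in the issue can be sketched as below. This is a minimal sketch, not Gobblin's code: `AdhocFlowCleanupSketch`, `InMemoryCatalog`, `Compiler`, and `orchestrate` are hypothetical stand-ins invented for this example and do not mirror the real `FlowCatalog` or `Orchestrator` APIs. It only shows that an adhoc entry is removed whether compilation succeeds, yields no plan, or throws.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class AdhocFlowCleanupSketch {

  /** Hypothetical in-memory stand-in for a flow catalog (not Gobblin's FlowCatalog). */
  public static class InMemoryCatalog {
    // Maps a flow name to whether that flow is adhoc (run once, not scheduled).
    private final Map<String, Boolean> specs = new HashMap<>();

    public void put(String flowName, boolean adhoc) { specs.put(flowName, adhoc); }
    public void remove(String flowName) { specs.remove(flowName); }
    public boolean contains(String flowName) { return specs.containsKey(flowName); }
    public boolean isAdhoc(String flowName) { return specs.getOrDefault(flowName, false); }
  }

  /** Hypothetical compiler: an empty Optional means compilation failed. */
  public interface Compiler {
    Optional<String> compile(String flowName);
  }

  /**
   * Compiles the named flow and reports whether a plan was produced.
   * Adhoc flows are removed from the catalog in the finally block, so they
   * do not accumulate when compilation fails or throws.
   */
  public static boolean orchestrate(InMemoryCatalog catalog, Compiler compiler, String flowName) {
    boolean adhoc = catalog.isAdhoc(flowName);
    try {
      Optional<String> plan = compiler.compile(flowName);
      if (!plan.isPresent()) {
        return false; // compilation failed; the finally block still runs
      }
      // A real orchestrator would forward the plan for execution here.
      return true;
    } finally {
      if (adhoc) {
        catalog.remove(flowName);
      }
    }
  }
}
```

Non-adhoc (scheduled) flows are left in the catalog in this sketch, since only adhoc entries are meant to be cleaned up after a single run.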
--
This message was sent by Atlassian Jira
(v8.20.10#820010)