[ 
https://issues.apache.org/jira/browse/GOBBLIN-1974?focusedWorklogId=895544&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-895544
 ]

ASF GitHub Bot logged work on GOBBLIN-1974:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/23 20:27
            Start Date: 13/Dec/23 20:27
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3846:
URL: https://github.com/apache/gobblin/pull/3846#discussion_r1425850602


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -359,6 +363,11 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
     try {
       // Send the dag to the DagManager
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+
+      // Delete spec from flow catalog for adhoc executions after persisting it in DagManager
+      if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
+      }

Review Comment:
   Yes, it will. Once `addDag` has persisted the `dag` to `dagStateStore`, it 
is the responsibility of `dagManager` to execute it. The multi-active 
scheduler component ensures the action is persisted to `dagActionStore`, so 
the event will not be reprocessed through `dagActionChangeMonitor`. The 
`dagActionChangeMonitor` only commits the offset after submitting a launch 
event to the `Orchestrator` for compiling and completing the `addDag` call. If 
the old leader never completes the dag, the new leader's `DagManager` will 
recover it from `DagStateStore`.
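
The ordering argument above can be sketched in isolation. This is a minimal, illustrative model and not Gobblin code: `AdhocSpecCleanupSketch`, its in-memory `flowCatalog` and `dagStateStore`, the `failPersist` hook, and the literal value of `JOB_SCHEDULE_KEY` are all assumptions made for the example. It only shows that the adhoc spec is removed after the persist step succeeds, so a failed persist leaves the spec available for a retry.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: none of these members are Gobblin classes or APIs.
class AdhocSpecCleanupSketch {
  // Placeholder for ConfigurationKeys.JOB_SCHEDULE_KEY; the literal value is an assumption.
  static final String JOB_SCHEDULE_KEY = "job.schedule";

  // Hypothetical stand-in for the flow catalog: flow URI -> flow config.
  final Map<String, Map<String, String>> flowCatalog = new HashMap<>();
  // Hypothetical stand-in for the dag state store: URIs of persisted dags.
  final Set<String> dagStateStore = new HashSet<>();
  // Hook for the example: when true, addDag fails before persisting anything.
  boolean failPersist = false;

  // Registers a spec; a null schedule models an adhoc flow.
  void addSpec(String flowUri, String schedule) {
    Map<String, String> config = new HashMap<>();
    if (schedule != null) {
      config.put(JOB_SCHEDULE_KEY, schedule);
    }
    flowCatalog.put(flowUri, config);
  }

  // Models the persist step: either the dag is stored or an exception is thrown.
  void addDag(String flowUri) {
    if (failPersist) {
      throw new IllegalStateException("could not persist dag for " + flowUri);
    }
    dagStateStore.add(flowUri);
  }

  // Persist first, then delete the spec only for flows without a schedule.
  void submitFlow(String flowUri) {
    Map<String, String> config = flowCatalog.get(flowUri);
    if (config == null) {
      throw new IllegalArgumentException("No spec found for " + flowUri);
    }
    addDag(flowUri); // if this throws, the removal below never runs
    if (!config.containsKey(JOB_SCHEDULE_KEY)) {
      flowCatalog.remove(flowUri);
    }
  }

  public static void main(String[] args) {
    AdhocSpecCleanupSketch sketch = new AdhocSpecCleanupSketch();
    sketch.addSpec("flow:/adhoc", null);
    sketch.addSpec("flow:/scheduled", "0 0 * * * ?");
    sketch.submitFlow("flow:/adhoc");
    sketch.submitFlow("flow:/scheduled");
    System.out.println("adhoc spec still in catalog: "
        + sketch.flowCatalog.containsKey("flow:/adhoc"));
    System.out.println("scheduled spec still in catalog: "
        + sketch.flowCatalog.containsKey("flow:/scheduled"));
  }
}
```

In this model, both flows end up in the stand-in state store, the adhoc spec is gone from the catalog, and the scheduled spec remains. Setting `failPersist` before calling `submitFlow` leaves the adhoc spec in the catalog, which mirrors the case where the launch event is reprocessed.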





Issue Time Tracking
-------------------

    Worklog Id:     (was: 895544)
    Time Spent: 50m  (was: 40m)

> Ensure Adhoc Flows can be Executed in Multi-active Scheduler state
> ------------------------------------------------------------------
>
>                 Key: GOBBLIN-1974
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1974
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Adhoc flow executions delete the flow spec from the FlowCatalog after adding 
> the spec to the scheduler for the single active scheduler. This made sense 
> in that context, since the single active scheduler already holds the 
> information and the spec is no longer needed. However, in the multi-active 
> case we run into a {{No spec found}} error here 
> [https://github.com/apache/gobblin/blob/00c60ca6492644652c66905eb384ab145e348bbf/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java#L263].
> This PR updates the code so that, in the adhoc execution case, the spec is 
> deleted after it has been persisted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
