[ 
https://issues.apache.org/jira/browse/GOBBLIN-2062?focusedWorklogId=923053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-923053
 ]

ASF GitHub Bot logged work on GOBBLIN-2062:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Jun/24 01:15
            Start Date: 12/Jun/24 01:15
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1635654894


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -278,18 +278,10 @@ protected void startUp() {
   }
 
   /**
-   * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc 
flowSpecs from the FlowCatalog after
-   * persisting it in the other addDag method called. The DagManager's failure 
recovery method ensures the flow will be
-   * executed in the event of downtime.
-   * @param flowSpec
-   * @param dag
-   * @param persist
-   * @param setStatus
-   * @throws IOException
+   * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately) 
persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+   * This DagManager's failure recovery mechanisms ensure the flow will be 
executed, even in the event of downtime.
    */
-  public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec, 
Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
-      throws IOException {
-    addDag(dag, persist, setStatus);
+  public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws 
IOException {
     // Only the active dagManager should delete the flowSpec
     if (isActive) {
       deleteSpecFromCatalogIfAdhoc(flowSpec);

Review Comment:
   It shouldn't be in GSJS, because all hosts in the multi-active scheduler 
state need access to the flowSpec in the catalog; see this PR description: 
https://github.com/apache/gobblin/pull/3846. We were running into a `no spec 
found` error in the `DagActionStoreChangeMonitor` if an inactive host deleted 
the `flowSpec` before the active host processed the CDC stream message. For MA 
execution we want the spec to stay in the catalog until all hosts have received 
the CDC stream message and retrieved the spec. We still need to think about how 
to achieve this.
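
One possible shape for the bookkeeping described above, as a minimal sketch only: the class `AdhocSpecRetentionTracker`, its methods, and the host IDs are hypothetical and not part of Gobblin. It also assumes the set of hosts is known up front; a real multi-active deployment would need this state in a shared store rather than in one JVM.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Gobblin class): defers deleting an adhoc flowSpec until every host has retrieved it. */
class AdhocSpecRetentionTracker {
  private final Set<String> allHosts;
  // spec identifier -> hosts that have not yet retrieved the spec
  private final Map<String, Set<String>> pendingHostsBySpec = new HashMap<>();

  AdhocSpecRetentionTracker(Set<String> allHosts) {
    this.allHosts = new HashSet<>(allHosts);
  }

  /** Record that every known host still needs to read this spec. */
  synchronized void registerSpec(String specId) {
    pendingHostsBySpec.put(specId, new HashSet<>(allHosts));
  }

  /** True while at least one host has not yet retrieved the spec. */
  synchronized boolean isPending(String specId) {
    return pendingHostsBySpec.containsKey(specId);
  }

  /**
   * Mark that hostId has processed the CDC message and retrieved the spec.
   * Returns true only when this was the last pending host, i.e. the caller may now delete the spec.
   */
  synchronized boolean markRetrieved(String specId, String hostId) {
    Set<String> pending = pendingHostsBySpec.get(specId);
    if (pending == null) {
      return false; // unknown spec, or already released for deletion
    }
    pending.remove(hostId);
    if (pending.isEmpty()) {
      pendingHostsBySpec.remove(specId);
      return true;
    }
    return false;
  }
}
```

With this shape, an inactive host that reaches the deletion path early gets `false` from `markRetrieved` and leaves the spec in place; only the last host to acknowledge triggers the delete.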



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
-    this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
-        new FlowCompilationValidationHelper(config, 
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
-    this.topologyCatalog.addListener(orchestrator);
-    this.flowCatalog.addListener(orchestrator);
+    FlowCompilationValidationHelper flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, 
mock(UserQuotaManager.class), mockFlowStatusGenerator);
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),

Review Comment:
   I can make a note to add the other code path for `FlowLaunchHandler`-based 
orchestration. If those tests reside in this test class, then following the 
methodology discussed above we should have a separate `setupClass` method used 
for that test. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 923053)
    Time Spent: 5h 50m  (was: 5h 40m)

> adhoc flow failure due to concurrent execs must be removed from flow catalog
> ----------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2062
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2062
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> the Orchestrator + DagManager MUST remove adhoc flows that violate concurrent 
> execs from the flow catalog.  otherwise gaas will continue to return '409 
> Conflict' to each subsequent attempt to create an adhoc flow with the same 
> flowGroup+flowName.  this is despite the fact that the flow (which still 
> remains in the FlowCatalog, when it shouldn't be) already has the status 
> FAILED, which is a "final status".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
