umustafi commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1635654894


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -278,18 +278,10 @@ protected void startUp() {
   }
 
   /**
-   * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc flowSpecs from the FlowCatalog after
-   * persisting it in the other addDag method called. The DagManager's failure recovery method ensures the flow will be
-   * executed in the event of downtime.
-   * @param flowSpec
-   * @param dag
-   * @param persist
-   * @param setStatus
-   * @throws IOException
+   * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately) persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+   * This DagManager's failure recovery mechanisms ensure the flow will be executed, even in the event of downtime.
    */
-  public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec, Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
-      throws IOException {
-    addDag(dag, persist, setStatus);
+  public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws IOException {
     // Only the active dagManager should delete the flowSpec
     if (isActive) {
       deleteSpecFromCatalogIfAdhoc(flowSpec);

Review Comment:
   It shouldn't be in GSJS, because all hosts in the multi-active scheduler state
need access to the `flowSpec` in the catalog; see the description of this PR:
https://github.com/apache/gobblin/pull/3846. We were running into a `no spec
found` error in the `DagActionStoreChangeMonitor` when an inactive host deleted
the `flowSpec` before the active host had processed the CDC stream message. For
multi-active (MA) execution, we want the spec to stay in the catalog until all
hosts have received the CDC stream message and retrieved the spec. We still need
to think about how to achieve this.
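
   One possible direction, purely as a sketch and not something that exists in
Gobblin today: track which hosts still need to read each adhoc spec and only
allow deletion once the last one has acknowledged it. The class name
`AdhocSpecRetention`, its methods, and the host ids below are all hypothetical,
and the sketch assumes the set of hosts is known up front and that
acknowledgements reach a single place, which a real multi-active deployment
would have to solve separately (for example through a shared store).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not a Gobblin class): defers deletion of an adhoc spec
 * until every known host has acknowledged retrieving it.
 */
final class AdhocSpecRetention {
  // Assumption: the full set of participating hosts is known when this is built.
  private final Set<String> allHosts;
  // For each tracked spec, the hosts that have not yet acknowledged it.
  private final Map<String, Set<String>> pendingHostsBySpec = new HashMap<>();

  AdhocSpecRetention(Set<String> allHosts) {
    this.allHosts = new HashSet<>(allHosts);
  }

  /** Start tracking a spec; every host is initially pending. */
  synchronized void track(String specUri) {
    pendingHostsBySpec.put(specUri, new HashSet<>(allHosts));
  }

  /**
   * Record that a host has retrieved the spec.
   * Returns true only when this was the last pending host, i.e. the caller
   * may now delete the spec. Unknown or already-released specs return false.
   */
  synchronized boolean acknowledge(String specUri, String host) {
    Set<String> pending = pendingHostsBySpec.get(specUri);
    if (pending == null) {
      return false;
    }
    pending.remove(host);
    if (pending.isEmpty()) {
      pendingHostsBySpec.remove(specUri);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    AdhocSpecRetention retention = new AdhocSpecRetention(Set.of("hostA", "hostB"));
    retention.track("flow1");
    System.out.println(retention.acknowledge("flow1", "hostA")); // false: hostB still pending
    System.out.println(retention.acknowledge("flow1", "hostB")); // true: safe to delete
  }
}
```

   In this shape, the delete call would run only when `acknowledge` returns
`true`, so an inactive host finishing early could no longer remove the spec out
from under the active one.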



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
 
     SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
 
-    this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
-        new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
-    this.topologyCatalog.addListener(orchestrator);
-    this.flowCatalog.addListener(orchestrator);
+    FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+    this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),

Review Comment:
   I can make a note to add the other code path for `FlowLaunchHandler based
orchestrate`. If tests for it reside in this test class, then, following the
methodology discussed above, we should have a separate `setupClass` method used
for those tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

