umustafi commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1635654894
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -278,18 +278,10 @@ protected void startUp() {
}
/**
- * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc flowSpecs from the FlowCatalog after
- * persisting it in the other addDag method called. The DagManager's failure recovery method ensures the flow will be
- * executed in the event of downtime.
- * @param flowSpec
- * @param dag
- * @param persist
- * @param setStatus
- * @throws IOException
+ * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately) persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+ * This DagManager's failure recovery mechanisms ensure the flow will be executed, even in the event of downtime.
*/
- public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec, Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
- throws IOException {
- addDag(dag, persist, setStatus);
+ public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws IOException {
// Only the active dagManager should delete the flowSpec
if (isActive) {
deleteSpecFromCatalogIfAdhoc(flowSpec);
Review Comment:
It shouldn't be in GSJS, because all hosts in the multi-active scheduler state
need access to the flowSpec in the catalog; see this PR description:
https://github.com/apache/gobblin/pull/3846. We were running into a `no spec
found` error in the `DagActionStoreChangeMonitor` if an inactive host deleted
the `flowSpec` before the active host processed the CDC stream message. For MA
execution we want the spec to stay in the catalog until all hosts have received
the CDC stream message and retrieved the spec. We still need to think about how
to achieve this.
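
To make the requirement concrete, here is a rough sketch of the "keep the spec until every host has retrieved it" idea. It is only an illustration under assumptions: `AckTrackingCatalog`, its methods, and the string host ids are hypothetical and are not part of Gobblin's `FlowCatalog` or `DagManager` API, and a real multi-host version would need the acknowledgement state in shared storage rather than in memory.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only: none of these names exist in Gobblin.
class AckTrackingCatalog {
  // Every host that must receive the CDC message before the spec may be deleted.
  private final Set<String> allHosts;
  // flowSpec id -> spec payload (a String stands in for a real FlowSpec).
  private final Map<String, String> specs = new HashMap<>();
  // flowSpec id -> hosts that have already retrieved the spec.
  private final Map<String, Set<String>> acks = new HashMap<>();

  AckTrackingCatalog(Set<String> allHosts) {
    this.allHosts = new HashSet<>(allHosts);
  }

  synchronized void put(String specId, String spec) {
    specs.put(specId, spec);
    acks.put(specId, new HashSet<>());
  }

  // Called by a host after it receives the CDC message; returns null if the spec is gone.
  synchronized String retrieve(String specId, String host) {
    String spec = specs.get(specId);
    if (spec != null) {
      acks.get(specId).add(host);
    }
    return spec;
  }

  // Deletes the adhoc spec only once every host has retrieved it; returns true if deleted.
  synchronized boolean removeIfAllHostsRetrieved(String specId) {
    Set<String> seen = acks.get(specId);
    if (seen == null || !seen.containsAll(allHosts)) {
      return false;
    }
    specs.remove(specId);
    acks.remove(specId);
    return true;
  }
}
```

With two hosts, a delete attempted after only the inactive host has retrieved the spec is refused, so the active host can still find it; the delete succeeds once both have retrieved it.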
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
- new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
- this.topologyCatalog.addListener(orchestrator);
- this.flowCatalog.addListener(orchestrator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
Review Comment:
I can make a note to add the other code path for `FlowLaunchHandler based
orchestrate`. If those tests reside in this test class, then, following the
methodology discussed above, we should have a separate `setupClass` method used
for that test.