This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be34e9e81 [GOBBLIN-1974] Ensure Adhoc Flows can be Executed in 
Multi-active Scheduler state (#3846)
be34e9e81 is described below

commit be34e9e815f8b64678222193311bb3fa5123d408
Author: umustafi <[email protected]>
AuthorDate: Thu Dec 14 12:19:32 2023 -0800

    [GOBBLIN-1974] Ensure Adhoc Flows can be Executed in Multi-active Scheduler 
state (#3846)
    
    * Ensure Adhoc Flows can be Executed in Multi-active Scheduler state
    
    * Only delete spec for adhoc flows & always after orchestration
    
    * Delete adhoc flows when dagManager is not present as well
    
    * Fix flaky test for scheduler
    
    * Add clarifying comment about failure recovery
    
    * Re-ordered private method
    
    * Move private methods again
    
    * Enforce sequential ordering of unit tests to make more reliable
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/orchestration/Orchestrator.java        | 83 ++++++++++++++--------
 .../scheduler/GobblinServiceJobScheduler.java      | 16 ++---
 .../gobblin/service/GobblinServiceManagerTest.java |  9 +--
 .../modules/orchestration/OrchestratorTest.java    |  2 +-
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 16 +++--
 5 files changed, 76 insertions(+), 50 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 023f87117..f839fda2d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
@@ -101,6 +102,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   private UserQuotaManager quotaManager;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   private Optional<FlowTriggerHandler> flowTriggerHandler;
+  private Optional<FlowCatalog> flowCatalog;
   @Getter
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
@@ -108,7 +110,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager,
       Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean 
instrumentationEnabled,
-      Optional<FlowTriggerHandler> flowTriggerHandler, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+      Optional<FlowTriggerHandler> flowTriggerHandler, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
+      Optional<FlowCatalog> flowCatalog) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
@@ -116,6 +119,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     this.flowStatusGenerator = flowStatusGenerator;
     this.flowTriggerHandler = flowTriggerHandler;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+    this.flowCatalog = flowCatalog;
     try {
       String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
       if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -161,9 +165,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, 
Optional<TopologyCatalog> topologyCatalog,
       Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> flowTriggerHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog) {
     this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
flowTriggerHandler,
-        sharedFlowMetricsSingleton);
+        sharedFlowMetricsSingleton, flowCatalog);
   }
 
 
@@ -234,7 +238,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
-      Config flowConfig = ((FlowSpec) spec).getConfig();
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Config flowConfig = (flowSpec).getConfig();
       String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
@@ -248,7 +253,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         return;
       }
-      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
       FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
       java.util.Optional<String> flowExecutionId = 
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
 
@@ -299,7 +304,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
         // Depending on if DagManager is present, handle execution
         if (this.dagManager.isPresent()) {
-          submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+          submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
         } else {
           // Schedule all compiled JobSpecs on their respective Executor
           for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
@@ -332,6 +337,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
                   + " for flow: " + spec, e);
             }
           }
+          deleteSpecFromCatalogIfAdhoc(flowSpec);
         }
       }
     } else {
@@ -359,6 +365,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     try {
       // Send the dag to the DagManager
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+
+      /*
+      An adhoc flow can be deleted after it is persisted in DagManager, as the 
DagManager's failure recovery method ensures
+      it will be executed in the event of downtime. Note that the 
responsibility of the multi-active scheduler mode ends
+      after this method is completed AND the consumption of a launch type 
event is committed to the consumer.
+       */
+      deleteSpecFromCatalogIfAdhoc(flowSpec);
     } catch (Exception ex) {
       String failureMessage = "Failed to add Job Execution Plan due to: " + 
ex.getMessage();
       _log.warn("Orchestrator call - " + failureMessage, ex);
@@ -394,6 +407,33 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return null != this.metricContext;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
+
+
   private void deleteFromExecutor(Spec spec, Properties headers) {
     Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
 
@@ -419,29 +459,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
-  @Nonnull
-  @Override
-  public MetricContext getMetricContext() {
-    return this.metricContext;
-  }
-
-  @Override
-  public boolean isInstrumentationEnabled() {
-    return null != this.metricContext;
-  }
-
-  @Override
-  public List<Tag<?>> generateTags(State state) {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
+  /*
+   Deletes spec from flowCatalog if it is an adhoc flow (not containing a job 
schedule)
+ */
+  private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
+    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), 
false);
+    }
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 45053a93d..7ccc094b1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -112,7 +112,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   protected final Optional<FlowCatalog> flowCatalog;
   protected final Optional<HelixManager> helixManager;
   protected final Orchestrator orchestrator;
-  protected final Boolean warmStandbyEnabled;
+  protected final Boolean isWarmStandbyEnabled;
   protected final Optional<UserQuotaManager> quotaManager;
   protected final Optional<FlowTriggerHandler> flowTriggerHandler;
   @Getter
@@ -170,7 +170,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
       Orchestrator orchestrator, SchedulerService schedulerService, 
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
-      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
       Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
@@ -185,7 +185,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     this.skipSchedulingFlowsAfterNumDays = 
Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS,
 String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
     this.isNominatedDRHandler = 
config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
-    this.warmStandbyEnabled = warmStandbyEnabled;
+    this.isWarmStandbyEnabled = isWarmStandbyEnabled;
     this.quotaManager = quotaManager;
     this.flowTriggerHandler = flowTriggerHandler;
     // Check that these metrics do not exist before adding, mainly for testing 
purpose which creates multiple instances
@@ -209,13 +209,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   public GobblinServiceJobScheduler(String serviceName, Config config, 
FlowStatusGenerator flowStatusGenerator,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
       Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager, SchedulerService schedulerService,
-      Optional<Logger> log, boolean warmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler,
+      Optional<Logger> log, boolean isWarmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler,
       SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
       throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
         new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, flowTriggerHandler,
-            sharedFlowMetricsSingleton),
-        schedulerService, quotaManager, log, warmStandbyEnabled, 
flowTriggerHandler);
+            sharedFlowMetricsSingleton, flowCatalog),
+        schedulerService, quotaManager, log, isWarmStandbyEnabled, 
flowTriggerHandler);
   }
 
   public synchronized void setActive(boolean isActive) {
@@ -560,7 +560,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
     // Check quota limits against adhoc flows before saving the schedule
     // In warm standby mode, this quota check will happen on restli API layer 
when we accept the flow
-    if (!this.warmStandbyEnabled && 
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+    if (!this.isWarmStandbyEnabled && 
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
       // This block should be reachable only for the execution for the adhoc 
flows
       // For flow that has scheduler but run-immediately set to be true, we 
won't check the quota as we will use a different execution id later
       if (quotaManager.isPresent()) {
@@ -820,7 +820,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
               }
             }
           }
-          GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, 
new Properties(), false);
+          // Note that we only remove the spec from the flow catalog after it 
is orchestrated
           
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
           
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
         }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 02f6f58de..08e89ef0b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -346,13 +346,14 @@ public class GobblinServiceManagerTest {
 
   @Test (dependsOnMethods = "testRunQuotaExceeds")
   public void testExplainJob() throws Exception {
+    int sizeBeforeTest = 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
     FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
         .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties)).setExplain(true);
 
     this.flowConfigClient.createFlowConfig(flowConfig);
 
     // explain job should not be persisted
-    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 0);
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 sizeBeforeTest);
     
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
   }
 
@@ -506,7 +507,7 @@ null, null, null, null);
             "Waiting for job to get orchestrated...");
   }
 
-  @Test
+  @Test (dependsOnMethods = "testGitCreate")
   public void testBadGet() throws Exception {
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
 
@@ -520,7 +521,7 @@ null, null, null, null);
     Assert.fail("Get should have raised a 404 error");
   }
 
-  @Test
+  @Test (dependsOnMethods = "testBadGet")
   public void testBadDelete() throws Exception {
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
 
@@ -534,7 +535,7 @@ null, null, null, null);
     Assert.fail("Get should have raised a 404 error");
   }
 
-  @Test
+  @Test (dependsOnMethods = "testBadDelete")
   public void testBadUpdate() throws Exception {
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1b");
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 444f2dc8c..7e7daef9b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -109,7 +109,7 @@ public class OrchestratorTest {
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.mockStatusGenerator, Optional.of(this.topologyCatalog), 
Optional.<DagManager>absent(), Optional.of(logger),
          Optional.of(this._mockFlowTriggerHandler), new 
SharedFlowMetricsSingleton(
-             ConfigUtils.propertiesToConfig(orchestratorProperties)));
+             ConfigUtils.propertiesToConfig(orchestratorProperties)), 
Optional.of(mock(FlowCatalog.class)));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
     // Start application
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 9949c97d5..0fdfc894b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -338,6 +338,7 @@ public class GobblinServiceJobSchedulerTest {
     serviceLauncher.addService(flowCatalog);
     serviceLauncher.start();
 
+    // We need to test adhoc flows since scheduled flows do not have a quota 
check in the scheduler
     FlowSpec flowSpec0 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), 
"flowName0", "group1",
         ConfigFactory.empty(), true);
     FlowSpec flowSpec1 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), 
"flowName1", "group1",
@@ -354,7 +355,8 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, 
Optional.of(Mockito.mock(
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService,
+        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), false, Optional.of(Mockito.mock(
         FlowTriggerHandler.class)));
 
     schedulerService.startAsync().awaitRunning();
@@ -363,17 +365,17 @@ public class GobblinServiceJobSchedulerTest {
 
     scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
     Assert.assertThrows(RuntimeException.class, () -> 
scheduler.onAddSpec(flowSpec1));
+    // We don't check scheduledFlowSpecs size here because it results in a 
flaky timing issue where the spec may be
+    // deleted for adhoc flows before we assert the size.
 
-    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
-    // Second flow should not be added to scheduled flows since it was rejected
-    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
     // set scheduler to be inactive and unschedule flows
     scheduler.setActive(false);
     Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
 
     //Mock a GaaS scheduler in warm standby mode, where we don't check quota
     GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, 
Optional.of(Mockito.mock(
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService,
+        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), true, Optional.of(Mockito.mock(
         FlowTriggerHandler.class)));
 
     schedulerWithWarmStandbyEnabled.startUp();
@@ -396,8 +398,8 @@ public class GobblinServiceJobSchedulerTest {
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
         Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> 
quotaManager,
         SchedulerService schedulerService, boolean isWarmStandbyEnabled) 
throws Exception {
-      super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService, quotaManager, 
Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
-          FlowTriggerHandler.class)));
+      super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService,
+          quotaManager, Optional.absent(), isWarmStandbyEnabled, 
Optional.of(Mockito.mock(FlowTriggerHandler.class)));
       if (schedulerService != null) {
         hasScheduler = true;
       }

Reply via email to