This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be34e9e81 [GOBBLIN-1974] Ensure Adhoc Flows can be Executed in
Multi-active Scheduler state (#3846)
be34e9e81 is described below
commit be34e9e815f8b64678222193311bb3fa5123d408
Author: umustafi <[email protected]>
AuthorDate: Thu Dec 14 12:19:32 2023 -0800
[GOBBLIN-1974] Ensure Adhoc Flows can be Executed in Multi-active Scheduler
state (#3846)
* Ensure Adhoc Flows can be Executed in Multi-active Scheduler state
* Only delete spec for adhoc flows & always after orchestration
* Delete adhoc flows when dagManager is not present as well
* Fix flaky test for scheduler
* Add clarifying comment about failure recovery
* Re-ordered private method
* Move private methods again
* Enforce sequential ordering of unit tests to make them more reliable
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/orchestration/Orchestrator.java | 83 ++++++++++++++--------
.../scheduler/GobblinServiceJobScheduler.java | 16 ++---
.../gobblin/service/GobblinServiceManagerTest.java | 9 +--
.../modules/orchestration/OrchestratorTest.java | 2 +-
.../scheduler/GobblinServiceJobSchedulerTest.java | 16 +++--
5 files changed, 76 insertions(+), 50 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 023f87117..f839fda2d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
@@ -101,6 +102,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private UserQuotaManager quotaManager;
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
private Optional<FlowTriggerHandler> flowTriggerHandler;
+ private Optional<FlowCatalog> flowCatalog;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@@ -108,7 +110,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean
instrumentationEnabled,
- Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+ Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
+ Optional<FlowCatalog> flowCatalog) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
@@ -116,6 +119,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+ this.flowCatalog = flowCatalog;
try {
String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -161,9 +165,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator,
Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> flowTriggerHandler,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
flowTriggerHandler,
- sharedFlowMetricsSingleton);
+ sharedFlowMetricsSingleton, flowCatalog);
}
@@ -234,7 +238,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
long startTime = System.nanoTime();
if (spec instanceof FlowSpec) {
- Config flowConfig = ((FlowSpec) spec).getConfig();
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Config flowConfig = (flowSpec).getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -248,7 +253,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
return;
}
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
java.util.Optional<String> flowExecutionId =
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
@@ -299,7 +304,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
// Depending on if DagManager is present, handle execution
if (this.dagManager.isPresent()) {
- submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+ submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
@@ -332,6 +337,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
+ " for flow: " + spec, e);
}
}
+ deleteSpecFromCatalogIfAdhoc(flowSpec);
}
}
} else {
@@ -359,6 +365,13 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
try {
// Send the dag to the DagManager
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+
+ /*
+ Adhoc flows can be deleted after persisting them in DagManager, as the
DagManager's failure recovery method ensures
+ it will be executed in the event of downtime. Note that the
responsibility of the multi-active scheduler mode ends
+ after this method is completed AND the consumption of a launch type
event is committed to the consumer.
+ */
+ deleteSpecFromCatalogIfAdhoc(flowSpec);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
@@ -394,6 +407,33 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
}
+ @Nonnull
+ @Override
+ public MetricContext getMetricContext() {
+ return this.metricContext;
+ }
+
+ @Override
+ public boolean isInstrumentationEnabled() {
+ return null != this.metricContext;
+ }
+
+ @Override
+ public List<Tag<?>> generateTags(State state) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void switchMetricContext(List<Tag<?>> tags) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void switchMetricContext(MetricContext context) {
+ throw new UnsupportedOperationException();
+ }
+
+
private void deleteFromExecutor(Spec spec, Properties headers) {
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
@@ -419,29 +459,12 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
}
- @Nonnull
- @Override
- public MetricContext getMetricContext() {
- return this.metricContext;
- }
-
- @Override
- public boolean isInstrumentationEnabled() {
- return null != this.metricContext;
- }
-
- @Override
- public List<Tag<?>> generateTags(State state) {
- return Collections.emptyList();
- }
-
- @Override
- public void switchMetricContext(List<Tag<?>> tags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void switchMetricContext(MetricContext context) {
- throw new UnsupportedOperationException();
+  /*
+   Deletes spec from flowCatalog if it is an adhoc flow (one with no job schedule); a no-op when no flowCatalog is configured.
+   */
+  private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
+    if (this.flowCatalog.isPresent() && !flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
+    }
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 45053a93d..7ccc094b1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -112,7 +112,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
protected final Orchestrator orchestrator;
- protected final Boolean warmStandbyEnabled;
+ protected final Boolean isWarmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
@@ -170,7 +170,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService,
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
- @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+ @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
@@ -185,7 +185,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.skipSchedulingFlowsAfterNumDays =
Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS,
String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
this.isNominatedDRHandler =
config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
- this.warmStandbyEnabled = warmStandbyEnabled;
+ this.isWarmStandbyEnabled = isWarmStandbyEnabled;
this.quotaManager = quotaManager;
this.flowTriggerHandler = flowTriggerHandler;
// Check that these metrics do not exist before adding, mainly for testing
purpose which creates multiple instances
@@ -209,13 +209,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
public GobblinServiceJobScheduler(String serviceName, Config config,
FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager, SchedulerService schedulerService,
- Optional<Logger> log, boolean warmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler,
+ Optional<Logger> log, boolean isWarmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, flowTriggerHandler,
- sharedFlowMetricsSingleton),
- schedulerService, quotaManager, log, warmStandbyEnabled,
flowTriggerHandler);
+ sharedFlowMetricsSingleton, flowCatalog),
+ schedulerService, quotaManager, log, isWarmStandbyEnabled,
flowTriggerHandler);
}
public synchronized void setActive(boolean isActive) {
@@ -560,7 +560,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
// Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer
when we accept the flow
- if (!this.warmStandbyEnabled &&
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ if (!this.isWarmStandbyEnabled &&
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
// This block should be reachable only for the execution for the adhoc
flows
// For flow that has scheduler but run-immediately set to be true, we
won't check the quota as we will use a different execution id later
if (quotaManager.isPresent()) {
@@ -820,7 +820,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
}
}
- GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri,
new Properties(), false);
+ // Note that we only remove the spec from the flow catalog after it
is orchestrated
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 02f6f58de..08e89ef0b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -346,13 +346,14 @@ public class GobblinServiceManagerTest {
@Test (dependsOnMethods = "testRunQuotaExceeds")
public void testExplainJob() throws Exception {
+ int sizeBeforeTest =
this.gobblinServiceManager.getFlowCatalog().getSpecs().size();
FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
.setTemplateUris(TEST_TEMPLATE_URI).setProperties(new
StringMap(flowProperties)).setExplain(true);
this.flowConfigClient.createFlowConfig(flowConfig);
// explain job should not be persisted
-
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
0);
+
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
sizeBeforeTest);
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
}
@@ -506,7 +507,7 @@ null, null, null, null);
"Waiting for job to get orchestrated...");
}
- @Test
+ @Test (dependsOnMethods = "testGitCreate")
public void testBadGet() throws Exception {
FlowId flowId = new
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
@@ -520,7 +521,7 @@ null, null, null, null);
Assert.fail("Get should have raised a 404 error");
}
- @Test
+ @Test (dependsOnMethods = "testBadGet")
public void testBadDelete() throws Exception {
FlowId flowId = new
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
@@ -534,7 +535,7 @@ null, null, null, null);
Assert.fail("Get should have raised a 404 error");
}
- @Test
+ @Test (dependsOnMethods = "testBadDelete")
public void testBadUpdate() throws Exception {
Map<String, String> flowProperties = Maps.newHashMap();
flowProperties.put("param1", "value1b");
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 444f2dc8c..7e7daef9b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -109,7 +109,7 @@ public class OrchestratorTest {
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.mockStatusGenerator, Optional.of(this.topologyCatalog),
Optional.<DagManager>absent(), Optional.of(logger),
Optional.of(this._mockFlowTriggerHandler), new
SharedFlowMetricsSingleton(
- ConfigUtils.propertiesToConfig(orchestratorProperties)));
+ ConfigUtils.propertiesToConfig(orchestratorProperties)),
Optional.of(mock(FlowCatalog.class)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 9949c97d5..0fdfc894b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -338,6 +338,7 @@ public class GobblinServiceJobSchedulerTest {
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();
+ // We need to test adhoc flows since scheduled flows do not have a quota
check in the scheduler
FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
"flowName0", "group1",
ConfigFactory.empty(), true);
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"),
"flowName1", "group1",
@@ -354,7 +355,8 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false,
Optional.of(Mockito.mock(
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService,
+ Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), false, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
schedulerService.startAsync().awaitRunning();
@@ -363,17 +365,17 @@ public class GobblinServiceJobSchedulerTest {
scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
Assert.assertThrows(RuntimeException.class, () ->
scheduler.onAddSpec(flowSpec1));
+ // We don't check scheduledFlowSpecs size here because it results in a
flaky timing issue where the spec may be
+ // deleted for adhoc flows before we assert the size.
- Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
- // Second flow should not be added to scheduled flows since it was rejected
- Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
// set scheduler to be inactive and unschedule flows
scheduler.setActive(false);
Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true,
Optional.of(Mockito.mock(
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService,
+ Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), true, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
schedulerWithWarmStandbyEnabled.startUp();
@@ -396,8 +398,8 @@ public class GobblinServiceJobSchedulerTest {
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager>
quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled)
throws Exception {
- super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService, quotaManager,
Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
- FlowTriggerHandler.class)));
+ super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService,
+ quotaManager, Optional.absent(), isWarmStandbyEnabled,
Optional.of(Mockito.mock(FlowTriggerHandler.class)));
if (schedulerService != null) {
hasScheduler = true;
}