This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d9d2222 [GOBBLIN-1529] Clear all jobs quickly when demoting a leader
node (#3380)
d9d2222 is described below
commit d9d22225ff34d534322e1c33675aa3d34f995969
Author: William Lo <[email protected]>
AuthorDate: Wed Sep 1 16:57:16 2021 -0700
[GOBBLIN-1529] Clear all jobs quickly when demoting a leader node (#3380)
When a large number of jobs are scheduled, fully demoting the past leader may
take too long (measured in hours) because it unschedules all of its jobs
incrementally, one at a time. This can cause duplicate flow executions and
inconsistent behavior. Instead, we should bulk-delete all schedules.
In the future we can investigate an option where past leaders keep their
schedules in memory, periodically synchronize their in-memory schedulers
from MySQL, and are prevented from actually executing or doing work. However,
there is no simple way of doing this right now, as each scheduler would also
have to account for updates/deletes sent to other hosts.
---
.../org/apache/gobblin/scheduler/JobScheduler.java | 4 +++
.../runtime/spec_catalog/FlowCatalogTest.java | 3 +++
.../scheduler/GobblinServiceJobScheduler.java | 19 +++++++-------
.../scheduler/GobblinServiceJobSchedulerTest.java | 29 +++++++++++++++++-----
4 files changed, 39 insertions(+), 16 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index ada9bdd..1ae73e9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -425,6 +425,10 @@ public class JobScheduler extends AbstractIdleService {
}
}
+ public void unscheduleAllJobs() throws SchedulerException {
+ this.scheduler.getScheduler().clear();
+ }
+
/**
* Run a job.
*
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 9147b7f..319a0f3 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -111,8 +111,11 @@ public class FlowCatalogTest {
public static FlowSpec initFlowSpec(String specStore, URI uri, String
flowName){
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+ properties.put("job.name", flowName);
+ properties.put("job.group", flowName);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
+ properties.put("job.schedule", "0 0 0 ? * * 2050");
Config config = ConfigUtils.propertiesToConfig(properties);
SpecExecutor specExecutorInstanceProducer = new
InMemorySpecExecutor(config);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 8f6327e..ffd2f15 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -24,9 +24,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.inject.Inject;
@@ -67,6 +65,7 @@ import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,13 +169,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
} else {
// Since we are going to change status to isActive=false, unschedule all
flows
- List<Spec> specs = new ArrayList<>(this.scheduledFlowSpecs.values());
- for (Spec spec : specs) {
- try {
- unscheduleSpec(spec.getUri(), spec.getVersion());
- } catch (JobException e) {
- _log.warn(String.format("Spec with URI: %s was not unscheduled
during shutdown", spec.getUri()), e);
- }
+ try {
+ this.scheduledFlowSpecs.clear();
+ unscheduleAllJobs();
+ } catch (SchedulerException e) {
+ _log.error(String.format("Not all jobs were unscheduled"), e);
+ // We want to avoid duplicate flow execution, so fail loudly
+ throw new RuntimeException(e);
}
// Need to set active=false at the end; otherwise in the onDeleteSpec(),
node will forward specs to active node, which is itself.
this.isActive = isActive;
@@ -348,7 +347,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
/**
- * Remove a flowSpec from schedule due to another leader being elected
+ * Remove a flowSpec from schedule
* Unlike onDeleteSpec, we want to avoid deleting the flowSpec on the
executor
* and we still want to unschedule if we cannot connect to zookeeper as the
current node cannot be the master
* @param specURI
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index d2085a5..a932397 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -226,8 +227,7 @@ public class GobblinServiceJobSchedulerTest {
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();
- FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
- MockedSpecCompiler.UNCOMPILABLE_FLOW);
+ FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
FlowSpec flowSpec2 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
@@ -239,11 +239,13 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
-
+ SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, schedulerService );
+ schedulerService.startAsync().awaitRunning();
+ scheduler.startUp();
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
Mockito.doAnswer((Answer<Void>) a -> {
@@ -258,8 +260,9 @@ public class GobblinServiceJobSchedulerTest {
@Override
public boolean apply(Void input) {
Map<String, Spec> scheduledFlowSpecs =
scheduler.scheduledFlowSpecs;
- if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
- return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+ if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 3) {
+ return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
+ scheduler.scheduledFlowSpecs.containsKey("spec1") &&
scheduler.scheduledFlowSpecs.containsKey("spec2");
} else {
return false;
@@ -275,15 +278,22 @@ public class GobblinServiceJobSchedulerTest {
// ensure that orchestrator is not calling remove
Assert.assertFalse(invocation.getMethod().getName().equals("remove"));
}
+
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+
Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(),
0);
}
class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
public boolean isCompilerHealthy = false;
+ private boolean hasScheduler = false;
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Orchestrator orchestrator,
SchedulerService schedulerService) throws Exception {
super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService, Optional.absent());
+ if (schedulerService != null) {
+ hasScheduler = true;
+ }
}
/**
@@ -296,6 +306,13 @@ public class GobblinServiceJobSchedulerTest {
throw new RuntimeException("Could not compile flow");
}
super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+ if (hasScheduler) {
+ try {
+ scheduleJob(((FlowSpec) addedSpec).getConfigAsProperties(), null);
+ } catch (JobException e) {
+ throw new RuntimeException(e);
+ }
+ }
// Check that compiler is healthy at time of scheduling flows
Assert.assertTrue(isCompilerHealthy);
return new AddSpecResponse(addedSpec.getDescription());