This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d9d2222  [GOBBLIN-1529] Clear all jobs quickly when demoting a leader 
node (#3380)
d9d2222 is described below

commit d9d22225ff34d534322e1c33675aa3d34f995969
Author: William Lo <[email protected]>
AuthorDate: Wed Sep 1 16:57:16 2021 -0700

    [GOBBLIN-1529] Clear all jobs quickly when demoting a leader node (#3380)
    
    In scenarios where there is a large number of scheduled jobs, it may take 
too long (measured in hours) to fully demote the past leader, because it 
unschedules its jobs one at a time. This can cause duplicate flow executions 
and inconsistent behavior. Instead, we should bulk delete all schedules.
    
    In the future, we can investigate an option where past leaders keep their 
schedules in memory and periodically synchronize their in-memory schedulers 
from MySQL, while being prevented from actually executing or doing any work. 
There is no simple way of doing this right now, as each scheduler would also 
have to account for updates/deletes sent to other hosts.
---
 .../org/apache/gobblin/scheduler/JobScheduler.java |  4 +++
 .../runtime/spec_catalog/FlowCatalogTest.java      |  3 +++
 .../scheduler/GobblinServiceJobScheduler.java      | 19 +++++++-------
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 29 +++++++++++++++++-----
 4 files changed, 39 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index ada9bdd..1ae73e9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -425,6 +425,10 @@ public class JobScheduler extends AbstractIdleService {
     }
   }
 
+  public void unscheduleAllJobs() throws SchedulerException {
+    this.scheduler.getScheduler().clear();
+  }
+
   /**
    * Run a job.
    *
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 9147b7f..319a0f3 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -111,8 +111,11 @@ public class FlowCatalogTest {
   public static FlowSpec initFlowSpec(String specStore, URI uri, String 
flowName){
     Properties properties = new Properties();
     properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+    properties.put("job.name", flowName);
+    properties.put("job.group", flowName);
     properties.put("specStore.fs.dir", specStore);
     properties.put("specExecInstance.capabilities", "source:destination");
+    properties.put("job.schedule", "0 0 0 ? * * 2050");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
     SpecExecutor specExecutorInstanceProducer = new 
InMemorySpecExecutor(config);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 8f6327e..ffd2f15 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -24,9 +24,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import javax.inject.Inject;
@@ -67,6 +65,7 @@ import org.quartz.InterruptableJob;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
 import org.quartz.UnableToInterruptJobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -170,13 +169,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       }
     } else {
       // Since we are going to change status to isActive=false, unschedule all 
flows
-      List<Spec> specs = new ArrayList<>(this.scheduledFlowSpecs.values());
-      for (Spec spec : specs) {
-        try {
-          unscheduleSpec(spec.getUri(), spec.getVersion());
-        } catch (JobException e) {
-          _log.warn(String.format("Spec with URI: %s was not unscheduled 
during shutdown", spec.getUri()), e);
-        }
+      try {
+        this.scheduledFlowSpecs.clear();
+        unscheduleAllJobs();
+      } catch (SchedulerException e) {
+        _log.error(String.format("Not all jobs were unscheduled"), e);
+        // We want to avoid duplicate flow execution, so fail loudly
+        throw new RuntimeException(e);
       }
       // Need to set active=false at the end; otherwise in the onDeleteSpec(), 
node will forward specs to active node, which is itself.
       this.isActive = isActive;
@@ -348,7 +347,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   }
 
   /**
-   * Remove a flowSpec from schedule due to another leader being elected
+   * Remove a flowSpec from schedule
    * Unlike onDeleteSpec, we want to avoid deleting the flowSpec on the 
executor
    * and we still want to unschedule if we cannot connect to zookeeper as the 
current node cannot be the master
    * @param specURI
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index d2085a5..a932397 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -226,8 +227,7 @@ public class GobblinServiceJobSchedulerTest {
     serviceLauncher.addService(flowCatalog);
     serviceLauncher.start();
 
-    FlowSpec flowSpec0 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
-        MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    FlowSpec flowSpec0 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
     FlowSpec flowSpec1 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
     FlowSpec flowSpec2 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
 
@@ -239,11 +239,13 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
 
     Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
-
+    SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, schedulerService );
 
+    schedulerService.startAsync().awaitRunning();
+    scheduler.startUp();
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
     Mockito.doAnswer((Answer<Void>) a -> {
@@ -258,8 +260,9 @@ public class GobblinServiceJobSchedulerTest {
           @Override
           public boolean apply(Void input) {
             Map<String, Spec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
-            if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
-              return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+            if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 3) {
+              return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
+                  scheduler.scheduledFlowSpecs.containsKey("spec1") &&
                   scheduler.scheduledFlowSpecs.containsKey("spec2");
             } else {
               return false;
@@ -275,15 +278,22 @@ public class GobblinServiceJobSchedulerTest {
       // ensure that orchestrator is not calling remove
       Assert.assertFalse(invocation.getMethod().getName().equals("remove"));
     }
+
+    Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+    
Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 
0);
   }
 
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
     public boolean isCompilerHealthy = false;
+    private boolean hasScheduler = false;
 
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
         Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator,
         SchedulerService schedulerService) throws Exception {
       super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService, Optional.absent());
+      if (schedulerService != null) {
+        hasScheduler = true;
+      }
     }
 
     /**
@@ -296,6 +306,13 @@ public class GobblinServiceJobSchedulerTest {
         throw new RuntimeException("Could not compile flow");
       }
       super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+      if (hasScheduler) {
+        try {
+          scheduleJob(((FlowSpec) addedSpec).getConfigAsProperties(), null);
+        } catch (JobException e) {
+          throw new RuntimeException(e);
+        }
+      }
       // Check that compiler is healthy at time of scheduling flows
       Assert.assertTrue(isCompilerHealthy);
       return new AddSpecResponse(addedSpec.getDescription());

Reply via email to