This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a4cda4  [GOBBLIN-1507] Prevent orchestrator removals when unscheduling flows (#3353)
8a4cda4 is described below

commit 8a4cda47aaf954041c82e43b6a10f389d1e4a4db
Author: William Lo <[email protected]>
AuthorDate: Mon Aug 9 18:00:21 2021 -0700

    [GOBBLIN-1507] Prevent orchestrator removals when unscheduling flows (#3353)
    
    Previously, when a GaaS node was demoted from leader, it removed all of its
    flows from the orchestrator. Failover events must not be disruptive or
    destructive to currently running flows; the new leader node should only take
    over scheduling responsibilities from that point forward. On demotion, flows
    are now only unscheduled locally and the orchestrator is left untouched.
---
 .../scheduler/GobblinServiceJobScheduler.java      | 72 ++++++++++++---------
 .../gobblin/service/GobblinServiceManagerTest.java |  2 +-
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 75 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 31 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d9b778e..8f6327e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,6 +17,11 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -24,34 +29,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.JobException;
@@ -74,6 +61,15 @@ import 
org.apache.gobblin.service.modules.utils.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
@@ -176,7 +172,11 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       // Since we are going to change status to isActive=false, unschedule all 
flows
       List<Spec> specs = new ArrayList<>(this.scheduledFlowSpecs.values());
       for (Spec spec : specs) {
-        onDeleteSpec(spec.getUri(), spec.getVersion());
+        try {
+          unscheduleSpec(spec.getUri(), spec.getVersion());
+        } catch (JobException e) {
+          _log.warn(String.format("Spec with URI: %s was not unscheduled 
during shutdown", spec.getUri()), e);
+        }
       }
       // Need to set active=false at the end; otherwise in the onDeleteSpec(), 
node will forward specs to active node, which is itself.
       this.isActive = isActive;
@@ -347,6 +347,25 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     return new AddSpecResponse<>(response);
   }
 
+  /**
+   * Remove a flowSpec from schedule due to another leader being elected
+   * Unlike onDeleteSpec, we want to avoid deleting the flowSpec on the 
executor
+   * and we still want to unschedule if we cannot connect to zookeeper as the 
current node cannot be the master
+   * @param specURI
+   * @param specVersion
+   */
+  private void unscheduleSpec(URI specURI, String specVersion) throws 
JobException {
+    if (this.scheduledFlowSpecs.containsKey(specURI.toString())) {
+      _log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
+      this.scheduledFlowSpecs.remove(specURI.toString());
+      unscheduleJob(specURI.toString());
+    } else {
+      throw new JobException(String.format(
+          "Spec with URI: %s was not found in cache. May be it was cleaned, if 
not please clean it manually",
+          specURI));
+    }
+  }
+
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
     onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
   }
@@ -364,15 +383,8 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     try {
       Spec deletedSpec = 
this.scheduledFlowSpecs.get(deletedSpecURI.toString());
-      if (null != deletedSpec) {
-        this.orchestrator.remove(deletedSpec, headers);
-        this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
-        unscheduleJob(deletedSpecURI.toString());
-      } else {
-        _log.warn(String.format(
-            "Spec with URI: %s was not found in cache. May be it was cleaned, 
if not please clean it manually",
-            deletedSpecURI));
-      }
+      unscheduleSpec(deletedSpecURI, deletedSpecVersion);
+      this.orchestrator.remove(deletedSpec, headers);
     } catch (JobException | IOException e) {
       _log.warn(String.format("Spec with URI: %s was not unscheduled 
cleaning", deletedSpecURI), e);
     }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index ef30050..82eb270 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -476,7 +476,7 @@ null, null, null, null);
     FlowId flowId = new 
FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
 
     try {
-      this.flowConfigClient.getFlowConfig(flowId);
+      this.flowConfigClient.deleteFlowConfig(flowId);
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
       return;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index cc84632..d2085a5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -23,6 +23,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.File;
 import java.net.URI;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -43,6 +44,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
 import org.mockito.Mockito;
+import org.mockito.invocation.Invocation;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -202,6 +204,79 @@ public class GobblinServiceJobSchedulerTest {
         }, "Waiting all flowSpecs to be scheduled");
   }
 
+  /**
+   * Test that demoting the scheduler (setActive(false)) unschedules its flowSpecs without calling remove on the orchestrator
+   */
+  @Test
+  public void testJobSchedulerUnschedule() throws Exception {
+    // Mock a FlowCatalog.
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new 
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new 
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+    // Assume that the catalog can store corrupted flows
+    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+        MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    FlowSpec flowSpec1 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+    FlowSpec flowSpec2 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+    // Ensure that these flows are scheduled
+    flowCatalog.put(flowSpec0, true);
+    flowCatalog.put(flowSpec1, true);
+    flowCatalog.put(flowSpec2, true);
+
+    Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+    // Mock a GaaS scheduler.
+    TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, null);
+
+    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+    Mockito.doAnswer((Answer<Void>) a -> {
+      scheduler.isCompilerHealthy = true;
+      return null;
+    }).when(mockCompiler).awaitHealthy();
+
+    scheduler.setActive(true);
+
+    
AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
+        .assertTrue(new Predicate<Void>() {
+          @Override
+          public boolean apply(Void input) {
+            Map<String, Spec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
+            if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
+              return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+                  scheduler.scheduledFlowSpecs.containsKey("spec2");
+            } else {
+              return false;
+            }
+          }
+        }, "Waiting all flowSpecs to be scheduled");
+
+    // set scheduler to be inactive and unschedule flows
+    scheduler.setActive(false);
+    Collection<Invocation> invocations = 
Mockito.mockingDetails(mockOrchestrator).getInvocations();
+
+    for (Invocation invocation: invocations) {
+      // ensure that orchestrator is not calling remove
+      Assert.assertFalse(invocation.getMethod().getName().equals("remove"));
+    }
+  }
+
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
     public boolean isCompilerHealthy = false;
 

Reply via email to