This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 03911c6  [GOBBLIN-1255] Wait for compiler to be healthy before scheduling flows on startup
03911c6 is described below

commit 03911c65ddc23e4924bd4c201ca05023f025db0a
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Sep 2 11:54:04 2020 -0700

    [GOBBLIN-1255] Wait for compiler to be healthy before scheduling flows on startup
    
    Closes #3096 from jack-moseley/fix-race-condition
---
 .../modules/scheduler/GobblinServiceJobScheduler.java |  6 ++++++
 .../scheduler/GobblinServiceJobSchedulerTest.java     | 19 ++++++++++++++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index fb17a87..7e644db 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -144,6 +144,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         Thread scheduleSpec = new Thread(new Runnable() {
           @Override
           public void run() {
+            // Ensure compiler is healthy before attempting to schedule flows
+            try {
+              GobblinServiceJobScheduler.this.orchestrator.getSpecCompiler().awaitHealthy();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
             scheduleSpecsFromCatalog();
           }
         });
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index d30de37..a67b14f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -33,10 +33,14 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -73,9 +77,18 @@ public class GobblinServiceJobSchedulerTest {
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
 
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, null, null);
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+    Mockito.doAnswer((Answer<Void>) a -> {
+      scheduler.isCompilerHealthy = true;
+      return null;
+    }).when(mockCompiler).awaitHealthy();
 
     scheduler.setActive(true);
 
@@ -117,6 +130,8 @@ public class GobblinServiceJobSchedulerTest {
   }
 
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
+    public boolean isCompilerHealthy = false;
+
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
         Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
         SchedulerService schedulerService) throws Exception {
@@ -129,6 +144,8 @@ public class GobblinServiceJobSchedulerTest {
     @Override
     public AddSpecResponse onAddSpec(Spec addedSpec) {
       super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+      // Check that compiler is healthy at time of scheduling flows
+      Assert.assertTrue(isCompilerHealthy);
       return new AddSpecResponse(addedSpec.getDescription());
     }
   }

Reply via email to