This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 03911c6 [GOBBLIN-1255] Wait for compiler to be healthy before scheduling flows on startup
03911c6 is described below
commit 03911c65ddc23e4924bd4c201ca05023f025db0a
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Sep 2 11:54:04 2020 -0700
[GOBBLIN-1255] Wait for compiler to be healthy before scheduling flows on startup
Closes #3096 from jack-moseley/fix-race-condition
---
.../modules/scheduler/GobblinServiceJobScheduler.java | 6 ++++++
.../scheduler/GobblinServiceJobSchedulerTest.java | 19 ++++++++++++++++++-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index fb17a87..7e644db 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -144,6 +144,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
Thread scheduleSpec = new Thread(new Runnable() {
@Override
public void run() {
+ // Ensure compiler is healthy before attempting to schedule flows
+ try {
+ GobblinServiceJobScheduler.this.orchestrator.getSpecCompiler().awaitHealthy();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
scheduleSpecsFromCatalog();
}
});
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index d30de37..a67b14f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -33,10 +33,14 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -73,9 +77,18 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
+ Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, null, null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+ SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+ Mockito.doAnswer((Answer<Void>) a -> {
+ scheduler.isCompilerHealthy = true;
+ return null;
+ }).when(mockCompiler).awaitHealthy();
scheduler.setActive(true);
@@ -117,6 +130,8 @@ public class GobblinServiceJobSchedulerTest {
}
class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
+ public boolean isCompilerHealthy = false;
+
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
SchedulerService schedulerService) throws Exception {
@@ -129,6 +144,8 @@ public class GobblinServiceJobSchedulerTest {
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+ // Check that compiler is healthy at time of scheduling flows
+ Assert.assertTrue(isCompilerHealthy);
return new AddSpecResponse(addedSpec.getDescription());
}
}