aplex commented on a change in pull request #3273:
URL: https://github.com/apache/gobblin/pull/3273#discussion_r625307907
##########
File path:
gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
##########
@@ -129,6 +130,65 @@ public void testDisableFlowRunImmediatelyOnStart()
Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY),
TEST_FLOW_NAME);
}
+ /**
+ * Test that flowSpecs that throw compilation errors do not block the
scheduling of other flowSpecs
+ */
+ @Test
+ public void testJobSchedulerInitWithFailedSpec() throws Exception {
+ // Mock a FlowCatalog.
+ File specDir = Files.createTempDir();
+
+ Properties properties = new Properties();
+ properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+ FlowCatalog flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+ serviceLauncher.addService(flowCatalog);
+ serviceLauncher.start();
+
+ FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+ MockedSpecCompiler.UNCOMPILABLE_FLOW);
+ FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+ FlowSpec flowSpec2 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+ // Assume that the catalog can store corrupted flows
+ flowCatalog.put(flowSpec0, false);
+ // Ensure that these flows are scheduled
+ flowCatalog.put(flowSpec1, false);
+ flowCatalog.put(flowSpec2, false);
+
+ Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+ Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+ // Mock a GaaS scheduler.
+ TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
+ ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, null);
+
+ SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+ Mockito.doAnswer((Answer<Void>) a -> {
+ scheduler.isCompilerHealthy = true;
+ return null;
+ }).when(mockCompiler).awaitHealthy();
+
+ scheduler.setActive(true);
+
+
AssertWithBackoff.create().timeoutMs(6000).maxSleepMs(2000).backoffFactor(2)
Review comment:
Timed waits in tests can introduce flakiness. Usually this happens when
dozens of tests run concurrently on an overloaded CI machine, and even basic
operation take way longer than developer expects.
The best way to address this it to have some kind of notification/event from
a class, and react to it. It will have no delays, and can also tolerate the
situation when it takes a long time for operation to happen.
If it's difficult or unclear how to do that in this case, I suggest to bump
the timeout to 15-30 seconds.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]