Will-Lo commented on code in PR #3516:
URL: https://github.com/apache/gobblin/pull/3516#discussion_r889337991
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java:
##########
@@ -283,14 +301,61 @@ public boolean apply(Void input) {
Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(),
0);
}
+ @Test
+ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
+ File specDir = Files.createTempDir();
+
+ Properties properties = new Properties();
+ properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+ FlowCatalog flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+
+ serviceLauncher.addService(flowCatalog);
+ serviceLauncher.start();
+
+ FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
"flowName0", "group1",
+
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
ConfigValueFactory.fromAnyRef("true")));
+ FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"),
"flowName1", "group1",
+
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
ConfigValueFactory.fromAnyRef("true")));
+
+ Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+ when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
+ Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
+ Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
+ when(mockSpecCompiler.compileFlow(flowSpec0)).thenReturn(mockDag0);
+ when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
+
+ SchedulerService schedulerService = new SchedulerService(new Properties());
+ // Mock a GaaS scheduler.
+ GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
UserQuotaManager(quotaConfig)), Optional.absent());
+
+ schedulerService.startAsync().awaitRunning();
+ scheduler.startUp();
+ scheduler.setActive(true);
+
+ scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
+ AddSpecResponse<String> response1 = scheduler.onAddSpec(flowSpec1);
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+
+ Assert.assertEquals(response1.getValue(),
"org.apache.gobblin.exception.QuotaExceededException: Quota exceeded for
flowgroup group1 on executor jobExecutor : quota=1, requests above quota=1\n");
+ // Second flow should not be added to scheduled flows since it was rejected
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+ // set scheduler to be inactive and unschedule flows
+ scheduler.setActive(false);
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
Review Comment:
yeah it's cleanup
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]