arjun4084346 commented on a change in pull request #3004: URL: https://github.com/apache/incubator-gobblin/pull/3004#discussion_r432634755
########## File path: gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java ########## @@ -181,19 +192,107 @@ public void cleanUp() throws Exception { } @Test - public void testCreate() throws Exception { + public void testRestart() throws Exception { + Map<String, String> flowProperties = Maps.newHashMap(); + flowProperties.put("param1", "value1"); + flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); + flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW)) + .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false)) + .setProperties(new StringMap(flowProperties)); + FlowSpec spec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec); + + serviceReboot(); + + List<Spec> specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.size(), 1, "Flow that was created is not " + "reflecting in FlowCatalog"); + Assert.assertEquals(specs.get(0).getUri(), spec.getUri()); + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); + + // clean it + this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri()); + } + + @Test (dependsOnMethods = "testRestart") + public void testUncompilableJob() throws Exception { + Map<String, String> flowProperties = Maps.newHashMap(); + flowProperties.put("param1", "value1"); + flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); + flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW); + URI uri = 
FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId); + FlowConfig flowConfig = new FlowConfig().setId(flowId) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + + RestLiResponseException exception = null; + try { + this.flowConfigClient.createFlowConfig(flowConfig); + } catch (RestLiResponseException e) { + exception = e; + } + Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400); + // uncompilable job should not be persisted + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0, + "Flow that was created is not " + "reflecting in FlowCatalog"); + Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString())); + } + + @Test (dependsOnMethods = "testUncompilableJob") + public void testRunOnceJob() throws Exception { + Map<String, String> flowProperties = Maps.newHashMap(); + flowProperties.put("param1", "value1"); + flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); + flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + + this.flowConfigClient.createFlowConfig(flowConfig); + + // runOnce job should not be persisted Review comment: The runOnce job's existence is difficult to check because it can be deleted almost instantly; hence, I am checking that it eventually gets deleted. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org