[ https://issues.apache.org/jira/browse/GOBBLIN-1160?focusedWorklogId=438950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-438950 ]
ASF GitHub Bot logged work on GOBBLIN-1160: ------------------------------------------- Author: ASF GitHub Bot Created on: 29/May/20 21:50 Start Date: 29/May/20 21:50 Worklog Time Spent: 10m Work Description: sv2000 commented on a change in pull request #3011: URL: https://github.com/apache/incubator-gobblin/pull/3011#discussion_r432755115 ########## File path: gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java ########## @@ -180,50 +199,137 @@ public void cleanUp() throws Exception { } } + /** + * To test an existing flow in a spec store does not get deleted just because it is not compilable during service restarts + */ @Test - public void testCreate() throws Exception { - Map<String, String> flowProperties = Maps.newHashMap(); - flowProperties.put("param1", "value1"); - flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); - flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + public void testRestart() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI) + .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false)) + .setProperties(new StringMap(flowProperties)); + FlowSpec spec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig); + + // add the non compilable flow directly to the spec store skipping flow catalog which would not allow this + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec); + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec); + + List<Spec> specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs(); + + Assert.assertEquals(specs.size(), 2); + if (specs.get(0).getUri().equals(spec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri()); + } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), spec.getUri()); + } else { + Assert.fail(); + } + + // restart the service + serviceReboot(); + + // runOnce job should get deleted from the spec store after running and uncompilable job should stay + AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1) + .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, + "Waiting for the runOnce job to finish"); + + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.get(0).getUri(), spec.getUri()); + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); + + // clean it + this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri()); + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.size(), 0); + } + + @Test (dependsOnMethods = "testRestart") + public void testUncompilableJob() throws Exception { + FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW); + URI uri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId); + FlowConfig flowConfig = new FlowConfig().setId(flowId) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + RestLiResponseException exception = null; + try { + this.flowConfigClient.createFlowConfig(flowConfig); + } catch (RestLiResponseException e) { + exception = e; + } + Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400); + // uncompilable job should not be persisted + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0); + 
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString())); + } + + @Test (dependsOnMethods = "testUncompilableJob") + public void testRunOnceJob() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + + this.flowConfigClient.createFlowConfig(flowConfig); + + // runOnce job should not be persisted + AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1) + .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0, + "Flow that was created is not " + "reflecting in FlowCatalog"); Review comment: Can we have a more descriptive message? I do not understand what "not reflecting in FlowCatalog" really means. ########## File path: gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java ########## @@ -180,50 +199,137 @@ public void cleanUp() throws Exception { } } + /** + * To test an existing flow in a spec store does not get deleted just because it is not compilable during service restarts + */ @Test - public void testCreate() throws Exception { - Map<String, String> flowProperties = Maps.newHashMap(); - flowProperties.put("param1", "value1"); - flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); - flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + public void testRestart() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI) + .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false)) + .setProperties(new StringMap(flowProperties)); + FlowSpec spec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties)); + FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig); + + // add the non compilable flow directly to the spec store skipping flow catalog which would not allow this + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec); + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec); + + List<Spec> specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + + Assert.assertEquals(specs.size(), 2); + if (specs.get(0).getUri().equals(spec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri()); + } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), spec.getUri()); + } else { + Assert.fail(); + } + + // restart the service + serviceReboot(); + + // runOnce job should get deleted from the spec store after running and uncompilable job should stay + AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1) + .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, + "Waiting for the runOnce job to finish"); + + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.get(0).getUri(), spec.getUri()); + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); + + // clean it + this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri()); + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.size(), 0); + } + + @Test (dependsOnMethods = "testRestart") + public void testUncompilableJob() throws Exception { + FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW); + URI uri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId); + FlowConfig flowConfig = new FlowConfig().setId(flowId) + 
.setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + RestLiResponseException exception = null; + try { + this.flowConfigClient.createFlowConfig(flowConfig); + } catch (RestLiResponseException e) { + exception = e; + } + Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400); + // uncompilable job should not be persisted + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0); + Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString())); + } + + @Test (dependsOnMethods = "testUncompilableJob") + public void testRunOnceJob() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + + this.flowConfigClient.createFlowConfig(flowConfig); + + // runOnce job should not be persisted Review comment: This comment is confusing. What you want to say here is that the runOnce job is deleted soon after it is run. 
########## File path: gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java ########## @@ -180,50 +199,137 @@ public void cleanUp() throws Exception { } } + /** + * To test an existing flow in a spec store does not get deleted just because it is not compilable during service restarts + */ @Test - public void testCreate() throws Exception { - Map<String, String> flowProperties = Maps.newHashMap(); - flowProperties.put("param1", "value1"); - flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME); - flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); + public void testRestart() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI) + .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false)) + .setProperties(new StringMap(flowProperties)); + FlowSpec spec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig); + FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig); + + // add the non compilable flow directly to the spec store skipping flow catalog which would not allow this + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec); + this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec); + + List<Spec> specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + + Assert.assertEquals(specs.size(), 2); + if (specs.get(0).getUri().equals(spec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri()); + } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) { + Assert.assertEquals(specs.get(1).getUri(), spec.getUri()); + } else { + Assert.fail(); + } + + // restart 
the service + serviceReboot(); + + // runOnce job should get deleted from the spec store after running and uncompilable job should stay + AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1) + .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, + "Waiting for the runOnce job to finish"); + + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.get(0).getUri(), spec.getUri()); + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); + + // clean it + this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri()); + specs = (List<Spec>) this.gobblinServiceManager.getFlowCatalog().getSpecs(); + Assert.assertEquals(specs.size(), 0); + } + + @Test (dependsOnMethods = "testRestart") + public void testUncompilableJob() throws Exception { + FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW); + URI uri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId); + FlowConfig flowConfig = new FlowConfig().setId(flowId) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + RestLiResponseException exception = null; + try { + this.flowConfigClient.createFlowConfig(flowConfig); + } catch (RestLiResponseException e) { + exception = e; + } + Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400); + // uncompilable job should not be persisted + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0); + Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString())); + } + + @Test (dependsOnMethods = "testUncompilableJob") + public void testRunOnceJob() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)); + + 
this.flowConfigClient.createFlowConfig(flowConfig); + + // runOnce job should not be persisted + AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1) + .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0, + "Flow that was created is not " + "reflecting in FlowCatalog"); + AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(1000L).backoffFactor(1) + .assertTrue(input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()), + "Waiting for job to get orchestrated..."); + } + + @Test (dependsOnMethods = "testRunOnceJob") + public void testExplainJob() throws Exception { FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME)) + .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true); + + this.flowConfigClient.createFlowConfig(flowConfig); + + // explain job should not be persisted + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0, + "Flow that was created is not " + "reflecting in FlowCatalog"); + Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString())); + } + + @Test (dependsOnMethods = "testExplainJob") + public void testCreate() throws Exception { + FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID) .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true)) .setProperties(new StringMap(flowProperties)); this.flowConfigClient.createFlowConfig(flowConfig); - Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, "Flow that was created is not " - + "reflecting in FlowCatalog"); + Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 1, + "Flow that was created is not " + "reflecting in FlowCatalog"); Review comment: Same comment as 
earlier about the assertion failure message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 438950) Time Spent: 40m (was: 0.5h) > do not delete spec on compilation failures at startup > ----------------------------------------------------- > > Key: GOBBLIN-1160 > URL: https://issues.apache.org/jira/browse/GOBBLIN-1160 > Project: Apache Gobblin > Issue Type: Bug > Reporter: Arjun Singh Bora > Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > do not delete spec on compilation failures at service startup, because git > flow graph and topology services might not have started yet. > -- This message was sent by Atlassian Jira (v8.3.4#803005)