[ 
https://issues.apache.org/jira/browse/GOBBLIN-1160?focusedWorklogId=438950&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-438950
 ]

ASF GitHub Bot logged work on GOBBLIN-1160:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/May/20 21:50
            Start Date: 29/May/20 21:50
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on a change in pull request #3011:
URL: https://github.com/apache/incubator-gobblin/pull/3011#discussion_r432755115



##########
File path: 
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
##########
@@ -180,50 +199,137 @@ public void cleanUp() throws Exception {
     }
   }
 
+  /**
+   * To test an existing flow in a spec store does not get deleted just 
because it is not compilable during service restarts
+   */
   @Test
-  public void testCreate() throws Exception {
-    Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
TEST_SOURCE_NAME);
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
TEST_SINK_NAME);
+  public void testRestart() throws Exception {
+    FlowConfig flowConfig = new 
FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    FlowSpec spec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
+    FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+    FlowSpec runOnceSpec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);
+
+    // add the non compilable flow directly to the spec store skipping flow 
catalog which would not allow this
+    this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec);
+    
this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);
+
+    List<Spec> specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+
+    Assert.assertEquals(specs.size(), 2);
+    if (specs.get(0).getUri().equals(spec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri());
+    } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), spec.getUri());
+    } else {
+      Assert.fail();
+    }
+
+    // restart the service
+    serviceReboot();
+
+    // runOnce job should get deleted from the spec store after running and 
uncompilable job should stay
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
+        .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
+            "Waiting for the runOnce job to finish");
+
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.get(0).getUri(), spec.getUri());
+    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+
+    // clean it
+    this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri());
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRestart")
+  public void testUncompilableJob() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    URI uri = 
FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId);
+    FlowConfig flowConfig = new FlowConfig().setId(flowId)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
+    try {
+      this.flowConfigClient.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      exception = e;
+    }
+    Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);
+    // uncompilable job should not be persisted
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 0);
+    
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
+  }
+
+  @Test (dependsOnMethods = "testUncompilableJob")
+  public void testRunOnceJob() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // runOnce job should not be persisted
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+        .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0,
+          "Flow that was created is not " + "reflecting in FlowCatalog");

Review comment:
       Can we have a more descriptive message? I do not understand what "not 
reflecting in FlowCatalog" really means.

##########
File path: 
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
##########
@@ -180,50 +199,137 @@ public void cleanUp() throws Exception {
     }
   }
 
+  /**
+   * To test an existing flow in a spec store does not get deleted just 
because it is not compilable during service restarts
+   */
   @Test
-  public void testCreate() throws Exception {
-    Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
TEST_SOURCE_NAME);
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
TEST_SINK_NAME);
+  public void testRestart() throws Exception {
+    FlowConfig flowConfig = new 
FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    FlowSpec spec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
+    FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+    FlowSpec runOnceSpec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);
+
+    // add the non compilable flow directly to the spec store skipping flow 
catalog which would not allow this
+    this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec);
+    
this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);
+
+    List<Spec> specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+
+    Assert.assertEquals(specs.size(), 2);
+    if (specs.get(0).getUri().equals(spec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri());
+    } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), spec.getUri());
+    } else {
+      Assert.fail();
+    }
+
+    // restart the service
+    serviceReboot();
+
+    // runOnce job should get deleted from the spec store after running and 
uncompilable job should stay
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
+        .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
+            "Waiting for the runOnce job to finish");
+
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.get(0).getUri(), spec.getUri());
+    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+
+    // clean it
+    this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri());
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRestart")
+  public void testUncompilableJob() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    URI uri = 
FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId);
+    FlowConfig flowConfig = new FlowConfig().setId(flowId)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
+    try {
+      this.flowConfigClient.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      exception = e;
+    }
+    Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);
+    // uncompilable job should not be persisted
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 0);
+    
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
+  }
+
+  @Test (dependsOnMethods = "testUncompilableJob")
+  public void testRunOnceJob() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // runOnce job should not be persisted

Review comment:
       This comment is confusing. What you want to say here is that the runOnce 
job is deleted soon after it is run. 

##########
File path: 
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
##########
@@ -180,50 +199,137 @@ public void cleanUp() throws Exception {
     }
   }
 
+  /**
+   * To test an existing flow in a spec store does not get deleted just 
because it is not compilable during service restarts
+   */
   @Test
-  public void testCreate() throws Exception {
-    Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, 
TEST_SOURCE_NAME);
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
TEST_SINK_NAME);
+  public void testRestart() throws Exception {
+    FlowConfig flowConfig = new 
FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
+        .setProperties(new StringMap(flowProperties));
+    FlowSpec spec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
+    FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+    FlowSpec runOnceSpec = 
FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);
+
+    // add the non compilable flow directly to the spec store skipping flow 
catalog which would not allow this
+    this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(spec);
+    
this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);
+
+    List<Spec> specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+
+    Assert.assertEquals(specs.size(), 2);
+    if (specs.get(0).getUri().equals(spec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), runOnceSpec.getUri());
+    } else if (specs.get(0).getUri().equals(runOnceSpec.getUri())) {
+      Assert.assertEquals(specs.get(1).getUri(), spec.getUri());
+    } else {
+      Assert.fail();
+    }
+
+    // restart the service
+    serviceReboot();
+
+    // runOnce job should get deleted from the spec store after running and 
uncompilable job should stay
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
+        .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
+            "Waiting for the runOnce job to finish");
+
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.get(0).getUri(), spec.getUri());
+    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+
+    // clean it
+    this.gobblinServiceManager.getFlowCatalog().remove(spec.getUri());
+    specs = (List<Spec>) 
this.gobblinServiceManager.getFlowCatalog().getSpecs();
+    Assert.assertEquals(specs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRestart")
+  public void testUncompilableJob() throws Exception {
+    FlowId flowId = new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    URI uri = 
FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(flowId);
+    FlowConfig flowConfig = new FlowConfig().setId(flowId)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
+    try {
+      this.flowConfigClient.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      exception = e;
+    }
+    Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);
+    // uncompilable job should not be persisted
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 0);
+    
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
+  }
+
+  @Test (dependsOnMethods = "testUncompilableJob")
+  public void testRunOnceJob() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // runOnce job should not be persisted
+    
AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+        .assertTrue(input -> 
this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0,
+          "Flow that was created is not " + "reflecting in FlowCatalog");
+    
AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(1000L).backoffFactor(1)
+          .assertTrue(input -> 
!this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
+              "Waiting for job to get orchestrated...");
+  }
+
+  @Test (dependsOnMethods = "testRunOnceJob")
+  public void testExplainJob() throws Exception {
     FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties)).setExplain(true);
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // explain job should not be persisted
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 0,
+        "Flow that was created is not " + "reflecting in FlowCatalog");
+    
Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
+  }
+
+  @Test (dependsOnMethods = "testExplainJob")
+  public void testCreate() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
         .setProperties(new StringMap(flowProperties));
 
     this.flowConfigClient.createFlowConfig(flowConfig);
-    
Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() 
== 1, "Flow that was created is not "
-        + "reflecting in FlowCatalog");
+    
Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(),
 1,
+        "Flow that was created is not " + "reflecting in FlowCatalog");

Review comment:
       Same comment as earlier about the assertion failure message. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 438950)
    Time Spent: 40m  (was: 0.5h)

> do not delete spec on compilation failures at startup
> -----------------------------------------------------
>
>                 Key: GOBBLIN-1160
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1160
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Do not delete a spec on compilation failures at service startup, because the git 
> flow graph and topology services might not have started yet.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to