This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a1debc  [GOBBLIN-990] Don't allow creation of flow config that already exists
0a1debc is described below

commit 0a1debc30734ec57d2be7351270894756a6c18af
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Dec 9 14:26:30 2019 -0800

    [GOBBLIN-990] Don't allow creation of flow config that already exists
    
    Closes #2836 from jack-moseley/create-conflict
---
 .../test/java/org/apache/gobblin/service/FlowConfigTest.java   |  8 ++++++--
 .../test/java/org/apache/gobblin/service/FlowConfigV2Test.java |  5 +++--
 .../apache/gobblin/service/FlowConfigResourceLocalHandler.java | 10 +++-------
 .../gobblin/service/FlowConfigV2ResourceLocalHandler.java      |  9 ++++++++-
 .../gobblin/service/modules/core/GobblinServiceHATest.java     | 10 ++++++++--
 .../service/modules/core/GobblinServiceManagerTest.java        |  7 +++++--
 6 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index ecd884e..b20d944 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -174,11 +174,15 @@ public class FlowConfigTest {
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE))
         .setProperties(new StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
     try {
       _client.createFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Create Again should pass without complaining that the spec 
already exists.");
+      exception = e;
     }
+
+    Assert.assertNotNull(exception);
+    Assert.assertEquals(exception.getStatus(), 
HttpStatus.S_409_CONFLICT.getCode());
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
@@ -190,7 +194,7 @@ public class FlowConfigTest {
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), 
TEST_SCHEDULE );
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
-    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+    Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
     // Add this asssert back when getFlowSpec() is changed to return the raw 
flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 1);
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 21f862a..83a200e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -58,9 +58,10 @@ public class FlowConfigV2Test {
   private EmbeddedRestliServer _server;
   private File _testDirectory;
 
-  private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigTest/";
+  private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
   private static final String TEST_GROUP_NAME = "testGroup1";
   private static final String TEST_FLOW_NAME = "testFlow1";
+  private static final String TEST_FLOW_NAME_2 = "testFlow2";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = 
"FS:///templates/test.template";
 
@@ -122,7 +123,7 @@ public class FlowConfigV2Test {
     Assert.assertEquals(TEST_FLOW_NAME, flowStatusId.getFlowName());
     Assert.assertTrue(flowStatusId.getFlowExecutionId() != -1);
 
-    flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+    flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_2))
         .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties))
         .setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true));
     
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(),
 -1L);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index b20b5be..24c4ce9 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang.StringUtils;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -34,11 +33,9 @@ import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.util.PatchApplier;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import javax.naming.OperationNotSupportedException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -142,10 +139,9 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     }
 
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    // Existence of a flow spec in the flow catalog implies that the flow is 
currently running.
-    // If the new flow spec has a schedule we should allow submission of the 
new flow to accept the new schedule.
-    // However, if the new flow spec does not have a schedule, we should allow 
submission only if it is not running.
-    if (!flowConfig.hasSchedule() && 
this.flowCatalog.exists(flowSpec.getUri())) {
+    // Return conflict and take no action if flowSpec has already been created
+    if (this.flowCatalog.exists(flowSpec.getUri())) {
+      log.warn("Flowspec with URI {} already exists, no action will be taken");
       return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
new EmptyRecord()), HttpStatus.S_409_CONFLICT);
     } else {
       this.flowCatalog.put(flowSpec, triggerListener);
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index c4a3e45..edbcf8b 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -52,7 +52,6 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     }
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, 
triggerListener);
     FlowStatusId flowStatusId = new FlowStatusId()
         
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
         
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
@@ -61,6 +60,14 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     } else {
       flowStatusId.setFlowExecutionId(-1L);
     }
+
+    // Return conflict and take no action if flowSpec has already been created
+    if (this.flowCatalog.exists(flowSpec.getUri())) {
+      log.warn("Flowspec with URI {} already exists, no action will be taken");
+      return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
+    }
+
+    Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, 
triggerListener);
     HttpStatus httpStatus = HttpStatus.S_201_CREATED;
 
     if (flowConfig.hasExplain() && flowConfig.isExplain()) {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 5ae0b37..2752add 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -317,17 +317,23 @@ public class GobblinServiceHATest {
         .setProperties(new StringMap(flowProperties));
 
     // Try create on both nodes
+    RestLiResponseException exception1 = null;
     try {
       this.node1FlowConfigClient.createFlowConfig(flowConfig1);
     } catch (RestLiResponseException e) {
-      Assert.fail("Create Again should pass without complaining that the spec 
already exists.");
+      exception1 = e;
     }
+    Assert.assertNotNull(exception1);
+    Assert.assertEquals(exception1.getStatus(), 
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
 
+    RestLiResponseException exception2 = null;
     try {
       this.node2FlowConfigClient.createFlowConfig(flowConfig2);
     } catch (RestLiResponseException e) {
-      Assert.fail("Create Again should pass without complaining that the spec 
already exists.");
+      exception2 = e;
     }
+    Assert.assertNotNull(exception2);
+    Assert.assertEquals(exception2.getStatus(), 
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
 
     logger.info("+++++++++++++++++++ testCreateAgain END");
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 1d1736c..43017ed 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -207,11 +207,14 @@ public class GobblinServiceManagerTest {
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new 
Schedule().setCronSchedule(TEST_SCHEDULE))
         .setProperties(new StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
     try {
       this.flowConfigClient.createFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Create Again should pass without complaining that the spec 
already exists.");
+      exception = e;
     }
+    Assert.assertNotNull(exception);
+    Assert.assertEquals(exception.getStatus(), 
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
@@ -223,7 +226,7 @@ public class GobblinServiceManagerTest {
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), 
TEST_SCHEDULE);
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
-    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+    Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
     // Add this assert back when getFlowSpec() is changed to return the raw 
flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 1);
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");

Reply via email to