This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0a1debc [GOBBLIN-990] Don't allow creation of flow config that
already exists
0a1debc is described below
commit 0a1debc30734ec57d2be7351270894756a6c18af
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Dec 9 14:26:30 2019 -0800
[GOBBLIN-990] Don't allow creation of flow config that already exists
Closes #2836 from jack-moseley/create-conflict
---
.../test/java/org/apache/gobblin/service/FlowConfigTest.java | 8 ++++++--
.../test/java/org/apache/gobblin/service/FlowConfigV2Test.java | 5 +++--
.../apache/gobblin/service/FlowConfigResourceLocalHandler.java | 10 +++-------
.../gobblin/service/FlowConfigV2ResourceLocalHandler.java | 9 ++++++++-
.../gobblin/service/modules/core/GobblinServiceHATest.java | 10 ++++++++--
.../service/modules/core/GobblinServiceManagerTest.java | 7 +++++--
6 files changed, 33 insertions(+), 16 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index ecd884e..b20d944 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -174,11 +174,15 @@ public class FlowConfigTest {
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));
+ RestLiResponseException exception = null;
try {
_client.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Create Again should pass without complaining that the spec
already exists.");
+ exception = e;
}
+
+ Assert.assertNotNull(exception);
+ Assert.assertEquals(exception.getStatus(),
HttpStatus.S_409_CONFLICT.getCode());
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -190,7 +194,7 @@ public class FlowConfigTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(),
TEST_SCHEDULE );
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+ Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
// Add this assert back when getFlowSpec() is changed to return the raw
flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 21f862a..83a200e 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -58,9 +58,10 @@ public class FlowConfigV2Test {
private EmbeddedRestliServer _server;
private File _testDirectory;
- private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigTest/";
+ private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
private static final String TEST_GROUP_NAME = "testGroup1";
private static final String TEST_FLOW_NAME = "testFlow1";
+ private static final String TEST_FLOW_NAME_2 = "testFlow2";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI =
"FS:///templates/test.template";
@@ -122,7 +123,7 @@ public class FlowConfigV2Test {
Assert.assertEquals(TEST_FLOW_NAME, flowStatusId.getFlowName());
Assert.assertTrue(flowStatusId.getFlowExecutionId() != -1);
- flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_2))
.setTemplateUris(TEST_TEMPLATE_URI).setProperties(new
StringMap(flowProperties))
.setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true));
Assert.assertEquals(_client.createFlowConfig(flowConfig).getFlowExecutionId().longValue(),
-1L);
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index b20b5be..24c4ce9 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
-import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -34,11 +33,9 @@ import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.util.PatchApplier;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import javax.naming.OperationNotSupportedException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -142,10 +139,9 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
}
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- // Existence of a flow spec in the flow catalog implies that the flow is
currently running.
- // If the new flow spec has a schedule we should allow submission of the
new flow to accept the new schedule.
- // However, if the new flow spec does not have a schedule, we should allow
submission only if it is not running.
- if (!flowConfig.hasSchedule() &&
this.flowCatalog.exists(flowSpec.getUri())) {
+ // Return conflict and take no action if flowSpec has already been created
+ if (this.flowCatalog.exists(flowSpec.getUri())) {
+ log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(),
new EmptyRecord()), HttpStatus.S_409_CONFLICT);
} else {
this.flowCatalog.put(flowSpec, triggerListener);
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index c4a3e45..edbcf8b 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -52,7 +52,6 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
}
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec,
triggerListener);
FlowStatusId flowStatusId = new FlowStatusId()
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
@@ -61,6 +60,14 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
} else {
flowStatusId.setFlowExecutionId(-1L);
}
+
+ // Return 409 CONFLICT and skip flowCatalog.put if a spec with this URI already exists
+ if (this.flowCatalog.exists(flowSpec.getUri())) {
+ log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
+ return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
+ }
+
+ Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec,
triggerListener);
HttpStatus httpStatus = HttpStatus.S_201_CREATED;
if (flowConfig.hasExplain() && flowConfig.isExplain()) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 5ae0b37..2752add 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -317,17 +317,23 @@ public class GobblinServiceHATest {
.setProperties(new StringMap(flowProperties));
// Try create on both nodes
+ RestLiResponseException exception1 = null;
try {
this.node1FlowConfigClient.createFlowConfig(flowConfig1);
} catch (RestLiResponseException e) {
- Assert.fail("Create Again should pass without complaining that the spec
already exists.");
+ exception1 = e;
}
+ Assert.assertNotNull(exception1);
+ Assert.assertEquals(exception1.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
+ RestLiResponseException exception2 = null;
try {
this.node2FlowConfigClient.createFlowConfig(flowConfig2);
} catch (RestLiResponseException e) {
- Assert.fail("Create Again should pass without complaining that the spec
already exists.");
+ exception2 = e;
}
+ Assert.assertNotNull(exception2);
+ Assert.assertEquals(exception2.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
logger.info("+++++++++++++++++++ testCreateAgain END");
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 1d1736c..43017ed 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -207,11 +207,14 @@ public class GobblinServiceManagerTest {
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));
+ RestLiResponseException exception = null;
try {
this.flowConfigClient.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Create Again should pass without complaining that the spec
already exists.");
+ exception = e;
}
+ Assert.assertNotNull(exception);
+ Assert.assertEquals(exception.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -223,7 +226,7 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(),
TEST_SCHEDULE);
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+ Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
// Add this assert back when getFlowSpec() is changed to return the raw
flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");