This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4b8df7a [GOBBLIN-1109] partial rollback of PR#2836
4b8df7a is described below
commit 4b8df7af59efb2ac5096bd3db6b0cdf5c3b83d1a
Author: Arjun <[email protected]>
AuthorDate: Mon Apr 6 12:52:55 2020 -0700
[GOBBLIN-1109] partial rollback of PR#2836
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below! jack-moseley please review
### JIRA
- [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-1109
### Description
- [x] Here are some details about my PR, including
screenshots (if applicable):
partial rollback of PR#2836, because we want to
keep the behavior of flowConfigV1 API unchanged
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
NA
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2949 from arjun4084346/rollbackV1Changes
---
.../org/apache/gobblin/service/GobblinServiceManagerTest.java | 7 ++-----
.../test/java/org/apache/gobblin/service/FlowConfigTest.java | 10 +++-------
.../apache/gobblin/service/FlowConfigResourceLocalHandler.java | 7 ++++---
.../gobblin/service/modules/core/GobblinServiceHATest.java | 10 ++--------
4 files changed, 11 insertions(+), 23 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 125c2bf..ded0d5c 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -207,14 +207,11 @@ public class GobblinServiceManagerTest {
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));
- RestLiResponseException exception = null;
try {
this.flowConfigClient.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- exception = e;
+ Assert.fail("Create Again should pass without complaining that the spec
already exists.");
}
- Assert.assertNotNull(exception);
- Assert.assertEquals(exception.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -226,7 +223,7 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(),
TEST_SCHEDULE);
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
+ Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
// Add this assert back when getFlowSpec() is changed to return the raw
flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index e014c83..f8d8ea8 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -174,15 +174,11 @@ public class FlowConfigTest {
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));
- RestLiResponseException exception = null;
try {
_client.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- exception = e;
+ Assert.fail("Create Again should pass without complaining that the spec
already exists.");
}
-
- Assert.assertNotNull(exception);
- Assert.assertEquals(exception.getStatus(),
HttpStatus.S_409_CONFLICT.getCode());
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -194,8 +190,8 @@ public class FlowConfigTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(),
TEST_SCHEDULE );
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
- // Add this asssert back when getFlowSpec() is changed to return the raw
flow spec
+ Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+ // Add this assert back when getFlowSpec() is changed to return the raw
flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 8357037..a7e4d04 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -135,9 +135,10 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
}
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- // Return conflict and take no action if flowSpec has already been created
- if (this.flowCatalog.exists(flowSpec.getUri())) {
- log.warn("Flowspec with URI {} already exists, no action will be taken",
flowSpec.getUri());
+ // Existence of a flow spec in the flow catalog implies that the flow is
currently running. // Return conflict and take no action if flowSpec
has already been created
+ // If the new flow spec has a schedule we should allow submission of the
new flow to accept the new schedule. if
(this.flowCatalog.exists(flowSpec.getUri())) {
+ // However, if the new flow spec does not have a schedule, we should allow
submission only if it is not running. log.warn("Flowspec with URI {}
already exists, no action will be taken");
+ if (!flowConfig.hasSchedule() &&
this.flowCatalog.exists(flowSpec.getUri())) {
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(),
new EmptyRecord()), HttpStatus.S_409_CONFLICT);
} else {
this.flowCatalog.put(flowSpec, triggerListener);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 48bac9f..2132778 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -317,23 +317,17 @@ public class GobblinServiceHATest {
.setProperties(new StringMap(flowProperties));
// Try create on both nodes
- RestLiResponseException exception1 = null;
try {
this.node1FlowConfigClient.createFlowConfig(flowConfig1);
} catch (RestLiResponseException e) {
- exception1 = e;
+ Assert.fail("Create Again should pass without complaining that the spec
already exists.");
}
- Assert.assertNotNull(exception1);
- Assert.assertEquals(exception1.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
- RestLiResponseException exception2 = null;
try {
this.node2FlowConfigClient.createFlowConfig(flowConfig2);
} catch (RestLiResponseException e) {
- exception2 = e;
+ Assert.fail("Create Again should pass without complaining that the spec
already exists.");
}
- Assert.assertNotNull(exception2);
- Assert.assertEquals(exception2.getStatus(),
com.linkedin.restli.common.HttpStatus.S_409_CONFLICT.getCode());
logger.info("+++++++++++++++++++ testCreateAgain END");
}