This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 09f5043 [GOBBLIN-1050] Verify requester when updating/deleting FlowConfig
09f5043 is described below
commit 09f5043fbd3a90aa8f3b06a4b7d3bb2f94501db4
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Feb 28 11:22:08 2020 -0800
[GOBBLIN-1050] Verify requester when updating/deleting FlowConfig
Closes #2890 from jack-moseley/check-requester
---
.../org/apache/gobblin/service/FlowConfigTest.java | 5 ++-
.../apache/gobblin/service/FlowConfigV2Test.java | 46 +++++++++++++++++++---
.../gobblin/service/FlowConfigsResource.java | 30 +++++++++++++-
.../gobblin/service/FlowConfigsV2Resource.java | 3 ++
.../service/modules/core/GobblinServiceHATest.java | 4 +-
.../modules/core/GobblinServiceManagerTest.java | 2 +-
6 files changed, 80 insertions(+), 10 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index b20d944..e014c83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -309,8 +309,11 @@ public class FlowConfigTest {
try {
_client.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Bad update should pass without complaining that the spec
does not exists.");
+ Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND.getCode());
+ return;
}
+
+ Assert.fail("Update should have raised a 404 error");
}
@AfterClass(alwaysRun = true)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 83a200e..eea986e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service;
import java.io.File;
+import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -45,6 +46,8 @@ import com.linkedin.restli.server.util.PatchApplier;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Setter;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
@@ -57,11 +60,13 @@ public class FlowConfigV2Test {
private FlowConfigV2Client _client;
private EmbeddedRestliServer _server;
private File _testDirectory;
+ private TestRequesterService _requesterService;
private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
private static final String TEST_GROUP_NAME = "testGroup1";
private static final String TEST_FLOW_NAME = "testFlow1";
private static final String TEST_FLOW_NAME_2 = "testFlow2";
+ private static final String TEST_FLOW_NAME_3 = "testFlow3";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI =
"FS:///templates/test.template";
@@ -82,6 +87,8 @@ public class FlowConfigV2Test {
flowCatalog.startAsync();
flowCatalog.awaitRunning();
+ _requesterService = new TestRequesterService(ConfigFactory.empty());
+
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
@@ -89,8 +96,7 @@ public class FlowConfigV2Test {
// indicate that we are in unit testing since the resource is being
blocked until flow catalog changes have
// been made
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
-
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(new
NoopRequesterService(
- ConfigFactory.empty()));
+
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
}
});
@@ -131,19 +137,19 @@ public class FlowConfigV2Test {
@Test
public void testPartialUpdate() throws Exception {
- FlowId flowId = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+ FlowId flowId = new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
Map<String, String> flowProperties = Maps.newHashMap();
flowProperties.put("param1", "value1");
flowProperties.put("param2", "value2");
flowProperties.put("param3", "value3");
- FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3))
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new
Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
.setProperties(new StringMap(flowProperties));
// Set some initial config
- _client.updateFlowConfig(flowConfig);
+ _client.createFlowConfig(flowConfig);
// Change param2 to value4, delete param3
String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
@@ -176,6 +182,22 @@ public class FlowConfigV2Test {
_client.partialUpdateFlowConfig(flowId, flowConfigPatch);
}
+ @Test (expectedExceptions = RestLiResponseException.class)
+ public void testDisallowedRequester() throws Exception {
+ ServiceRequester testRequester = new ServiceRequester("testName",
"testType", "testFrom");
+ _requesterService.setRequester(testRequester);
+
+ Map<String, String> flowProperties = Maps.newHashMap();
+ flowProperties.put("param1", "value1");
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new
StringMap(flowProperties));
+ _client.createFlowConfig(flowConfig);
+
+ testRequester.setName("testName2");
+ _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME));
+ }
+
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (_client != null) {
@@ -188,4 +210,18 @@ public class FlowConfigV2Test {
_testDirectory.delete();
cleanUpDir(TEST_SPEC_STORE_DIR);
}
+
+ public class TestRequesterService extends RequesterService {
+ @Setter
+ private ServiceRequester requester;
+
+ public TestRequesterService(Config config) {
+ super(config);
+ }
+
+ @Override
+ public List<ServiceRequester> findRequesters(BaseResource resource) {
+ return requester == null ? Lists.newArrayList() :
Lists.newArrayList(requester);
+ }
+ }
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index d1a5208..a89eb02 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -33,7 +33,6 @@ import javax.inject.Named;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.RestLiCollection;
@@ -111,6 +110,7 @@ public class FlowConfigsResource extends
ComplexKeyResourceTemplate<FlowId, Empt
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key,
FlowConfig flowConfig) {
+ checkRequester(get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -124,12 +124,40 @@ public class FlowConfigsResource extends
ComplexKeyResourceTemplate<FlowId, Empt
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) {
+ checkRequester(get(key), this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
return this.flowConfigsResourceHandler.deleteFlowConfig(flowId,
getHeaders());
}
+ /**
+ * Check that all {@link ServiceRequester}s in this request are contained
within the original service requester list
+ * when the flow was submitted. If they are not, throw a {@link
FlowConfigLoggedException} with {@link HttpStatus#S_401_UNAUTHORIZED}.
+ * If there is a failure when deserializing the original requester list,
throw a {@link FlowConfigLoggedException} with
+ * {@link HttpStatus#S_400_BAD_REQUEST}.
+ *
+ * @param originalFlowConfig original flow config to find original requester
+ * @param requesterList list of requesters for this request
+ */
+ public static void checkRequester(FlowConfig originalFlowConfig,
List<ServiceRequester> requesterList) {
+ if (requesterList == null) {
+ return;
+ }
+
+ try {
+ String serializedOriginalRequesterList =
originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
+ if (serializedOriginalRequesterList != null) {
+ List<ServiceRequester> originalRequesterList =
RequesterService.deserialize(serializedOriginalRequesterList);
+ if (!originalRequesterList.isEmpty() && (requesterList.isEmpty() ||
!originalRequesterList.containsAll(requesterList))) {
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
"Requester not in original requester list");
+ }
+ }
+ } catch (IOException e) {
+ throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
"Failed to get original requester list", e);
+ }
+ }
+
private Properties getHeaders() {
Properties headerProperties = new Properties();
for (Map.Entry<String, String> entry :
getContext().getRequestHeaders().entrySet()) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 14cbaa7..8202994 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -115,6 +115,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key,
FlowConfig flowConfig) {
+ FlowConfigsResource.checkRequester(get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -129,6 +130,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key,
PatchRequest<FlowConfig> flowConfigPatch) {
+ FlowConfigsResource.checkRequester(get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -142,6 +144,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
+ FlowConfigsResource.checkRequester(get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 2752add..48bac9f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -483,13 +483,13 @@ public class GobblinServiceHATest {
try {
this.node1FlowConfigClient.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Bad update should pass without complaining that the spec
does not exists.");
+ Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
}
try {
this.node2FlowConfigClient.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Bad update should pass without complaining that the spec
does not exists.");
+ Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
}
logger.info("+++++++++++++++++++ testBadUpdate END");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 43017ed..7aecfb1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -352,7 +352,7 @@ public class GobblinServiceManagerTest {
try {
this.flowConfigClient.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.fail("Bad update should pass without complaining that the spec
does not exists.");
+ Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
}
cleanUpDir(FLOW_SPEC_STORE_DIR);
}