This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f5043  [GOBBLIN-1050] Verify requester when updating/deleting FlowConfig
09f5043 is described below

commit 09f5043fbd3a90aa8f3b06a4b7d3bb2f94501db4
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Feb 28 11:22:08 2020 -0800

    [GOBBLIN-1050] Verify requester when updating/deleting FlowConfig
    
    Closes #2890 from jack-moseley/check-requester
---
 .../org/apache/gobblin/service/FlowConfigTest.java |  5 ++-
 .../apache/gobblin/service/FlowConfigV2Test.java   | 46 +++++++++++++++++++---
 .../gobblin/service/FlowConfigsResource.java       | 30 +++++++++++++-
 .../gobblin/service/FlowConfigsV2Resource.java     |  3 ++
 .../service/modules/core/GobblinServiceHATest.java |  4 +-
 .../modules/core/GobblinServiceManagerTest.java    |  2 +-
 6 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index b20d944..e014c83 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -309,8 +309,11 @@ public class FlowConfigTest {
     try {
       _client.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
+      Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND.getCode());
+      return;
     }
+
+    Assert.fail("Update should have raised a 404 error");
   }
 
   @AfterClass(alwaysRun = true)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 83a200e..eea986e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service;
 
 import java.io.File;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
@@ -45,6 +46,8 @@ import com.linkedin.restli.server.util.PatchApplier;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Setter;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
@@ -57,11 +60,13 @@ public class FlowConfigV2Test {
   private FlowConfigV2Client _client;
   private EmbeddedRestliServer _server;
   private File _testDirectory;
+  private TestRequesterService _requesterService;
 
   private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
   private static final String TEST_GROUP_NAME = "testGroup1";
   private static final String TEST_FLOW_NAME = "testFlow1";
   private static final String TEST_FLOW_NAME_2 = "testFlow2";
+  private static final String TEST_FLOW_NAME_3 = "testFlow3";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
 
@@ -82,6 +87,8 @@ public class FlowConfigV2Test {
     flowCatalog.startAsync();
     flowCatalog.awaitRunning();
 
+    _requesterService = new TestRequesterService(ConfigFactory.empty());
+
     Injector injector = Guice.createInjector(new Module() {
       @Override
       public void configure(Binder binder) {
@@ -89,8 +96,7 @@ public class FlowConfigV2Test {
         // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
         // been made
         binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
-        binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(new NoopRequesterService(
-            ConfigFactory.empty()));
+        binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
       }
     });
 
@@ -131,19 +137,19 @@ public class FlowConfigV2Test {
 
   @Test
   public void testPartialUpdate() throws Exception {
-    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3);
 
     Map<String, String> flowProperties = Maps.newHashMap();
     flowProperties.put("param1", "value1");
     flowProperties.put("param2", "value2");
     flowProperties.put("param3", "value3");
 
-    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_3))
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(false))
         .setProperties(new StringMap(flowProperties));
 
     // Set some initial config
-    _client.updateFlowConfig(flowConfig);
+    _client.createFlowConfig(flowConfig);
 
     // Change param2 to value4, delete param3
     String patchJson = "{\"schedule\":{\"$set\":{\"runImmediately\":true}},"
@@ -176,6 +182,22 @@ public class FlowConfigV2Test {
     _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
   }
 
+  @Test (expectedExceptions = RestLiResponseException.class)
+  public void testDisallowedRequester() throws Exception {
+    ServiceRequester testRequester = new ServiceRequester("testName", "testType", "testFrom");
+    _requesterService.setRequester(testRequester);
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+
+    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+    _client.createFlowConfig(flowConfig);
+
+    testRequester.setName("testName2");
+    _client.deleteFlowConfig(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME));
+  }
+
   @AfterClass(alwaysRun = true)
   public void tearDown() throws Exception {
     if (_client != null) {
@@ -188,4 +210,18 @@ public class FlowConfigV2Test {
     _testDirectory.delete();
     cleanUpDir(TEST_SPEC_STORE_DIR);
   }
+
+  public class TestRequesterService extends RequesterService {
+    @Setter
+    private ServiceRequester requester;
+
+    public TestRequesterService(Config config) {
+      super(config);
+    }
+
+    @Override
+    public List<ServiceRequester> findRequesters(BaseResource resource) {
+      return requester == null ? Lists.newArrayList() : Lists.newArrayList(requester);
+    }
+  }
 }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index d1a5208..a89eb02 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -33,7 +33,6 @@ import javax.inject.Named;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.RestLiCollection;
@@ -111,6 +110,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) {
+    checkRequester(get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -124,12 +124,40 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, EmptyRecord> key) {
+    checkRequester(get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
     return this.flowConfigsResourceHandler.deleteFlowConfig(flowId, getHeaders());
   }
 
+  /**
+   * Check that all {@link ServiceRequester}s in this request are contained within the original service requester list
+   * when the flow was submitted. If they are not, throw a {@link FlowConfigLoggedException} with {@link HttpStatus#S_401_UNAUTHORIZED}.
+   * If there is a failure when deserializing the original requester list, throw a {@link FlowConfigLoggedException} with
+   * {@link HttpStatus#S_400_BAD_REQUEST}.
+   *
+   * @param originalFlowConfig original flow config to find original requester
+   * @param requesterList list of requesters for this request
+   */
+  public static void checkRequester(FlowConfig originalFlowConfig, List<ServiceRequester> requesterList) {
+    if (requesterList == null) {
+      return;
+    }
+
+    try {
+      String serializedOriginalRequesterList = originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
+      if (serializedOriginalRequesterList != null) {
+        List<ServiceRequester> originalRequesterList = RequesterService.deserialize(serializedOriginalRequesterList);
+        if (!originalRequesterList.isEmpty() && (requesterList.isEmpty() || !originalRequesterList.containsAll(requesterList))) {
+          throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, "Requester not in original requester list");
+        }
+      }
+    } catch (IOException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to get original requester list", e);
+    }
+  }
+
   private Properties getHeaders() {
     Properties headerProperties = new Properties();
     for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 14cbaa7..8202994 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -115,6 +115,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, FlowConfig flowConfig) {
+    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -129,6 +130,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, PatchRequest<FlowConfig> flowConfigPatch) {
+    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -142,6 +144,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
+    FlowConfigsResource.checkRequester(get(key), this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 2752add..48bac9f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -483,13 +483,13 @@ public class GobblinServiceHATest {
     try {
       this.node1FlowConfigClient.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
+      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
 
     try {
       this.node2FlowConfigClient.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
+      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
 
     logger.info("+++++++++++++++++++ testBadUpdate END");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 43017ed..7aecfb1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -352,7 +352,7 @@ public class GobblinServiceManagerTest {
     try {
       this.flowConfigClient.updateFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Bad update should pass without complaining that the spec does not exists.");
+      Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
     cleanUpDir(FLOW_SPEC_STORE_DIR);
   }

Reply via email to