This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7352cad  [GOBBLIN-1304] Adds group ownership service
7352cad is described below

commit 7352cad8ae3a1d9be10d3b6fb78383ccbada9b19
Author: William Lo <[email protected]>
AuthorDate: Sat Nov 7 17:17:19 2020 -0800

    [GOBBLIN-1304] Adds group ownership service
    
    Closes #3142 from Will-Lo/add-group-ownership-
    flows
---
 .../gobblin/configuration/ConfigurationKeys.java   |   1 +
 .../apache/gobblin/service/ServiceConfigKeys.java  |   3 +
 .../org/apache/gobblin/service/FlowConfig.pdl      |   5 +
 ...pache.gobblin.service.flowconfigs.snapshot.json |   5 +
 ...che.gobblin.service.flowconfigsV2.snapshot.json |   5 +
 .../apache/gobblin/service/FlowConfigV2Test.java   | 127 +++++++++++++++++++--
 .../src/test/resources/TestGroups.json             |   3 +
 .../service/FlowConfigResourceLocalHandler.java    |   4 +
 .../gobblin/service/FlowConfigsV2Resource.java     |  52 ++++++++-
 .../gobblin/service/GroupOwnershipService.java     |  43 +++++++
 .../LocalGroupOwnershipPathAlterationListener.java |  85 ++++++++++++++
 .../service/LocalGroupOwnershipService.java        |  74 ++++++++++++
 .../gobblin/service/NoopGroupOwnershipService.java |  33 ++++++
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |   4 +
 .../gobblin/runtime/spec_store/MysqlSpecStore.java |   6 +-
 .../runtime/spec_store/MysqlSpecStoreTest.java     |  52 ++++++---
 .../modules/core/GobblinServiceManager.java        |  17 +++
 .../gobblin/service/GobblinServiceManagerTest.java |   8 ++
 gobblin-service/src/test/resources/TestGroups.json |   3 +
 19 files changed, 497 insertions(+), 33 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a6237cf..004be99 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -140,6 +140,7 @@ public class ConfigurationKeys {
   public static final String FLOW_ALLOW_CONCURRENT_EXECUTION = 
"flow.allowConcurrentExecution";
   public static final String FLOW_EXPLAIN_KEY = "flow.explain";
   public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
+  public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
 
   /**
    * Common topology configuration properties.
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 6aefffb..4a4c97b 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -129,4 +129,7 @@ public class ServiceConfigKeys {
 
   public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX + 
"forceLeader";
   public static final boolean DEFAULT_FORCE_LEADER = false;
+  // Group Membership authentication service
+  public static final String GROUP_OWNERSHIP_SERVICE_CLASS = 
GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
+  public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = 
"org.apache.gobblin.service.NoopGroupOwnershipService";
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
index 8459b32..9cabd26 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
@@ -27,6 +27,11 @@ record FlowConfig {
   explain: boolean = false
 
   /**
+   * Optional string name of group that the requester belongs to for group 
ownership of flows.
+   */
+  owningGroup: optional string
+
+  /**
    * Properties for the flow. These properties are passed to the compiled 
Gobblin jobs.
    */
   properties: map[string, string]
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 86c9ee4..9845e8d 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -77,6 +77,11 @@
       "doc" : "Return the compiled flow as a string. If enabled, the flow is 
not added.",
       "default" : false
     }, {
+      "name" : "owningGroup",
+      "type" : "string",
+      "doc" : "Optional string name of group that the requester belongs to for 
group ownership of flows.",
+      "optional" : true
+    }, {
       "name" : "properties",
       "type" : {
         "type" : "map",
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 13da7a5..2442bd9 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -68,6 +68,11 @@
       "doc" : "Return the compiled flow as a string. If enabled, the flow is 
not added.",
       "default" : false
     }, {
+      "name" : "owningGroup",
+      "type" : "string",
+      "doc" : "Optional string name of group that the requester belongs to for 
group ownership of flows.",
+      "optional" : true
+    }, {
       "name" : "properties",
       "type" : {
         "type" : "map",
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 50fb947..f393b8f 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -18,11 +18,13 @@
 package org.apache.gobblin.service;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.mortbay.jetty.HttpStatus;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -62,12 +64,18 @@ public class FlowConfigV2Test {
   private EmbeddedRestliServer _server;
   private File _testDirectory;
   private TestRequesterService _requesterService;
+  private GroupOwnershipService groupOwnershipService;
+  private File groupConfigFile;
 
   private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
   private static final String TEST_GROUP_NAME = "testGroup1";
   private static final String TEST_FLOW_NAME = "testFlow1";
   private static final String TEST_FLOW_NAME_2 = "testFlow2";
   private static final String TEST_FLOW_NAME_3 = "testFlow3";
+  private static final String TEST_FLOW_NAME_4 = "testFlow4";
+  private static final String TEST_FLOW_NAME_5 = "testFlow5";
+  private static final String TEST_FLOW_NAME_6 = "testFlow6";
+  private static final String TEST_FLOW_NAME_7 = "testFlow7";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = 
"FS:///templates/test.template";
 
@@ -90,6 +98,15 @@ public class FlowConfigV2Test {
 
     _requesterService = new TestRequesterService(ConfigFactory.empty());
 
+    this.groupConfigFile = new File(_testDirectory + "/TestGroups.json");
+    String groups ="{\"testGroup\": \"testName,testName2\"}";
+    Files.write(groups.getBytes(), this.groupConfigFile);
+    Config groupServiceConfig = ConfigBuilder.create()
+        .addPrimitive(LocalGroupOwnershipService.GROUP_MEMBER_LIST, 
this.groupConfigFile.getAbsolutePath())
+        .build();
+
+    groupOwnershipService = new LocalGroupOwnershipService(groupServiceConfig);
+
     Injector injector = Guice.createInjector(new Module() {
       @Override
       public void configure(Binder binder) {
@@ -98,6 +115,7 @@ public class FlowConfigV2Test {
         // been made
         
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
         
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
+        
binder.bind(GroupOwnershipService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE)).toInstance(groupOwnershipService);
       }
     });
 
@@ -185,20 +203,102 @@ public class FlowConfigV2Test {
     _client.partialUpdateFlowConfig(flowId, flowConfigPatch);
   }
 
-  @Test (expectedExceptions = RestLiResponseException.class)
+  @Test
   public void testDisallowedRequester() throws Exception {
-    ServiceRequester testRequester = new ServiceRequester("testName", 
"testType", "testFrom");
-    _requesterService.setRequester(testRequester);
+    try {
+      ServiceRequester testRequester = new ServiceRequester("testName", 
"testType", "testFrom");
+      _requesterService.setRequester(testRequester);
+
+      Map<String, String> flowProperties = Maps.newHashMap();
+      flowProperties.put("param1", "value1");
+
+      FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4))
+          .setTemplateUris(TEST_TEMPLATE_URI)
+          .setProperties(new StringMap(flowProperties));
+      _client.createFlowConfig(flowConfig);
+
+      testRequester.setName("testName2");
+      _client.deleteFlowConfig(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4));
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+    }
+  }
 
+  @Test
+  public void testGroupRequesterAllowed() throws Exception {
+    ServiceRequester testRequester = new ServiceRequester("testName", 
"USER_PRINCIPAL", "testFrom");
+    _requesterService.setRequester(testRequester);
     Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
 
-    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
-        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new 
StringMap(flowProperties));
-    _client.createFlowConfig(flowConfig);
+    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5))
+        .setTemplateUris(TEST_TEMPLATE_URI)
+        .setProperties(new StringMap(flowProperties))
+        .setOwningGroup("testGroup");
+
+     _client.createFlowConfig(flowConfig);
 
     testRequester.setName("testName2");
-    _client.deleteFlowConfig(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME));
+    _client.deleteFlowConfig(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5));
+  }
+
+  @Test
+  public void testGroupRequesterRejected() throws Exception {
+    try {
+      ServiceRequester testRequester = new ServiceRequester("testName", 
"USER_PRINCIPAL", "testFrom");
+      _requesterService.setRequester(testRequester);
+      Map<String, String> flowProperties = Maps.newHashMap();
+
+      FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6))
+          .setTemplateUris(TEST_TEMPLATE_URI)
+          .setProperties(new StringMap(flowProperties))
+          .setOwningGroup("testGroup");
+
+      _client.createFlowConfig(flowConfig);
+
+      testRequester.setName("testName3");
+      _client.deleteFlowConfig(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6));
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+    }
+  }
+
+  @Test
+  public void testLocalGroupOwnershipUpdates() throws Exception {
+    try {
+      ServiceRequester testRequester = new ServiceRequester("testName", 
"USER_PRINCIPAL", "testFrom");
+      _requesterService.setRequester(testRequester);
+      Map<String, String> flowProperties = Maps.newHashMap();
+
+      FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
+          .setTemplateUris(TEST_TEMPLATE_URI)
+          .setProperties(new StringMap(flowProperties))
+          .setOwningGroup("testGroup2");
+
+      _client.createFlowConfig(flowConfig);
+
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+    }
+
+    String filePath = this.groupConfigFile.getAbsolutePath();
+    this.groupConfigFile.delete();
+    this.groupConfigFile = new File(filePath);
+    String groups ="{\"testGroup2\": \"testName,testName3\"}";
+    Files.write(groups.getBytes(), this.groupConfigFile);
+
+    ServiceRequester testRequester = new ServiceRequester("testName", 
"USER_PRINCIPAL", "testFrom");
+    _requesterService.setRequester(testRequester);
+    Map<String, String> flowProperties = Maps.newHashMap();
+
+    FlowConfig flowConfig = new FlowConfig().setId(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
+        .setTemplateUris(TEST_TEMPLATE_URI)
+        .setProperties(new StringMap(flowProperties))
+        .setOwningGroup("testGroup2");
+
+    // this should no longer fail as the localGroupOwnership service should 
have updated as the file changed
+    _client.createFlowConfig(flowConfig);
+    testRequester.setName("testName3");
+    _client.deleteFlowConfig(new 
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7));
   }
 
   @AfterClass(alwaysRun = true)
@@ -226,5 +326,16 @@ public class FlowConfigV2Test {
     public List<ServiceRequester> findRequesters(BaseResource resource) {
       return requester == null ? Lists.newArrayList() : 
Lists.newArrayList(requester);
     }
+
+    @Override
+    public boolean isRequesterAllowed(
+        List<ServiceRequester> originalRequesterList, List<ServiceRequester> 
currentRequesterList) {
+      for (ServiceRequester s: currentRequesterList) {
+        if (originalRequesterList.contains(s)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
new file mode 100644
index 0000000..38e9ecc
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
@@ -0,0 +1,3 @@
+{
+  "testGroup": "testName,testName2"
+}
\ No newline at end of file
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 2404eff..f4ef176 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -248,6 +248,10 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
       configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXPLAIN_KEY, 
flowConfig.isExplain());
     }
 
+    if (flowConfig.hasOwningGroup()) {
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
flowConfig.getOwningGroup());
+    }
+
     Config config = configBuilder.build();
 
     Config configWithFallback;
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 3bbfca0..49f7faa 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -56,7 +56,7 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = 
"flowConfigsV2ResourceHandler";
   public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
   public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
-
+  public static final String INJECT_GROUP_OWNERSHIP_SERVICE = 
"v2GroupOwnershipService";
   private static final Set<String> ALLOWED_METADATA = 
ImmutableSet.of("delete.state.store");
 
 
@@ -77,6 +77,10 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   @Named(INJECT_READY_TO_USE)
   private Boolean readyToUse;
 
+  @Inject
+  @Named(INJECT_GROUP_OWNERSHIP_SERVICE)
+  private GroupOwnershipService groupOwnershipService;
+
   public FlowConfigsV2Resource() {
   }
 
@@ -128,11 +132,14 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   @ReturnEntity
   @Override
   public CreateKVResponse create(FlowConfig flowConfig) {
-    List<ServiceRequester> requestorList = 
this.requesterService.findRequesters(this);
+    List<ServiceRequester> requesterList = 
this.requesterService.findRequesters(this);
     try {
-      String serialized = RequesterService.serialize(requestorList);
+      String serialized = RequesterService.serialize(requesterList);
       flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, 
serialized);
       LOG.info("Rest requester list is " + serialized);
+      if (flowConfig.hasOwningGroup() && 
!this.groupOwnershipService.isMemberOfGroup(requesterList, 
flowConfig.getOwningGroup())) {
+        throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, 
"Requester not part of owning group specified");
+      }
     } catch (IOException e) {
       throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, 
"cannot get who is the requester", e);
     }
@@ -148,7 +155,7 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, 
FlowConfig flowConfig) {
-    FlowConfigsResource.checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
+    checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -163,7 +170,7 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key, 
PatchRequest<FlowConfig> flowConfigPatch) {
-    FlowConfigsResource.checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
+    checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -177,7 +184,7 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
-    FlowConfigsResource.checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
+    checkRequester(this.requesterService, get(key), 
this.requesterService.findRequesters(this));
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -200,4 +207,37 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
     }
     return headerProperties;
   }
+
+  /**
+   * Check that all {@link ServiceRequester}s in this request are contained 
within the original service requester list
+   * or is part of the original requester's owning group when the flow was 
submitted. If they are not, throw a {@link FlowConfigLoggedException} with 
{@link HttpStatus#S_401_UNAUTHORIZED}.
+   * If there is a failure when deserializing the original requester list, 
throw a {@link FlowConfigLoggedException} with
+   * {@link HttpStatus#S_400_BAD_REQUEST}.
+   * @param requesterService the {@link RequesterService} used to verify the 
requester
+   * @param originalFlowConfig original flow config to find original requester
+   * @param requesterList list of requesters for this request
+   */
+  public void checkRequester(
+      RequesterService requesterService, FlowConfig originalFlowConfig, 
List<ServiceRequester> requesterList) {
+    if (requesterList == null) {
+      return;
+    }
+
+    try {
+      String serializedOriginalRequesterList = 
originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
+      if (serializedOriginalRequesterList != null) {
+        List<ServiceRequester> originalRequesterList = 
RequesterService.deserialize(serializedOriginalRequesterList);
+        if (!requesterService.isRequesterAllowed(originalRequesterList, 
requesterList)) {
+          // if the requester is not whitelisted or the original requester, 
reject the requester if it is not part of the owning group
+          // of the original requester
+          if (!(originalFlowConfig.hasOwningGroup() && 
this.groupOwnershipService.isMemberOfGroup(
+              requesterList, originalFlowConfig.getOwningGroup()))) {
+            throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, 
"Requester not allowed to make this request");
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, 
"Failed to get original requester list", e);
+    }
+  }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
new file mode 100644
index 0000000..a84290d
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Service for handling group ownership of flows
+ */
+public abstract class GroupOwnershipService {
+
+   /**
+    * @return true if any of the serviceRequesters belong in the group
+    */
+   public abstract boolean isMemberOfGroup(List<ServiceRequester> 
serviceRequesters, String group);
+
+   /**
+    * Extracts ServiceRequester names
+    * @param requesterList
+    * @return a list of service requester names
+    */
+   protected static List<String> extractRequesterNames(List<ServiceRequester> 
requesterList) {
+      return requesterList.stream()
+          .map(requester -> requester.getName())
+          .collect(Collectors.toList());
+   }
+}
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
new file mode 100644
index 0000000..e7e54e1
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LocalGroupOwnershipPathAlterationListener extends 
PathAlterationListenerAdaptor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalGroupOwnershipPathAlterationListener.class);
+  private JsonObject groupOwnerships;
+  FileSystem fs;
+  Path groupOwnershipFilePath;
+
+  LocalGroupOwnershipPathAlterationListener(Path filePath) {
+    this.groupOwnershipFilePath = filePath;
+    try {
+      this.fs = FileSystem.get(new Configuration());
+      updateGroupOwnerships(filePath);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not get local filesystem", e);
+    }
+  }
+
+  public JsonObject getGroupOwnerships() {
+    return groupOwnerships;
+  }
+
+  void updateGroupOwnerships(Path path) {
+    // only update if the group ownership file is changed
+    if (path.toUri().getPath().equals(this.groupOwnershipFilePath.toString())) 
{
+      LOG.info("Detected change in group ownership file, updating groups");
+      try (FSDataInputStream in = this.fs.open(path)) {
+        String jsonString = IOUtils.toString(in, Charset.defaultCharset());
+        JsonParser parser = new JsonParser();
+        this.groupOwnerships = parser.parse(jsonString).getAsJsonObject();
+      } catch (IOException e) {
+        throw new RuntimeException("Could not open group ownership file at " + 
path.toString(), e);
+      }
+    }
+  }
+
+  @Override
+  public void onFileCreate(Path path) {
+    updateGroupOwnerships(path);
+  }
+
+  @Override
+  public void onFileChange(Path path) {
+    updateGroupOwnerships(path);
+  }
+
+  @Override
+  public void onFileDelete(Path path) {
+    // ignore if another file in same directory is deleted
+    if (path.toUri().getPath().equals(this.groupOwnershipFilePath.toString())) 
{
+      this.groupOwnerships = new JsonObject();
+    }
+  }
+}
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
new file mode 100644
index 0000000..abe3bf3
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.List;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Reads and updates from a JSON where keys denote group names
+ * and values denote a list of group members
+ */
+@Alias("local")
+public class LocalGroupOwnershipService extends GroupOwnershipService {
+  public static final String GROUP_MEMBER_LIST = 
"groupOwnershipService.groupMembers.path";
+  LocalGroupOwnershipPathAlterationListener listener;
+  PathAlterationObserver observer;
+
+  public LocalGroupOwnershipService(Config config) {
+    Path groupOwnershipFilePath = new 
Path(config.getString(GROUP_MEMBER_LIST));
+    try {
+      observer = new 
PathAlterationObserver(groupOwnershipFilePath.getParent());
+      this.listener = new 
LocalGroupOwnershipPathAlterationListener(groupOwnershipFilePath);
+      observer.addListener(this.listener);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not get initialize 
PathAlterationObserver at %" + groupOwnershipFilePath.toString(), e);
+    }
+  }
+
+
+  @Override
+  public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, 
String group) {
+    // ensure that the group ownership file is up to date
+    try {
+      this.observer.checkAndNotify();
+    } catch (IOException e) {
+      throw new RuntimeException("Group Ownership observer could not check for 
file changes", e);
+    }
+
+    JsonObject groupOwnerships = this.listener.getGroupOwnerships();
+    if (groupOwnerships.has(group)) {
+      List<String> groupMembers = 
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(
+          groupOwnerships.get(group).getAsString());
+      for (ServiceRequester requester: serviceRequesters) {
+        if (groupMembers.contains(requester.getName())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+
+}
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
new file mode 100644
index 0000000..639c6a1
--- /dev/null
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import com.typesafe.config.Config;
+import java.util.List;
+import org.apache.gobblin.annotation.Alias;
+
+
+@Alias("noop")
+public class NoopGroupOwnershipService extends GroupOwnershipService{
+
+   public NoopGroupOwnershipService(Config config) {
+   }
+
+   public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, 
String group) {
+      return true;
+   }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a7df2e0..a3d2fff 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -451,6 +451,10 @@ public class FlowSpec implements Configurable, Spec {
         flowConfig.setSchedule(schedule);
       }
 
+      if (flowProps.containsKey(ConfigurationKeys.FLOW_OWNING_GROUP_KEY)) {
+        
flowConfig.setOwningGroup(flowProps.getProperty(ConfigurationKeys.FLOW_OWNING_GROUP_KEY));
+      }
+
       // remove keys that were injected as part of flowSpec creation
       flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
       flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a3b5848..ab644af 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -83,8 +83,8 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
           + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE spec_uri = ?)";
   protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, 
flow_group, flow_name, template_uri, "
-      + "user_to_proxy, source_identifier, destination_identifier, schedule, 
tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
-      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE 
spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
+      + "user_to_proxy, source_identifier, destination_identifier, schedule, 
tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY 
UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
spec_uri = ?";
   private static final String GET_STATEMENT = "SELECT spec_uri, spec, " + 
NEW_COLUMN + " FROM %s WHERE ";
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + 
NEW_COLUMN + " FROM %s";
@@ -409,6 +409,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
     String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
     String destinationIdentifier = 
flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
     String schedule = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+    String owningGroup = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
     boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
 
     int i = 0;
@@ -422,6 +423,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
     statement.setString(++i, schedule);
     statement.setString(++i, tagValue);
     statement.setBoolean(++i, isRunImmediately);
+    statement.setString(++i, owningGroup);
     statement.setBlob(++i, new 
ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
     statement.setString(++i, new String(this.specSerDe.serialize(flowSpec), 
Charsets.UTF_8));
   }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b6d720b..63b9141 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -62,7 +62,8 @@ public class MysqlSpecStoreTest {
   private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
   private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
   private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
-  private FlowSpec flowSpec1, flowSpec2, flowSpec3;
+  private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
+  private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
 
   public MysqlSpecStoreTest()
       throws URISyntaxException {
@@ -113,6 +114,17 @@ public class MysqlSpecStoreTest {
         .withDescription("Test flow spec 3")
         .withVersion("Test version 3")
         .build();
+
+    flowSpec4 = FlowSpec.builder(this.uri4)
+        .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+            .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
"owningGroup4").build())
+        .withDescription("Test flow spec 4")
+        .withVersion("Test version 4")
+        .build();
   }
 
   @Test(expectedExceptions = IOException.class)
@@ -126,9 +138,10 @@ public class MysqlSpecStoreTest {
   public void testAddSpec() throws Exception {
     this.specStore.addSpec(this.flowSpec1);
     this.specStore.addSpec(this.flowSpec2);
-
+    this.specStore.addSpec(this.flowSpec4);
     Assert.assertTrue(this.specStore.exists(this.uri1));
     Assert.assertTrue(this.specStore.exists(this.uri2));
+    Assert.assertTrue(this.specStore.exists(this.uri4));
     Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
   }
 
@@ -138,7 +151,7 @@ public class MysqlSpecStoreTest {
     Assert.assertEquals(result, this.flowSpec1);
 
     Collection<Spec> specs = this.specStore.getSpecs();
-    Assert.assertEquals(specs.size(), 2);
+    Assert.assertEquals(specs.size(), 3);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
 
@@ -180,24 +193,17 @@ public class MysqlSpecStoreTest {
     specs = this.specStore.getSpecs(flowSpecSearchObject);
     Assert.assertEquals(specs.size(), 1);
     Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().owningGroup("owningGroup4").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec4));
   }
 
   @Test  (dependsOnMethods = "testGetSpec")
   public void testGetSpecWithTag() throws Exception {
 
     //Creating and inserting flowspecs with tags
-    URI uri4 = URI.create("flowspec4");
-    FlowSpec flowSpec4 = FlowSpec.builder(uri4)
-        .withConfig(ConfigBuilder.create()
-            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
-            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
-            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
-            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
-            .addPrimitive("key4", "value4").build())
-        .withDescription("Test flow spec 4")
-        .withVersion("Test version 4")
-        .build();
-
     URI uri5 = URI.create("flowspec5");
     FlowSpec flowSpec5 = FlowSpec.builder(uri5)
         .withConfig(ConfigBuilder.create()
@@ -210,11 +216,23 @@ public class MysqlSpecStoreTest {
         .withVersion("Test version 5")
         .build();
 
-    this.specStore.addSpec(flowSpec4, "dr");
+    URI uri6 = URI.create("flowspec6");
+    FlowSpec flowSpec6 = FlowSpec.builder(uri6)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
+            .addPrimitive("key6", "value6").build())
+        .withDescription("Test flow spec 6")
+        .withVersion("Test version 6")
+        .build();
+
     this.specStore.addSpec(flowSpec5, "dr");
+    this.specStore.addSpec(flowSpec6, "dr");
 
-    Assert.assertTrue(this.specStore.exists(uri4));
     Assert.assertTrue(this.specStore.exists(uri5));
+    Assert.assertTrue(this.specStore.exists(uri6));
     List<URI> result = new ArrayList<>();
     this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
     Assert.assertEquals(result.size(), 2);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0643b6a..a9518d1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,6 +31,8 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.service.GroupOwnershipService;
+import org.apache.gobblin.service.NoopGroupOwnershipService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -155,6 +157,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   protected GobblinServiceFlowExecutionResourceHandler 
flowExecutionResourceHandler;
   @Getter
   protected FlowStatusGenerator flowStatusGenerator;
+  @Getter
+  protected GroupOwnershipService groupOwnershipService;
 
   protected boolean flowCatalogLocalCommit;
   @Getter
@@ -304,6 +308,16 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
         ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
 
+    ClassAliasResolver<GroupOwnershipService> groupOwnershipAliasResolver = 
new ClassAliasResolver<>(GroupOwnershipService.class);
+    String groupOwnershipServiceClass = 
ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE;
+    LOGGER.info("I am here " + groupOwnershipServiceClass);
+    if (config.hasPath(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS)) {
+      groupOwnershipServiceClass = 
config.getString(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS);
+      LOGGER.info("Initializing with group ownership service " + 
groupOwnershipServiceClass);
+    }
+     this.groupOwnershipService = 
GobblinConstructorUtils.invokeConstructor(GroupOwnershipService.class,
+          groupOwnershipAliasResolver.resolve(groupOwnershipServiceClass), 
config);
+
     if (isRestLIServerEnabled) {
       Injector injector = Guice.createInjector(new Module() {
         @Override
@@ -329,6 +343,9 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
           binder.bind(RequesterService.class)
               
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
               .toInstance(new NoopRequesterService(config));
+          binder.bind(GroupOwnershipService.class)
+              
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE))
+              .toInstance(GobblinServiceManager.this.groupOwnershipService);
         }
       });
       this.restliServer = EmbeddedRestliServer.builder()
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index d3cb898..f753e8c 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -81,6 +81,7 @@ public class GobblinServiceManagerTest {
   private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
   private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
   private static final String JOB_STATUS_STATE_STORE_DIR = 
"/tmp/serviceCore/fsJobStatusRetriever";
+  private static final String GROUP_OWNERSHIP_CONFIG_DIR = 
Files.createTempDir().getAbsolutePath();
 
   private static final String TEST_GROUP_NAME = "testGroup";
   private static final String TEST_FLOW_NAME = "testFlow";
@@ -202,6 +203,13 @@ public class GobblinServiceManagerTest {
     } catch (Exception e) {
       logger.warn("Could not completely cleanup Spec Store Parent Dir");
     }
+
+    try {
+      cleanUpDir(GROUP_OWNERSHIP_CONFIG_DIR);
+    } catch (Exception e) {
+      logger.warn("Could not completely cleanup Group Ownership Parent Dir");
+    }
+
     try {
       this.testingServer.close();
     } catch(Exception e) {
diff --git a/gobblin-service/src/test/resources/TestGroups.json 
b/gobblin-service/src/test/resources/TestGroups.json
new file mode 100644
index 0000000..38e9ecc
--- /dev/null
+++ b/gobblin-service/src/test/resources/TestGroups.json
@@ -0,0 +1,3 @@
+{
+  "testGroup": "testName,testName2"
+}
\ No newline at end of file

Reply via email to