This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7352cad [GOBBLIN-1304] Adds group ownership service
7352cad is described below
commit 7352cad8ae3a1d9be10d3b6fb78383ccbada9b19
Author: William Lo <[email protected]>
AuthorDate: Sat Nov 7 17:17:19 2020 -0800
[GOBBLIN-1304] Adds group ownership service
Closes #3142 from Will-Lo/add-group-ownership-flows
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/service/ServiceConfigKeys.java | 3 +
.../org/apache/gobblin/service/FlowConfig.pdl | 5 +
...pache.gobblin.service.flowconfigs.snapshot.json | 5 +
...che.gobblin.service.flowconfigsV2.snapshot.json | 5 +
.../apache/gobblin/service/FlowConfigV2Test.java | 127 +++++++++++++++++++--
.../src/test/resources/TestGroups.json | 3 +
.../service/FlowConfigResourceLocalHandler.java | 4 +
.../gobblin/service/FlowConfigsV2Resource.java | 52 ++++++++-
.../gobblin/service/GroupOwnershipService.java | 43 +++++++
.../LocalGroupOwnershipPathAlterationListener.java | 85 ++++++++++++++
.../service/LocalGroupOwnershipService.java | 74 ++++++++++++
.../gobblin/service/NoopGroupOwnershipService.java | 33 ++++++
.../org/apache/gobblin/runtime/api/FlowSpec.java | 4 +
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 6 +-
.../runtime/spec_store/MysqlSpecStoreTest.java | 52 ++++++---
.../modules/core/GobblinServiceManager.java | 17 +++
.../gobblin/service/GobblinServiceManagerTest.java | 8 ++
gobblin-service/src/test/resources/TestGroups.json | 3 +
19 files changed, 497 insertions(+), 33 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index a6237cf..004be99 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -140,6 +140,7 @@ public class ConfigurationKeys {
public static final String FLOW_ALLOW_CONCURRENT_EXECUTION =
"flow.allowConcurrentExecution";
public static final String FLOW_EXPLAIN_KEY = "flow.explain";
public static final String FLOW_UNSCHEDULE_KEY = "flow.unschedule";
+ public static final String FLOW_OWNING_GROUP_KEY = "flow.owningGroup";
/**
* Common topology configuration properties.
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 6aefffb..4a4c97b 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -129,4 +129,7 @@ public class ServiceConfigKeys {
public static final String FORCE_LEADER = GOBBLIN_SERVICE_PREFIX +
"forceLeader";
public static final boolean DEFAULT_FORCE_LEADER = false;
+ // Group Membership authentication service
+ public static final String GROUP_OWNERSHIP_SERVICE_CLASS =
GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
+ public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE =
"org.apache.gobblin.service.NoopGroupOwnershipService";
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
index 8459b32..9cabd26 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/FlowConfig.pdl
@@ -27,6 +27,11 @@ record FlowConfig {
explain: boolean = false
/**
+ * Optional string name of group that the requester belongs to for group
ownership of flows.
+ */
+ owningGroup: optional string
+
+ /**
* Properties for the flow. These properties are passed to the compiled
Gobblin jobs.
*/
properties: map[string, string]
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
index 86c9ee4..9845e8d 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigs.snapshot.json
@@ -77,6 +77,11 @@
"doc" : "Return the compiled flow as a string. If enabled, the flow is
not added.",
"default" : false
}, {
+ "name" : "owningGroup",
+ "type" : "string",
+ "doc" : "Optional string name of group that the requester belongs to for
group ownership of flows.",
+ "optional" : true
+ }, {
"name" : "properties",
"type" : {
"type" : "map",
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 13da7a5..2442bd9 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -68,6 +68,11 @@
"doc" : "Return the compiled flow as a string. If enabled, the flow is
not added.",
"default" : false
}, {
+ "name" : "owningGroup",
+ "type" : "string",
+ "doc" : "Optional string name of group that the requester belongs to for
group ownership of flows.",
+ "optional" : true
+ }, {
"name" : "properties",
"type" : {
"type" : "map",
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 50fb947..f393b8f 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.service;
import java.io.File;
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.mortbay.jetty.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -62,12 +64,18 @@ public class FlowConfigV2Test {
private EmbeddedRestliServer _server;
private File _testDirectory;
private TestRequesterService _requesterService;
+ private GroupOwnershipService groupOwnershipService;
+ private File groupConfigFile;
private static final String TEST_SPEC_STORE_DIR = "/tmp/flowConfigV2Test/";
private static final String TEST_GROUP_NAME = "testGroup1";
private static final String TEST_FLOW_NAME = "testFlow1";
private static final String TEST_FLOW_NAME_2 = "testFlow2";
private static final String TEST_FLOW_NAME_3 = "testFlow3";
+ private static final String TEST_FLOW_NAME_4 = "testFlow4";
+ private static final String TEST_FLOW_NAME_5 = "testFlow5";
+ private static final String TEST_FLOW_NAME_6 = "testFlow6";
+ private static final String TEST_FLOW_NAME_7 = "testFlow7";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI =
"FS:///templates/test.template";
@@ -90,6 +98,15 @@ public class FlowConfigV2Test {
_requesterService = new TestRequesterService(ConfigFactory.empty());
+ this.groupConfigFile = new File(_testDirectory + "/TestGroups.json");
+ String groups ="{\"testGroup\": \"testName,testName2\"}";
+ Files.write(groups.getBytes(), this.groupConfigFile);
+ Config groupServiceConfig = ConfigBuilder.create()
+ .addPrimitive(LocalGroupOwnershipService.GROUP_MEMBER_LIST,
this.groupConfigFile.getAbsolutePath())
+ .build();
+
+ groupOwnershipService = new LocalGroupOwnershipService(groupServiceConfig);
+
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
@@ -98,6 +115,7 @@ public class FlowConfigV2Test {
// been made
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
+
binder.bind(GroupOwnershipService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE)).toInstance(groupOwnershipService);
}
});
@@ -185,20 +203,102 @@ public class FlowConfigV2Test {
_client.partialUpdateFlowConfig(flowId, flowConfigPatch);
}
- @Test (expectedExceptions = RestLiResponseException.class)
+ @Test
public void testDisallowedRequester() throws Exception {
- ServiceRequester testRequester = new ServiceRequester("testName",
"testType", "testFrom");
- _requesterService.setRequester(testRequester);
+ try {
+ ServiceRequester testRequester = new ServiceRequester("testName",
"testType", "testFrom");
+ _requesterService.setRequester(testRequester);
+
+ Map<String, String> flowProperties = Maps.newHashMap();
+ flowProperties.put("param1", "value1");
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4))
+ .setTemplateUris(TEST_TEMPLATE_URI)
+ .setProperties(new StringMap(flowProperties));
+ _client.createFlowConfig(flowConfig);
+
+ testRequester.setName("testName2");
+ _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_4));
+ } catch (RestLiResponseException e) {
+ Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+ }
+ }
+ @Test
+ public void testGroupRequesterAllowed() throws Exception {
+ ServiceRequester testRequester = new ServiceRequester("testName",
"USER_PRINCIPAL", "testFrom");
+ _requesterService.setRequester(testRequester);
Map<String, String> flowProperties = Maps.newHashMap();
- flowProperties.put("param1", "value1");
- FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
- .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new
StringMap(flowProperties));
- _client.createFlowConfig(flowConfig);
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5))
+ .setTemplateUris(TEST_TEMPLATE_URI)
+ .setProperties(new StringMap(flowProperties))
+ .setOwningGroup("testGroup");
+
+ _client.createFlowConfig(flowConfig);
testRequester.setName("testName2");
- _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME));
+ _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_5));
+ }
+
+ @Test
+ public void testGroupRequesterRejected() throws Exception {
+ try {
+ ServiceRequester testRequester = new ServiceRequester("testName",
"USER_PRINCIPAL", "testFrom");
+ _requesterService.setRequester(testRequester);
+ Map<String, String> flowProperties = Maps.newHashMap();
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6))
+ .setTemplateUris(TEST_TEMPLATE_URI)
+ .setProperties(new StringMap(flowProperties))
+ .setOwningGroup("testGroup");
+
+ _client.createFlowConfig(flowConfig);
+
+ testRequester.setName("testName3");
+ _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_6));
+ } catch (RestLiResponseException e) {
+ Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+ }
+ }
+
+ @Test
+ public void testLocalGroupOwnershipUpdates() throws Exception {
+ try {
+ ServiceRequester testRequester = new ServiceRequester("testName",
"USER_PRINCIPAL", "testFrom");
+ _requesterService.setRequester(testRequester);
+ Map<String, String> flowProperties = Maps.newHashMap();
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
+ .setTemplateUris(TEST_TEMPLATE_URI)
+ .setProperties(new StringMap(flowProperties))
+ .setOwningGroup("testGroup2");
+
+ _client.createFlowConfig(flowConfig);
+
+ } catch (RestLiResponseException e) {
+ Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_401_Unauthorized);
+ }
+
+ String filePath = this.groupConfigFile.getAbsolutePath();
+ this.groupConfigFile.delete();
+ this.groupConfigFile = new File(filePath);
+ String groups ="{\"testGroup2\": \"testName,testName3\"}";
+ Files.write(groups.getBytes(), this.groupConfigFile);
+
+ ServiceRequester testRequester = new ServiceRequester("testName",
"USER_PRINCIPAL", "testFrom");
+ _requesterService.setRequester(testRequester);
+ Map<String, String> flowProperties = Maps.newHashMap();
+
+ FlowConfig flowConfig = new FlowConfig().setId(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7))
+ .setTemplateUris(TEST_TEMPLATE_URI)
+ .setProperties(new StringMap(flowProperties))
+ .setOwningGroup("testGroup2");
+
+ // this should no longer fail as the localGroupOwnership service should
have updated as the file changed
+ _client.createFlowConfig(flowConfig);
+ testRequester.setName("testName3");
+ _client.deleteFlowConfig(new
FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME_7));
}
@AfterClass(alwaysRun = true)
@@ -226,5 +326,16 @@ public class FlowConfigV2Test {
public List<ServiceRequester> findRequesters(BaseResource resource) {
return requester == null ? Lists.newArrayList() :
Lists.newArrayList(requester);
}
+
+ @Override
+ public boolean isRequesterAllowed(
+ List<ServiceRequester> originalRequesterList, List<ServiceRequester>
currentRequesterList) {
+ for (ServiceRequester s: currentRequesterList) {
+ if (originalRequesterList.contains(s)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
new file mode 100644
index 0000000..38e9ecc
--- /dev/null
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/resources/TestGroups.json
@@ -0,0 +1,3 @@
+{
+ "testGroup": "testName,testName2"
+}
\ No newline at end of file
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 2404eff..f4ef176 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -248,6 +248,10 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXPLAIN_KEY,
flowConfig.isExplain());
}
+ if (flowConfig.hasOwningGroup()) {
+ configBuilder.addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
flowConfig.getOwningGroup());
+ }
+
Config config = configBuilder.build();
Config configWithFallback;
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 3bbfca0..49f7faa 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -56,7 +56,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME =
"flowConfigsV2ResourceHandler";
public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
-
+ public static final String INJECT_GROUP_OWNERSHIP_SERVICE =
"v2GroupOwnershipService";
private static final Set<String> ALLOWED_METADATA =
ImmutableSet.of("delete.state.store");
@@ -77,6 +77,10 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
@Named(INJECT_READY_TO_USE)
private Boolean readyToUse;
+ @Inject
+ @Named(INJECT_GROUP_OWNERSHIP_SERVICE)
+ private GroupOwnershipService groupOwnershipService;
+
public FlowConfigsV2Resource() {
}
@@ -128,11 +132,14 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
@ReturnEntity
@Override
public CreateKVResponse create(FlowConfig flowConfig) {
- List<ServiceRequester> requestorList =
this.requesterService.findRequesters(this);
+ List<ServiceRequester> requesterList =
this.requesterService.findRequesters(this);
try {
- String serialized = RequesterService.serialize(requestorList);
+ String serialized = RequesterService.serialize(requesterList);
flowConfig.getProperties().put(RequesterService.REQUESTER_LIST,
serialized);
LOG.info("Rest requester list is " + serialized);
+ if (flowConfig.hasOwningGroup() &&
!this.groupOwnershipService.isMemberOfGroup(requesterList,
flowConfig.getOwningGroup())) {
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
"Requester not part of owning group specified");
+ }
} catch (IOException e) {
throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
"cannot get who is the requester", e);
}
@@ -148,7 +155,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key,
FlowConfig flowConfig) {
- FlowConfigsResource.checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
+ checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -163,7 +170,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse update(ComplexResourceKey<FlowId, FlowStatusId> key,
PatchRequest<FlowConfig> flowConfigPatch) {
- FlowConfigsResource.checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
+ checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -177,7 +184,7 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public UpdateResponse delete(ComplexResourceKey<FlowId, FlowStatusId> key) {
- FlowConfigsResource.checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
+ checkRequester(this.requesterService, get(key),
this.requesterService.findRequesters(this));
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
@@ -200,4 +207,37 @@ public class FlowConfigsV2Resource extends
ComplexKeyResourceTemplate<FlowId, Fl
}
return headerProperties;
}
+
+ /**
+ * Check that all {@link ServiceRequester}s in this request are contained
within the original service requester list
+ * or is part of the original requester's owning group when the flow was
submitted. If they are not, throw a {@link FlowConfigLoggedException} with
{@link HttpStatus#S_401_UNAUTHORIZED}.
+ * If there is a failure when deserializing the original requester list,
throw a {@link FlowConfigLoggedException} with
+ * {@link HttpStatus#S_400_BAD_REQUEST}.
+ * @param requesterService the {@link RequesterService} used to verify the
requester
+ * @param originalFlowConfig original flow config to find original requester
+ * @param requesterList list of requesters for this request
+ */
+ public void checkRequester(
+ RequesterService requesterService, FlowConfig originalFlowConfig,
List<ServiceRequester> requesterList) {
+ if (requesterList == null) {
+ return;
+ }
+
+ try {
+ String serializedOriginalRequesterList =
originalFlowConfig.getProperties().get(RequesterService.REQUESTER_LIST);
+ if (serializedOriginalRequesterList != null) {
+ List<ServiceRequester> originalRequesterList =
RequesterService.deserialize(serializedOriginalRequesterList);
+ if (!requesterService.isRequesterAllowed(originalRequesterList,
requesterList)) {
+ // if the requester is not whitelisted or the original requester,
reject the requester if it is not part of the owning group
+ // of the original requester
+ if (!(originalFlowConfig.hasOwningGroup() &&
this.groupOwnershipService.isMemberOfGroup(
+ requesterList, originalFlowConfig.getOwningGroup()))) {
+ throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED,
"Requester not allowed to make this request");
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
"Failed to get original requester list", e);
+ }
+ }
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
new file mode 100644
index 0000000..a84290d
--- /dev/null
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/GroupOwnershipService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Service for handling group ownership of flows
+ */
+public abstract class GroupOwnershipService {
+
+ /**
+ * @return true if any of the serviceRequesters belong in the group
+ */
+ public abstract boolean isMemberOfGroup(List<ServiceRequester>
serviceRequesters, String group);
+
+ /**
+ * Extracts ServiceRequester names
+ * @param requesterList
+ * @return a list of service requester names
+ */
+ protected static List<String> extractRequesterNames(List<ServiceRequester>
requesterList) {
+ return requesterList.stream()
+ .map(requester -> requester.getName())
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
new file mode 100644
index 0000000..e7e54e1
--- /dev/null
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipPathAlterationListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LocalGroupOwnershipPathAlterationListener extends
PathAlterationListenerAdaptor {
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalGroupOwnershipPathAlterationListener.class);
+ private JsonObject groupOwnerships;
+ FileSystem fs;
+ Path groupOwnershipFilePath;
+
+ LocalGroupOwnershipPathAlterationListener(Path filePath) {
+ this.groupOwnershipFilePath = filePath;
+ try {
+ this.fs = FileSystem.get(new Configuration());
+ updateGroupOwnerships(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not get local filesystem", e);
+ }
+ }
+
+ public JsonObject getGroupOwnerships() {
+ return groupOwnerships;
+ }
+
+ void updateGroupOwnerships(Path path) {
+ // only update if the group ownership file is changed
+ if (path.toUri().getPath().equals(this.groupOwnershipFilePath.toString()))
{
+ LOG.info("Detected change in group ownership file, updating groups");
+ try (FSDataInputStream in = this.fs.open(path)) {
+ String jsonString = IOUtils.toString(in, Charset.defaultCharset());
+ JsonParser parser = new JsonParser();
+ this.groupOwnerships = parser.parse(jsonString).getAsJsonObject();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not open group ownership file at " +
path.toString(), e);
+ }
+ }
+ }
+
+ @Override
+ public void onFileCreate(Path path) {
+ updateGroupOwnerships(path);
+ }
+
+ @Override
+ public void onFileChange(Path path) {
+ updateGroupOwnerships(path);
+ }
+
+ @Override
+ public void onFileDelete(Path path) {
+ // ignore if another file in same directory is deleted
+ if (path.toUri().getPath().equals(this.groupOwnershipFilePath.toString()))
{
+ this.groupOwnerships = new JsonObject();
+ }
+ }
+}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
new file mode 100644
index 0000000..abe3bf3
--- /dev/null
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import com.google.common.base.Splitter;
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.List;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Reads and updates from a JSON where keys denote group names
+ * and values denote a list of group members
+ */
+@Alias("local")
+public class LocalGroupOwnershipService extends GroupOwnershipService {
+ public static final String GROUP_MEMBER_LIST =
"groupOwnershipService.groupMembers.path";
+ LocalGroupOwnershipPathAlterationListener listener;
+ PathAlterationObserver observer;
+
+ public LocalGroupOwnershipService(Config config) {
+ Path groupOwnershipFilePath = new
Path(config.getString(GROUP_MEMBER_LIST));
+ try {
+ observer = new
PathAlterationObserver(groupOwnershipFilePath.getParent());
+ this.listener = new
LocalGroupOwnershipPathAlterationListener(groupOwnershipFilePath);
+ observer.addListener(this.listener);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not get initialize
PathAlterationObserver at %" + groupOwnershipFilePath.toString(), e);
+ }
+ }
+
+
+ @Override
+ public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters,
String group) {
+ // ensure that the group ownership file is up to date
+ try {
+ this.observer.checkAndNotify();
+ } catch (IOException e) {
+ throw new RuntimeException("Group Ownership observer could not check for
file changes", e);
+ }
+
+ JsonObject groupOwnerships = this.listener.getGroupOwnerships();
+ if (groupOwnerships.has(group)) {
+ List<String> groupMembers =
Splitter.on(',').trimResults().omitEmptyStrings().splitToList(
+ groupOwnerships.get(group).getAsString());
+ for (ServiceRequester requester: serviceRequesters) {
+ if (groupMembers.contains(requester.getName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+
+}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
new file mode 100644
index 0000000..639c6a1
--- /dev/null
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service;
+
+import com.typesafe.config.Config;
+import java.util.List;
+import org.apache.gobblin.annotation.Alias;
+
+
+/**
+ * A {@link GroupOwnershipService} that performs no membership check: every requester is
+ * treated as a member of every group.
+ */
+@Alias("noop")
+public class NoopGroupOwnershipService extends GroupOwnershipService {
+
+  /**
+   * Creates the no-op service.
+   *
+   * @param config accepted so the class offers a {@link Config}-taking constructor; it is not read
+   */
+  public NoopGroupOwnershipService(Config config) {
+  }
+
+  /**
+   * Always reports membership.
+   *
+   * @param serviceRequesters ignored
+   * @param group ignored
+   * @return {@code true} unconditionally
+   */
+  @Override
+  public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group) {
+    return true;
+  }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a7df2e0..a3d2fff 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -451,6 +451,10 @@ public class FlowSpec implements Configurable, Spec {
flowConfig.setSchedule(schedule);
}
+ if (flowProps.containsKey(ConfigurationKeys.FLOW_OWNING_GROUP_KEY)) {
+
flowConfig.setOwningGroup(flowProps.getProperty(ConfigurationKeys.FLOW_OWNING_GROUP_KEY));
+ }
+
// remove keys that were injected as part of flowSpec creation
flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a3b5848..ab644af 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -83,8 +83,8 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
+ "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE spec_uri = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri,
flow_group, flow_name, template_uri, "
- + "user_to_proxy, source_identifier, destination_identifier, schedule,
tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE
spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
+ + "user_to_proxy, source_identifier, destination_identifier, schedule,
tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY
UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
spec_uri = ?";
private static final String GET_STATEMENT = "SELECT spec_uri, spec, " +
NEW_COLUMN + " FROM %s WHERE ";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " +
NEW_COLUMN + " FROM %s";
@@ -409,6 +409,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
String destinationIdentifier =
flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
String schedule = ConfigUtils.getString(flowConfig,
ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+ String owningGroup = ConfigUtils.getString(flowConfig,
ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
int i = 0;
@@ -422,6 +423,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
statement.setString(++i, schedule);
statement.setString(++i, tagValue);
statement.setBoolean(++i, isRunImmediately);
+ statement.setString(++i, owningGroup);
statement.setBlob(++i, new
ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
statement.setString(++i, new String(this.specSerDe.serialize(flowSpec),
Charsets.UTF_8));
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b6d720b..63b9141 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -62,7 +62,8 @@ public class MysqlSpecStoreTest {
private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
- private FlowSpec flowSpec1, flowSpec2, flowSpec3;
+ private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
+ private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
public MysqlSpecStoreTest()
throws URISyntaxException {
@@ -113,6 +114,17 @@ public class MysqlSpecStoreTest {
.withDescription("Test flow spec 3")
.withVersion("Test version 3")
.build();
+
+ flowSpec4 = FlowSpec.builder(this.uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
}
@Test(expectedExceptions = IOException.class)
@@ -126,9 +138,10 @@ public class MysqlSpecStoreTest {
public void testAddSpec() throws Exception {
this.specStore.addSpec(this.flowSpec1);
this.specStore.addSpec(this.flowSpec2);
-
+ this.specStore.addSpec(this.flowSpec4);
Assert.assertTrue(this.specStore.exists(this.uri1));
Assert.assertTrue(this.specStore.exists(this.uri2));
+ Assert.assertTrue(this.specStore.exists(this.uri4));
Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
}
@@ -138,7 +151,7 @@ public class MysqlSpecStoreTest {
Assert.assertEquals(result, this.flowSpec1);
Collection<Spec> specs = this.specStore.getSpecs();
- Assert.assertEquals(specs.size(), 2);
+ Assert.assertEquals(specs.size(), 3);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
@@ -180,24 +193,17 @@ public class MysqlSpecStoreTest {
specs = this.specStore.getSpecs(flowSpecSearchObject);
Assert.assertEquals(specs.size(), 1);
Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().owningGroup("owningGroup4").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec4));
}
@Test (dependsOnMethods = "testGetSpec")
public void testGetSpecWithTag() throws Exception {
//Creating and inserting flowspecs with tags
- URI uri4 = URI.create("flowspec4");
- FlowSpec flowSpec4 = FlowSpec.builder(uri4)
- .withConfig(ConfigBuilder.create()
- .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
- .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
- .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
- .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
- .addPrimitive("key4", "value4").build())
- .withDescription("Test flow spec 4")
- .withVersion("Test version 4")
- .build();
-
URI uri5 = URI.create("flowspec5");
FlowSpec flowSpec5 = FlowSpec.builder(uri5)
.withConfig(ConfigBuilder.create()
@@ -210,11 +216,23 @@ public class MysqlSpecStoreTest {
.withVersion("Test version 5")
.build();
- this.specStore.addSpec(flowSpec4, "dr");
+ URI uri6 = URI.create("flowspec6");
+ FlowSpec flowSpec6 = FlowSpec.builder(uri6)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
+ .addPrimitive("key6", "value6").build())
+ .withDescription("Test flow spec 6")
+ .withVersion("Test version 6")
+ .build();
+
this.specStore.addSpec(flowSpec5, "dr");
+ this.specStore.addSpec(flowSpec6, "dr");
- Assert.assertTrue(this.specStore.exists(uri4));
Assert.assertTrue(this.specStore.exists(uri5));
+ Assert.assertTrue(this.specStore.exists(uri6));
List<URI> result = new ArrayList<>();
this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
Assert.assertEquals(result.size(), 2);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0643b6a..a9518d1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,6 +31,8 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.service.GroupOwnershipService;
+import org.apache.gobblin.service.NoopGroupOwnershipService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -155,6 +157,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
protected GobblinServiceFlowExecutionResourceHandler
flowExecutionResourceHandler;
@Getter
protected FlowStatusGenerator flowStatusGenerator;
+ @Getter
+ protected GroupOwnershipService groupOwnershipService;
protected boolean flowCatalogLocalCommit;
@Getter
@@ -304,6 +308,16 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
+    // Pick the group ownership service implementation: the configured alias/class name when present,
+    // otherwise the default. The chosen implementation is logged once, whichever path was taken.
+    ClassAliasResolver<GroupOwnershipService> groupOwnershipAliasResolver =
+        new ClassAliasResolver<>(GroupOwnershipService.class);
+    String groupOwnershipServiceClass = ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE;
+    if (config.hasPath(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS)) {
+      groupOwnershipServiceClass = config.getString(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS);
+    }
+    LOGGER.info("Initializing with group ownership service " + groupOwnershipServiceClass);
+    this.groupOwnershipService = GobblinConstructorUtils.invokeConstructor(GroupOwnershipService.class,
+        groupOwnershipAliasResolver.resolve(groupOwnershipServiceClass), config);
+
if (isRestLIServerEnabled) {
Injector injector = Guice.createInjector(new Module() {
@Override
@@ -329,6 +343,9 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
binder.bind(RequesterService.class)
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
.toInstance(new NoopRequesterService(config));
+ binder.bind(GroupOwnershipService.class)
+
.annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE))
+ .toInstance(GobblinServiceManager.this.groupOwnershipService);
}
});
this.restliServer = EmbeddedRestliServer.builder()
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index d3cb898..f753e8c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -81,6 +81,7 @@ public class GobblinServiceManagerTest {
private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
private static final String JOB_STATUS_STATE_STORE_DIR =
"/tmp/serviceCore/fsJobStatusRetriever";
+ private static final String GROUP_OWNERSHIP_CONFIG_DIR =
Files.createTempDir().getAbsolutePath();
private static final String TEST_GROUP_NAME = "testGroup";
private static final String TEST_FLOW_NAME = "testFlow";
@@ -202,6 +203,13 @@ public class GobblinServiceManagerTest {
} catch (Exception e) {
logger.warn("Could not completely cleanup Spec Store Parent Dir");
}
+
+ try {
+ cleanUpDir(GROUP_OWNERSHIP_CONFIG_DIR);
+ } catch (Exception e) {
+ logger.warn("Could not completely cleanup Group Ownership Parent Dir");
+ }
+
try {
this.testingServer.close();
} catch(Exception e) {
diff --git a/gobblin-service/src/test/resources/TestGroups.json
b/gobblin-service/src/test/resources/TestGroups.json
new file mode 100644
index 0000000..38e9ecc
--- /dev/null
+++ b/gobblin-service/src/test/resources/TestGroups.json
@@ -0,0 +1,3 @@
+{
+ "testGroup": "testName,testName2"
+}
\ No newline at end of file