Repository: storm Updated Branches: refs/heads/master 82f7450cc -> 1b8f67c14
Add nimbus admins groups Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e7d9881c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e7d9881c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e7d9881c Branch: refs/heads/master Commit: e7d9881c1876494b179e06c7c3ee64c606590343 Parents: 6207d32 Author: Kishor Patil <[email protected]> Authored: Thu Oct 26 17:59:35 2017 -0400 Committer: Kishor Patil <[email protected]> Committed: Fri Oct 27 11:58:50 2017 -0400 ---------------------------------------------------------------------- .../storm/hdfs/blobstore/BlobStoreTest.java | 36 +++++++++++ .../src/jvm/org/apache/storm/Config.java | 14 ++++ .../storm/blobstore/BlobStoreAclHandler.java | 28 ++++++++ .../storm/security/auth/FixedGroupsMapping.java | 68 ++++++++++++++++++++ .../auth/authorizer/SimpleACLAuthorizer.java | 10 ++- .../apache/storm/security/auth/auth_test.clj | 24 +++++++ .../java/org/apache/storm/DaemonConfig.java | 8 --- .../org/apache/storm/daemon/nimbus/Nimbus.java | 2 + .../logviewer/utils/ResourceAuthorizer.java | 1 + 9 files changed, 182 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java index 6c9cd55..fcb7221 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java @@ -27,6 +27,7 @@ import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.generated.AccessControlType; +import org.apache.storm.security.auth.FixedGroupsMapping; import org.apache.storm.security.auth.NimbusPrincipal; import org.apache.storm.security.auth.SingleUserPrincipal; import org.apache.commons.io.FileUtils; @@ -92,6 +93,19 @@ public class BlobStoreTest { // Method which initializes nimbus admin public static void initializeConfigs() { conf.put(Config.NIMBUS_ADMINS,"admin"); + conf.put(Config.NIMBUS_ADMINS_GROUPS,"adminsGroup"); + + // Construct a groups mapping for the FixedGroupsMapping class + Map<String, Set<String>> groupsMapping = new HashMap<String, Set<String>>(); + Set<String> groupSet = new HashSet<String>(); + groupSet.add("adminsGroup"); + groupsMapping.put("adminsGroupsUser", groupSet); + + // Now create a params map to put it in to our conf + Map<String, Object> paramMap = new HashMap<String, Object>(); + paramMap.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groupsMapping); + conf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, "org.apache.storm.security.auth.FixedGroupsMapping"); + conf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, paramMap); conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor"); } @@ -238,6 +252,18 @@ public class BlobStoreTest { assertEquals("Blobstore replication not matching", store.getBlobReplication("test", supervisor), 5); store.deleteBlob("test", supervisor); + Subject adminsGroupsUser = getSubject("adminsGroupsUser"); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + metadata.set_replication_factor(4); + out = store.createBlob("test", metadata, adminsGroupsUser); + out.write(1); + out.close(); + assertStoreHasExactly(store, "test"); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 4); + store.updateBlobReplication("test", 5, adminsGroupsUser); + assertEquals("Blobstore replication not matching", store.getBlobReplication("test", adminsGroupsUser), 5); + store.deleteBlob("test", adminsGroupsUser); + //Test for a user having read or write or admin access to read replication for a blob String createSubject = "createSubject"; String writeSubject = "writeSubject"; @@ -284,6 +310,16 @@ public class BlobStoreTest { out.close(); store.deleteBlob("test", admin); + //Test for Nimbus Groups Admin + Subject adminsGroupsUser = getSubject("adminsGroupsUser"); + assertStoreHasExactly(store); + metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT); + out = store.createBlob("test", metadata, adminsGroupsUser); + assertStoreHasExactly(store, "test"); + out.write(1); + out.close(); + store.deleteBlob("test", adminsGroupsUser); + //Test for Supervisor Admin Subject supervisor = getSubject("supervisor"); assertStoreHasExactly(store); http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 6be0c21..8dd47cb 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -1046,6 +1046,14 @@ public class Config extends HashMap<String, Object> { public static final String DRPC_INVOCATIONS_THREADS = "drpc.invocations.threads"; /** + * Initialization parameters for the group mapping service plugin. + * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN} + * implementation to access optional settings. + */ + @isType(type=Map.class) + public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params"; + + /** * The default transport plug-in for Thrift client/server communication */ @isString @@ -1417,6 +1425,12 @@ public class Config extends HashMap<String, Object> { public static final String NIMBUS_ADMINS = "nimbus.admins"; /** + * A list of groups that are cluster admins and can run any command. + */ + @isStringList + public static final String NIMBUS_ADMINS_GROUPS = "nimbus.admins.groups"; + + /** * For secure mode we would want to turn on this config * By default this is turned off assuming the default is insecure */ http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java index 296d64f..f65541e 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStoreAclHandler.java @@ -23,6 +23,7 @@ import org.apache.storm.generated.AccessControlType; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IGroupMappingServiceProvider; import org.apache.storm.security.auth.IPrincipalToLocal; import org.apache.storm.security.auth.NimbusPrincipal; import org.apache.commons.lang.StringUtils; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.Subject; +import java.io.IOException; import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; @@ -45,6 +47,7 @@ import java.util.Set; public class BlobStoreAclHandler { public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class); private final IPrincipalToLocal _ptol; + private final IGroupMappingServiceProvider _groupMappingProvider; public static final int READ = 0x01; public static final int WRITE = 0x02; @@ -54,18 +57,28 @@ public class BlobStoreAclHandler { public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>(); private Set<String> _supervisors; private Set<String> _admins; + private Set<String> _adminsGroups; private boolean doAclValidation; public BlobStoreAclHandler(Map<String, Object> conf) { _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf); + if (conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN) != null) { + _groupMappingProvider = AuthUtils.GetGroupMappingServiceProviderPlugin(conf); + } else { + _groupMappingProvider = null; + } _supervisors = new HashSet<String>(); _admins = new HashSet<String>(); + _adminsGroups = new HashSet<>(); if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) { _supervisors.addAll((List<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS)); } if (conf.containsKey(Config.NIMBUS_ADMINS)) { _admins.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS)); } + if (conf.containsKey(Config.NIMBUS_ADMINS_GROUPS)) { + _adminsGroups.addAll((List<String>)conf.get(Config.NIMBUS_ADMINS_GROUPS)); + } if (conf.containsKey(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED)) { doAclValidation = (boolean)conf.get(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED); } @@ -187,6 +200,21 @@ public class BlobStoreAclHandler { if (_admins.contains(u)) { return true; } + if (_adminsGroups.size() > 0 && _groupMappingProvider != null) { + Set<String> userGroups = null; + try { + userGroups = _groupMappingProvider.getGroups(u); + } catch (IOException e) { + LOG.warn("Error while trying to fetch user groups", e); + } + if (userGroups != null) { + for (String tgroup : userGroups) { + if (_adminsGroups.contains(tgroup)) { + return true; + } + } + } + } } return false; } http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java new file mode 100644 index 0000000..4956b39 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth; + +import java.io.IOException; +import java.util.Set; +import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FixedGroupsMapping implements IGroupMappingServiceProvider { + + public static Logger LOG = LoggerFactory.getLogger(FixedGroupsMapping.class); + public static final String STORM_FIXED_GROUP_MAPPING = "storm.fixed.group.mapping"; + public Map<String, Set<String>> cachedGroups = new HashMap<String, Set<String>>(); + + /** + * Invoked once immediately after construction + * @param storm_conf Storm configuration + */ + @Override + public void prepare(Map storm_conf) { + Map<?, ?> params = (Map<?, ?>) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS); + Map<String, Set<String>> mapping = (Map<String, Set<String>>) params.get(STORM_FIXED_GROUP_MAPPING); + if (mapping != null) { + cachedGroups.putAll(mapping); + } else { + LOG.warn("There is no initial group mapping"); + } + } + + /** + * Returns list of groups for a user + * + * @param user get groups for this user + * @return list of groups for a given user + */ + @Override + public Set<String> getGroups(String user) throws IOException { + if (cachedGroups.containsKey(user)) { + return cachedGroups.get(user); + } + + // I don't have anything + return new HashSet<String>(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java index b8d02d1..a39dd6d 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java @@ -80,6 +80,7 @@ public class SimpleACLAuthorizer implements IAuthorizer { } protected Set<String> _admins; + protected Set<String> _adminsGroups; protected Set<String> _supervisors; protected Set<String> _nimbusUsers; protected Set<String> _nimbusGroups; @@ -92,6 +93,7 @@ public class SimpleACLAuthorizer implements IAuthorizer { @Override public void prepare(Map<String, Object> conf) { _admins = new HashSet<>(); + _adminsGroups = new HashSet<>(); _supervisors = new HashSet<>(); _nimbusUsers = new HashSet<>(); _nimbusGroups = new HashSet<>(); @@ -99,9 +101,15 @@ public class SimpleACLAuthorizer implements IAuthorizer { if (conf.containsKey(Config.NIMBUS_ADMINS)) { _admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS)); } + + if (conf.containsKey(Config.NIMBUS_ADMINS_GROUPS)) { + _adminsGroups.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS_GROUPS)); + } + if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) { _supervisors.addAll((Collection<String>)conf.get(Config.NIMBUS_SUPERVISOR_USERS)); } + if (conf.containsKey(Config.NIMBUS_USERS)) { _nimbusUsers.addAll((Collection<String>)conf.get(Config.NIMBUS_USERS)); } @@ -135,7 +143,7 @@ public class SimpleACLAuthorizer implements IAuthorizer { } } - if (_admins.contains(principal) || _admins.contains(user)) { + if (_admins.contains(principal) || _admins.contains(user) || checkUserGroupAllowed(userGroups, _adminsGroups)) { return true; } http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj index f7aac52..2b10ffd 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj @@ -34,6 +34,7 @@ (:import [org.apache.storm.generated AuthorizationException]) (:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus]) (:import [org.apache.storm.utils NimbusClient Time]) + (:import [org.apache.storm.security.auth FixedGroupsMapping FixedGroupsMapping]) (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer]) (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType]) @@ -289,6 +290,29 @@ (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil))))) +(deftest simple-acl-nimbus-groups-auth-test + (let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) + {NIMBUS-ADMINS-GROUPS ["admin-group"] + NIMBUS-USERS ["user-a"] + NIMBUS-SUPERVISOR-USERS ["supervisor"] + STORM-GROUP-MAPPING-SERVICE-PROVIDER-PLUGIN "org.apache.storm.security.auth.FixedGroupsMapping" + STORM-GROUP-MAPPING-SERVICE-PARAMS {FixedGroupsMapping/STORM_FIXED_GROUP_MAPPING + {"admin" #{"admin-group"} + "not-admin" #{"not-admin-group"}}}}) + authorizer (SimpleACLAuthorizer. ) + admin-user (mk-subject "admin") + not-admin-user (mk-subject "not-admin") + supervisor-user (mk-subject "supervisor") + user-a (mk-subject "user-a") + user-b (mk-subject "user-b")] + (.prepare authorizer cluster-conf) + (is (= true (.permit authorizer (ReqContext. user-a) "submitTopology" {}))) + (is (= false (.permit authorizer (ReqContext. user-b) "submitTopology" {}))) + (is (= true (.permit authorizer (ReqContext. admin-user) "fileUpload" nil))) + (is (= false (.permit authorizer (ReqContext. not-admin-user) "fileUpload" nil))) + (is (= false (.permit authorizer (ReqContext. user-b) "fileUpload" nil))) + (is (= true (.permit authorizer (ReqContext. supervisor-user) "fileDownload" nil))))) + (deftest shell-based-groups-mapping-test (let [cluster-conf (clojurify-structure (ConfigUtils/readStormConfig)) groups (ShellBasedGroupsMapping. ) http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index edc81f9..79b44ef 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -159,14 +159,6 @@ public class DaemonConfig implements Validated { public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource"; /** - * Initialization parameters for the group mapping service plugin. - * Provides a way for a @link{STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN} - * implementation to access optional settings. - */ - @isType(type = Map.class) - public static final String STORM_GROUP_MAPPING_SERVICE_PARAMS = "storm.group.mapping.service.params"; - - /** * The directory where storm's health scripts go. */ @isString http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 3b914ba..f51d169 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -3923,6 +3923,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException { try { List<String> adminUsers = (List<String>) conf.getOrDefault(Config.NIMBUS_ADMINS, Collections.emptyList()); + List<String> adminGroups = (List<String>) conf.getOrDefault(Config.NIMBUS_ADMINS_GROUPS, Collections.emptyList()); IStormClusterState state = stormClusterState; List<String> assignedIds = state.assignments(null); Set<String> ret = new HashSet<>(); @@ -3934,6 +3935,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf); if (user == null || isAdmin || isUserPartOf(user, groups) || + isUserPartOf(user, adminGroups) || topoLogUsers.contains(user)) { ret.add(topoId); } http://git-wip-us.apache.org/repos/asf/storm/blob/e7d9881c/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizer.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizer.java index 3d6a7d9..e24b988 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizer.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ResourceAuthorizer.java @@ -88,6 +88,7 @@ public class ResourceAuthorizer { List<String> logsGroups = new ArrayList<>(); logsGroups.addAll(ObjectReader.getStrings(stormConf.get(DaemonConfig.LOGS_GROUPS))); + logsGroups.addAll(ObjectReader.getStrings(stormConf.get(Config.NIMBUS_ADMINS_GROUPS))); logsGroups.addAll(whitelist.getGroupWhitelist()); String userName = principalToLocal.toLocal(user);
