Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2531#discussion_r164031325 --- Diff: storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java --- @@ -0,0 +1,637 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.security.auth; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.security.auth.Subject; +import org.apache.storm.Config; +import org.apache.storm.Testing; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.cluster.StormClusterStateImpl; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.WorkerToken; +import org.apache.storm.generated.WorkerTokenServiceType; +import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer; +import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer; +import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer; +import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin; +import org.apache.storm.security.auth.workertoken.WorkerTokenManager; +import org.apache.storm.testing.InProcessZookeeper; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.thrift.transport.TTransportException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class AuthTest { + private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class); + private static final File BASE = new File("./src/test/resources/"); + + private static final String DIGEST_JAAS_CONF = new File(BASE,"jaas_digest.conf").getAbsolutePath(); + private static final String BAD_PASSWORD_CONF = new File(BASE, "jaas_digest_bad_password.conf").getAbsolutePath(); + private static final String WRONG_USER_CONF = new File(BASE,"jaas_digest_unknown_user.conf").getAbsolutePath(); + private static final String MISSING_CLIENT = new File(BASE, "jaas_digest_missing_client.conf").getAbsolutePath(); + + //3 seconds in milliseconds + public static final int NIMBUS_TIMEOUT = 3_000; + + public interface MyBiConsumer<T, U> { + void accept(T t, U u) throws Exception; + } + + + public static Principal mkPrincipal(final String name) { + return new Principal() { + @Override + public String getName() { + return name; + } + + @Override + public boolean equals(Object other) { + return other instanceof Principal + && name.equals(((Principal) other).getName()); + } + + @Override + public String toString() { + return name; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + }; + } + + public static Subject mkSubject(String name) { + return new Subject(true, Collections.singleton(mkPrincipal(name)), + Collections.emptySet(), Collections.emptySet()); + } + + public static void withServer(Class<? extends ITransportPlugin> transportPluginClass, + Nimbus.Iface impl, + MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception { + withServer(null, transportPluginClass, impl, null, null, body); + } + + public static void withServer(String loginCfg, + Class<? extends ITransportPlugin> transportPluginClass, + Nimbus.Iface impl, + MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception { + withServer(loginCfg, transportPluginClass, impl, null, null, body); + } + + public static void withServer(String loginCfg, + Class<? extends ITransportPlugin> transportPluginClass, + Nimbus.Iface impl, + InProcessZookeeper zk, + Map<String, Object> extraConfs, + MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception { + Map<String, Object> conf = ConfigUtils.readStormConfig(); + conf.put(Config.NIMBUS_THRIFT_PORT, 0); + conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, transportPluginClass.getName()); + + if (loginCfg != null) { + conf.put("java.security.auth.login.config", loginCfg); + } + + if (zk != null) { + conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); + conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort()); + } + + if (extraConfs != null) { + conf.putAll(extraConfs); + } + + Nimbus.Iface handler = impl != null ? impl : mock(Nimbus.Iface.class); + final ThriftServer server = new ThriftServer(conf, + new Nimbus.Processor<>(handler), + ThriftConnectionType.NIMBUS); + + LOG.info("Created Server... {}", server); + new Thread(() -> { + LOG.info("Starting Serving..."); + server.serve(); + }).start(); + Testing.whileTimeout( + () -> !server.isServing(), + () -> { + try { + Time.sleep(100); + } catch (InterruptedException e) { + //Ignored + } + }); + try { + LOG.info("Starting to run {}", body); + body.accept(server, conf); + LOG.info("{} finished with no exceptions", body); + } finally { + LOG.info("Stopping server {}", server); + server.stop(); + } + } + + @Test + public void kerbToLocalTest() { + KerberosPrincipalToLocal kptol = new KerberosPrincipalToLocal(); + kptol.prepare(Collections.emptyMap()); + assertEquals("me", kptol.toLocal(mkPrincipal("me@realm"))); + assertEquals("simple", kptol.toLocal(mkPrincipal("simple"))); + assertEquals("someone", kptol.toLocal(mkPrincipal("someone/host@realm"))); + } + + @Test + public void simpleAuthTest() throws Exception { + Nimbus.Iface impl = mock(Nimbus.Iface.class); + withServer(SimpleTransportPlugin.class, + impl, + (ThriftServer server, Map<String, Object> conf) -> { + try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("security_auth_test_topology"); + } + + //Verify digest is rejected... + Map<String, Object> badConf = new HashMap<>(conf); + badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, DigestSaslTransportPlugin.class.getName()); + badConf.put("java.security.auth.login.config", DIGEST_JAAS_CONF); + badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0); + try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("bad_security_auth_test_topology"); + fail("An exception should have been thrown trying to connect."); + } catch (Exception te) { + LOG.info("Got Exception...", te); + assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te)); + } + }); + verify(impl).activate("security_auth_test_topology"); + verify(impl, never()).activate("bad_security_auth_test_topology"); + } + + public static void verifyIncorrectJaasConf(ThriftServer server, Map<String, Object> conf, String jaas, + Class<? extends Exception> expectedException) { + Map<String, Object> badConf = new HashMap<>(conf); + badConf.put("java.security.auth.login.config", jaas); + try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("bad_auth_test_topology"); + fail("An exception should have been thrown trying to connect."); + } catch (Exception e) { + LOG.info("Got Exception...", e); + assert(Utils.exceptionCauseIsInstanceOf(expectedException, e)); + } + } + + @Test + public void digestAuthTest() throws Exception { + Nimbus.Iface impl = mock(Nimbus.Iface.class); + final AtomicReference<ReqContext> user = new AtomicReference<>(); + doAnswer((invocation) -> { + user.set(new ReqContext(ReqContext.context())); + return null; + }).when(impl).activate(anyString()); + + withServer(DIGEST_JAAS_CONF, + DigestSaslTransportPlugin.class, + impl, + (ThriftServer server, Map<String, Object> conf) -> { + try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("security_auth_test_topology"); + } + + conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0); + + //Verify simple is rejected... + Map<String, Object> badTransport = new HashMap<>(conf); + badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName()); + try (NimbusClient client = new NimbusClient(badTransport, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("bad_security_auth_test_topology"); + fail("An exception should have been thrown trying to connect."); + } catch (Exception te) { + LOG.info("Got Exception...", te); + assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te)); + } + //The user here from the jaas conf is bob. No impersonation is done, so verify that + ReqContext found = user.get(); + assertNotNull(found); + assertEquals("bob", found.principal().getName()); + assertFalse(found.isImpersonating()); + user.set(null); + + verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, TTransportException.class); + verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, TTransportException.class); + verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", RuntimeException.class); + verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, IOException.class); + }); + verify(impl).activate("security_auth_test_topology"); + verify(impl, never()).activate("bad_auth_test_topology"); + } + + public static Subject createSubjectWith(WorkerToken wt) { + //This is a bit ugly, but it shows how this would happen in a worker so we will use the same APIs + Map<String, String> creds = new HashMap<>(); + AuthUtils.setWorkerToken(creds, wt); + Subject subject = new Subject(); + AuthUtils.updateSubject(subject, Collections.emptyList(), creds); + return subject; + } + + public static void tryConnectAs( Map<String, Object> conf, ThriftServer server, Subject subject, String topoId) + throws PrivilegedActionException { + Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> { + try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate(topoId); //Yes this should be a topo name, but it makes this simpler... + } + return null; + }); + } + + public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, Map<String, Object> conf, ThriftServer server, + String user, String topoId) throws PrivilegedActionException { + WorkerToken wt = wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId); + Subject subject = createSubjectWith(wt); + tryConnectAs(conf, server, subject, topoId); + return subject; + } + + public static void verifyUserIs(AtomicReference<ReqContext> user, String userName) { + //The user from the token is bob, so verify that the name was set correctly... + ReqContext found = user.get(); + assertNotNull(found); + assertEquals(userName, found.principal().getName()); + assertFalse(found.isImpersonating()); + user.set(null); + } + + @Test + public void workerTokenDigestAuthTest() throws Exception { + LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n"); + Nimbus.Iface impl = mock(Nimbus.Iface.class); + final AtomicReference<ReqContext> user = new AtomicReference<>(); + doAnswer((invocation) -> { + user.set(new ReqContext(ReqContext.context())); + return null; + }).when(impl).activate(anyString()); + + Map<String, Object> extraConfs = new HashMap<>(); + //Let worker tokens work on insecure ZK... + extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true); + + try (InProcessZookeeper zk = new InProcessZookeeper()) { + withServer(MISSING_CLIENT, + DigestSaslTransportPlugin.class, + impl, + zk, + extraConfs, + (ThriftServer server, Map<String, Object> conf) -> { + try (Time.SimulatedTime sim = new Time.SimulatedTime()) { + conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0); + //We cannot connect if there is no client section in the jaas conf... + try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) { + client.getClient().activate("bad_auth_test_topology"); + fail("We should not be able to connect without a token..."); + } catch (Exception e) { + assert (Utils.exceptionCauseIsInstanceOf(IOException.class, e)); + } + + //Now lets create a token and verify that we can connect... + IStormClusterState state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + WorkerTokenManager wtMan = new WorkerTokenManager(conf, state); + Subject bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob"); + verifyUserIs(user, "bob"); + + Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12)); + + //Alice has no digest jaas section at all... + Subject alice = testConnectWithTokenFor(wtMan, conf, server, "alice", "topo-alice"); + verifyUserIs(user, "alice"); + + Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13)); + //Verify that bob's token has expired + + try { + tryConnectAs(conf, server, bob, "bad_auth_test_topology"); + fail("We should not be able to connect with bad auth"); + } catch (Exception e) { + assert (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)); + } + tryConnectAs(conf, server, alice, "topo-alice"); + verifyUserIs(user, "alice"); + + //Now see if we can create a new token for bob and try again. + bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob"); + verifyUserIs(user, "bob"); + + tryConnectAs(conf, server, alice, "topo-alice"); + verifyUserIs(user, "alice"); + } + }); + } + verify(impl, times(2)).activate("topo-bob"); + verify(impl, times(3)).activate("topo-alice"); + verify(impl, never()).activate("bad_auth_test_topology"); + LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n"); + } + + @Test + public void negativeWhitelistAuthroizationTest() { + SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer(); + Map<String, Object> conf = ConfigUtils.readStormConfig(); + auth.prepare(conf); + ReqContext context = new ReqContext(mkSubject("user")); + assertFalse(auth.permit(context, "activate", conf)); + } + + @Test + public void positiveWhitelistAuthroizationTest() { + SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer(); + Map<String, Object> conf = ConfigUtils.readStormConfig(); + conf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, Arrays.asList("user")); + auth.prepare(conf); + ReqContext context = new ReqContext(mkSubject("user")); + assertTrue(auth.permit(context, "activate", conf)); + } + + @Test + public void simpleAclUserAuthTest() { + Map<String, Object> clusterConf = ConfigUtils.readStormConfig(); + clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin")); + clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor")); + ReqContext admin = new ReqContext(mkSubject("admin")); + ReqContext supervisor = new ReqContext(mkSubject("supervisor")); + ReqContext userA = new ReqContext(mkSubject("user-a")); + ReqContext userB = new ReqContext(mkSubject("user-b")); + + final Map<String, Object> empty = Collections.emptyMap(); + final Map<String, Object> aAllowed = new HashMap<>(); + aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a")); + + SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer(); + authorizer.prepare(clusterConf); + + assertTrue(authorizer.permit(userA, "submitTopology", empty)); + assertTrue(authorizer.permit(userB, "submitTopology", empty)); + assertTrue(authorizer.permit(admin, "submitTopology", empty)); + assertFalse(authorizer.permit(supervisor, "submitTopology", empty)); + + assertTrue(authorizer.permit(userA, "fileUpload", null)); + assertTrue(authorizer.permit(userB, "fileUpload", null)); + assertTrue(authorizer.permit(admin, "fileUpload", null)); + assertFalse(authorizer.permit(supervisor, "fileUpload", null)); + + assertTrue(authorizer.permit(userA, "getNimbusConf", null)); + assertTrue(authorizer.permit(userB, "getNimbusConf", null)); + assertTrue(authorizer.permit(admin, "getNimbusConf", null)); + assertFalse(authorizer.permit(supervisor, "getNimbusConf", null)); + + assertTrue(authorizer.permit(userA, "getClusterInfo", null)); + assertTrue(authorizer.permit(userB, "getClusterInfo", null)); + assertTrue(authorizer.permit(admin, "getClusterInfo", null)); + assertFalse(authorizer.permit(supervisor, "getClusterInfo", null)); + + assertFalse(authorizer.permit(userA, "fileDownload", null)); + assertFalse(authorizer.permit(userB, "fileDownload", null)); + assertTrue(authorizer.permit(admin, "fileDownload", null)); + assertTrue(authorizer.permit(supervisor, "fileDownload", null)); + + assertTrue(authorizer.permit(userA, "killTopology", aAllowed)); + assertFalse(authorizer.permit(userB, "killTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "killTopology", aAllowed)); + assertFalse(authorizer.permit(supervisor, "killTopology", aAllowed)); + + assertTrue(authorizer.permit(userA, "uploadNewCredentials", aAllowed)); + assertFalse(authorizer.permit(userB, "uploadNewCredentials", aAllowed)); + assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed)); + assertFalse(authorizer.permit(supervisor, "uploadNewCredentials", aAllowed)); + + assertTrue(authorizer.permit(userA, "rebalance", aAllowed)); + assertFalse(authorizer.permit(userB, "rebalance", aAllowed)); + assertTrue(authorizer.permit(admin, "rebalance", aAllowed)); + assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed)); + + assertTrue(authorizer.permit(userA, "activate", aAllowed)); + assertFalse(authorizer.permit(userB, "activate", aAllowed)); + assertTrue(authorizer.permit(admin, "activate", aAllowed)); + assertFalse(authorizer.permit(supervisor, "activate", aAllowed)); + + assertTrue(authorizer.permit(userA, "deactivate", aAllowed)); + assertFalse(authorizer.permit(userB, "deactivate", aAllowed)); + assertTrue(authorizer.permit(admin, "deactivate", aAllowed)); + assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed)); + + assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed)); + assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed)); + assertFalse(authorizer.permit(supervisor, "getTopologyConf", aAllowed)); + + assertTrue(authorizer.permit(userA, "getTopology", aAllowed)); + assertFalse(authorizer.permit(userB, "getTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopology", aAllowed)); + assertFalse(authorizer.permit(supervisor, "getTopology", aAllowed)); + + assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed)); + assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed)); + assertFalse(authorizer.permit(supervisor, "getUserTopology", aAllowed)); + + assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed)); + assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed)); + assertFalse(authorizer.permit(supervisor, "getTopologyInfo", aAllowed)); + } + + @Test + public void simpleAclNimbusUsersAuthTest() { + Map<String, Object> clusterConf = ConfigUtils.readStormConfig(); + clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin")); + clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor")); + clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a")); + ReqContext admin = new ReqContext(mkSubject("admin")); + ReqContext supervisor = new ReqContext(mkSubject("supervisor")); + ReqContext userA = new ReqContext(mkSubject("user-a")); + ReqContext userB = new ReqContext(mkSubject("user-b")); + + final Map<String, Object> empty = Collections.emptyMap(); + + SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer(); + authorizer.prepare(clusterConf); + + assertTrue(authorizer.permit(userA, "submitTopology", empty)); + assertFalse(authorizer.permit(userB, "submitTopology", empty)); + assertTrue(authorizer.permit(admin, "fileUpload", null)); + assertTrue(authorizer.permit(supervisor, "fileDownload", null)); + } + + @Test + public void simpleAclNimbusGroupsAuthTest() { + Map<String, Object> clusterConf = ConfigUtils.readStormConfig(); + clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, Arrays.asList("admin-group")); + clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor")); + clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a")); + clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, FixedGroupsMapping.class.getName()); + Map<String, Object> groups = new HashMap<>(); + groups.put("admin", Collections.singleton("admin-group")); + groups.put("not-admin", Collections.singleton("not-admin-group")); + Map<String, Object> groupsParams = new HashMap<>(); + groupsParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groups); + clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, groupsParams); + + ReqContext admin = new ReqContext(mkSubject("admin")); + ReqContext notAdmin = new ReqContext(mkSubject("not-admin")); + ReqContext supervisor = new ReqContext(mkSubject("supervisor")); + ReqContext userA = new ReqContext(mkSubject("user-a")); + ReqContext userB = new ReqContext(mkSubject("user-b")); + + final Map<String, Object> empty = Collections.emptyMap(); + + SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer(); + authorizer.prepare(clusterConf); + + assertTrue(authorizer.permit(userA, "submitTopology", empty)); + assertFalse(authorizer.permit(userB, "submitTopology", empty)); + + assertTrue(authorizer.permit(admin, "fileUpload", null)); + assertFalse(authorizer.permit(notAdmin, "fileUpload", null)); + assertFalse(authorizer.permit(userB, "fileUpload", null)); + + assertTrue(authorizer.permit(supervisor, "fileDownload", null)); + } + + @Test + public void simpleAclSameUserAuthTest() { + Map<String, Object> clusterConf = ConfigUtils.readStormConfig(); + clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin")); + clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("admin")); + ReqContext admin = new ReqContext(mkSubject("admin")); + + final Map<String, Object> empty = Collections.emptyMap(); + final Map<String, Object> aAllowed = new HashMap<>(); + aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a")); + + SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer(); + authorizer.prepare(clusterConf); + + assertTrue(authorizer.permit(admin, "submitTopology", empty)); + assertTrue(authorizer.permit(admin, "fileUpload", null)); + assertTrue(authorizer.permit(admin, "getNimbusConf", null)); + assertTrue(authorizer.permit(admin, "getClusterInfo", null)); + assertTrue(authorizer.permit(admin, "fileDownload", null)); + assertTrue(authorizer.permit(admin, "killTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed)); + assertTrue(authorizer.permit(admin, "rebalance", aAllowed)); + assertTrue(authorizer.permit(admin, "activate", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed)); + assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed)); + } + + @Test + public void shellBaseGroupsMappingTest() throws Exception { + Map<String, Object> clusterConf = ConfigUtils.readStormConfig(); + ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping(); + groups.prepare(clusterConf); + + String userName = System.getProperty("user.name"); + + assertTrue(groups.getGroups(userName).size() >= 0); + assertEquals(0, groups.getGroups("userDoesNotExist").size()); + assertEquals(0, groups.getGroups(null).size()); + } + + @Test(expected = RuntimeException.class) + public void getTransportPluginThrowsRunimeTest() { + Map<String, Object> conf = ConfigUtils.readStormConfig(); + conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid"); + AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null); + } + + public static ReqContext mkImpersonatingReqContext(String impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) { + ReqContext ret = new ReqContext(mkSubject(userBeingIUmpersonated)); + ret.setRemoteAddress(remoteAddress); --- End diff -- userBeingImpersonated ?
---