Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2531#discussion_r164031325
--- Diff:
storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java ---
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.security.auth.Subject;
+import org.apache.storm.Config;
+import org.apache.storm.Testing;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.StormClusterStateImpl;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
+import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
+import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
+import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
+import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
+import org.apache.storm.testing.InProcessZookeeper;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class AuthTest {
+ /** Logger used by the helpers and tests in this class. */
+ private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class);
+
+ /** Directory holding the JAAS fixture files, given relative to the working directory. */
+ private static final File BASE = new File("./src/test/resources/");
+
+ // Absolute paths of the individual JAAS fixture files under BASE.
+ private static final String DIGEST_JAAS_CONF = new File(BASE, "jaas_digest.conf").getAbsolutePath();
+ private static final String BAD_PASSWORD_CONF = new File(BASE, "jaas_digest_bad_password.conf").getAbsolutePath();
+ private static final String WRONG_USER_CONF = new File(BASE, "jaas_digest_unknown_user.conf").getAbsolutePath();
+ private static final String MISSING_CLIENT = new File(BASE, "jaas_digest_missing_client.conf").getAbsolutePath();
+
+ /** Value passed as the last NimbusClient constructor argument: 3 seconds, in milliseconds. */
+ public static final int NIMBUS_TIMEOUT = 3_000;
+
+ /**
+  * Two-argument callback that, unlike {@link java.util.function.BiConsumer}, may throw a checked exception.
+  *
+  * @param <T> type of the first argument
+  * @param <U> type of the second argument
+  */
+ @FunctionalInterface
+ public interface MyBiConsumer<T, U> {
+     /**
+      * Runs the callback.
+      *
+      * @param t first argument
+      * @param u second argument
+      * @throws Exception any failure raised by the callback
+      */
+     void accept(T t, U u) throws Exception;
+ }
+
+
+ /**
+  * Builds a {@link Principal} whose whole identity is the supplied name.
+  *
+  * <p>Equality and hashing are defined on the name alone, so the result considers itself equal to any
+  * other {@code Principal} that reports the same name.
+  *
+  * @param name value returned by both {@code getName()} and {@code toString()}
+  * @return a new anonymous {@code Principal}
+  */
+ public static Principal mkPrincipal(final String name) {
+     return new Principal() {
+         @Override
+         public String getName() {
+             return name;
+         }
+
+         @Override
+         public String toString() {
+             return name;
+         }
+
+         @Override
+         public int hashCode() {
+             return name.hashCode();
+         }
+
+         @Override
+         public boolean equals(Object other) {
+             if (!(other instanceof Principal)) {
+                 return false;
+             }
+             final Principal that = (Principal) other;
+             return name.equals(that.getName());
+         }
+     };
+ }
+
+ /**
+  * Wraps a single named principal in a read-only {@link Subject} that carries no credentials.
+  *
+  * @param name name given to the subject's only principal
+  * @return the new subject
+  */
+ public static Subject mkSubject(String name) {
+     final boolean readOnly = true;
+     final Set<Principal> principals = Collections.singleton(mkPrincipal(name));
+     return new Subject(readOnly, principals, Collections.emptySet(), Collections.emptySet());
+ }
+
+ /**
+  * Runs {@code body} against a temporary server with no login config, no ZooKeeper and no extra settings.
+  *
+  * @param transportPluginClass transport plugin the server is configured with
+  * @param impl handler behind the server, or {@code null}
+  * @param body callback given the server and its configuration
+  * @throws Exception whatever the full overload or {@code body} throws
+  */
+ public static void withServer(Class<? extends ITransportPlugin> transportPluginClass,
+                               Nimbus.Iface impl,
+                               MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
+     final String noLoginCfg = null;
+     final InProcessZookeeper noZookeeper = null;
+     final Map<String, Object> noExtraConfs = null;
+     withServer(noLoginCfg, transportPluginClass, impl, noZookeeper, noExtraConfs, body);
+ }
+
+ /**
+  * Runs {@code body} against a temporary server with the given login config, no ZooKeeper and no extra settings.
+  *
+  * @param loginCfg JAAS login configuration path, or {@code null}
+  * @param transportPluginClass transport plugin the server is configured with
+  * @param impl handler behind the server, or {@code null}
+  * @param body callback given the server and its configuration
+  * @throws Exception whatever the full overload or {@code body} throws
+  */
+ public static void withServer(String loginCfg,
+                               Class<? extends ITransportPlugin> transportPluginClass,
+                               Nimbus.Iface impl,
+                               MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
+     final InProcessZookeeper noZookeeper = null;
+     final Map<String, Object> noExtraConfs = null;
+     withServer(loginCfg, transportPluginClass, impl, noZookeeper, noExtraConfs, body);
+ }
+
+ /**
+  * Builds a Nimbus {@link ThriftServer}, serves it on a background thread, runs {@code body} against it
+  * and stops the server afterwards, whether startup, {@code body} or neither failed.
+  *
+  * <p>The configuration starts from {@link ConfigUtils#readStormConfig()} and is then overridden with
+  * thrift port 0, the transport plugin class name, and the optional login config, ZooKeeper location and
+  * extra settings. They are applied in that order, so {@code extraConfs} wins on conflicts.
+  *
+  * @param loginCfg value stored under {@code java.security.auth.login.config}, or {@code null} to leave it unset
+  * @param transportPluginClass transport plugin whose class name is written into the configuration
+  * @param impl handler the processor dispatches to; a Mockito mock is substituted when {@code null}
+  * @param zk when non-null, the configuration points at {@code localhost} and this instance's port
+  * @param extraConfs additional settings copied over the configuration, or {@code null}
+  * @param body callback receiving the server and the configuration it was built from
+  * @throws Exception whatever {@code body} throws
+  */
+ public static void withServer(String loginCfg,
+                               Class<? extends ITransportPlugin> transportPluginClass,
+                               Nimbus.Iface impl,
+                               InProcessZookeeper zk,
+                               Map<String, Object> extraConfs,
+                               MyBiConsumer<ThriftServer, Map<String, Object>> body) throws Exception {
+     Map<String, Object> conf = ConfigUtils.readStormConfig();
+     conf.put(Config.NIMBUS_THRIFT_PORT, 0);
+     conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, transportPluginClass.getName());
+
+     if (loginCfg != null) {
+         conf.put("java.security.auth.login.config", loginCfg);
+     }
+
+     if (zk != null) {
+         conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+         conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
+     }
+
+     if (extraConfs != null) {
+         conf.putAll(extraConfs);
+     }
+
+     Nimbus.Iface handler = impl != null ? impl : mock(Nimbus.Iface.class);
+     final ThriftServer server = new ThriftServer(conf,
+         new Nimbus.Processor<>(handler),
+         ThriftConnectionType.NIMBUS);
+
+     LOG.info("Created Server... {}", server);
+     Thread serverThread = new Thread(() -> {
+         LOG.info("Starting Serving...");
+         server.serve();
+     }, "auth-test-thrift-server");
+     //Daemon, so a serve() call that never returns cannot keep the test JVM alive.
+     serverThread.setDaemon(true);
+     serverThread.start();
+     try {
+         //Waiting inside the try block means a startup timeout still reaches server.stop() below.
+         //NOTE(review): assumes ThriftServer.stop() tolerates a server that never began serving - confirm.
+         Testing.whileTimeout(
+             () -> !server.isServing(),
+             () -> {
+                 try {
+                     Time.sleep(100);
+                 } catch (InterruptedException ignored) {
+                     //Deliberately ignored: keep polling. Presumably whileTimeout enforces the overall deadline.
+                 }
+             });
+         LOG.info("Starting to run {}", body);
+         body.accept(server, conf);
+         LOG.info("{} finished with no exceptions", body);
+     } finally {
+         LOG.info("Stopping server {}", server);
+         server.stop();
+     }
+ }
+
+ /** Checks the local name produced by KerberosPrincipalToLocal for three sample principal names. */
+ @Test
+ public void kerbToLocalTest() {
+     final KerberosPrincipalToLocal mapper = new KerberosPrincipalToLocal();
+     mapper.prepare(Collections.emptyMap());
+
+     //Each row is {principal name, expected local name}.
+     final String[][] cases = {
+         {"me@realm", "me"},
+         {"simple", "simple"},
+         {"someone/host@realm", "someone"},
+     };
+     for (String[] row : cases) {
+         assertEquals(row[1], mapper.toLocal(mkPrincipal(row[0])));
+     }
+ }
+
+ /**
+  * A server using the simple transport serves a client built from the same configuration and refuses a
+  * client configured for digest SASL; only the first call may reach the handler.
+  */
+ @Test
+ public void simpleAuthTest() throws Exception {
+     Nimbus.Iface impl = mock(Nimbus.Iface.class);
+     withServer(SimpleTransportPlugin.class,
+         impl,
+         (ThriftServer server, Map<String, Object> conf) -> {
+             try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                 client.getClient().activate("security_auth_test_topology");
+             }
+
+             //Verify digest is rejected...
+             Map<String, Object> badConf = new HashMap<>(conf);
+             badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, DigestSaslTransportPlugin.class.getName());
+             badConf.put("java.security.auth.login.config", DIGEST_JAAS_CONF);
+             badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+             try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                 client.getClient().activate("bad_security_auth_test_topology");
+                 fail("An exception should have been thrown trying to connect.");
+             } catch (Exception te) {
+                 LOG.info("Got Exception...", te);
+                 //assertTrue rather than the assert keyword: the keyword is skipped when the JVM runs without -ea.
+                 assertTrue("Expected a TTransportException in the cause chain of " + te,
+                     Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
+             }
+         });
+     verify(impl).activate("security_auth_test_topology");
+     verify(impl, never()).activate("bad_security_auth_test_topology");
+ }
+
+ /**
+  * Connects with a copy of {@code conf} whose JAAS login config is replaced by {@code jaas} and checks
+  * that the attempt fails with {@code expectedException} somewhere in the cause chain.
+  *
+  * @param server server to connect to; only its port is read
+  * @param conf base client configuration; it is copied, not modified
+  * @param jaas path stored under {@code java.security.auth.login.config} for this attempt
+  * @param expectedException exception type that must appear in the cause chain of the failure
+  */
+ public static void verifyIncorrectJaasConf(ThriftServer server, Map<String, Object> conf, String jaas,
+                                            Class<? extends Exception> expectedException) {
+     Map<String, Object> badConf = new HashMap<>(conf);
+     badConf.put("java.security.auth.login.config", jaas);
+     try (NimbusClient client = new NimbusClient(badConf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+         client.getClient().activate("bad_auth_test_topology");
+         fail("An exception should have been thrown trying to connect.");
+     } catch (Exception e) {
+         LOG.info("Got Exception...", e);
+         //assertTrue rather than the assert keyword: the keyword is skipped when the JVM runs without -ea.
+         assertTrue("Expected " + expectedException.getName() + " in the cause chain of " + e,
+             Utils.exceptionCauseIsInstanceOf(expectedException, e));
+     }
+ }
+
+ /**
+  * A digest-SASL server accepts the user from the digest JAAS file, refuses a simple-transport client,
+  * and refuses clients whose JAAS file has a bad password, an unknown user, does not exist, or has no
+  * client section. Only the first call may reach the handler.
+  */
+ @Test
+ public void digestAuthTest() throws Exception {
+     Nimbus.Iface impl = mock(Nimbus.Iface.class);
+     //Capture a copy of the request context seen by the handler so the caller can be inspected below.
+     final AtomicReference<ReqContext> user = new AtomicReference<>();
+     doAnswer((invocation) -> {
+         user.set(new ReqContext(ReqContext.context()));
+         return null;
+     }).when(impl).activate(anyString());
+
+     withServer(DIGEST_JAAS_CONF,
+         DigestSaslTransportPlugin.class,
+         impl,
+         (ThriftServer server, Map<String, Object> conf) -> {
+             try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                 client.getClient().activate("security_auth_test_topology");
+             }
+
+             conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+
+             //Verify simple is rejected...
+             Map<String, Object> badTransport = new HashMap<>(conf);
+             badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
+             try (NimbusClient client = new NimbusClient(badTransport, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                 client.getClient().activate("bad_security_auth_test_topology");
+                 fail("An exception should have been thrown trying to connect.");
+             } catch (Exception te) {
+                 LOG.info("Got Exception...", te);
+                 //assertTrue rather than the assert keyword: the keyword is skipped when the JVM runs without -ea.
+                 assertTrue("Expected a TTransportException in the cause chain of " + te,
+                     Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
+             }
+             //The user here from the jaas conf is bob. No impersonation is done, so verify both facts.
+             ReqContext found = user.get();
+             assertNotNull(found);
+             assertEquals("bob", found.principal().getName());
+             assertFalse(found.isImpersonating());
+             user.set(null);
+
+             verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, TTransportException.class);
+             verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, TTransportException.class);
+             verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", RuntimeException.class);
+             verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, IOException.class);
+         });
+     verify(impl).activate("security_auth_test_topology");
+     //The simple-transport attempt above must not have reached the handler either.
+     verify(impl, never()).activate("bad_security_auth_test_topology");
+     verify(impl, never()).activate("bad_auth_test_topology");
+ }
+
+ /**
+  * Produces a fresh {@link Subject} updated from a credentials map that holds the given worker token.
+  *
+  * @param wt token to place into the credentials map
+  * @return the subject after {@code AuthUtils.updateSubject} has been applied to it
+  */
+ public static Subject createSubjectWith(WorkerToken wt) {
+     //This is a bit ugly, but it shows how this would happen in a worker so we will use the same APIs
+     final Map<String, String> credentials = new HashMap<>();
+     AuthUtils.setWorkerToken(credentials, wt);
+
+     final Subject result = new Subject();
+     AuthUtils.updateSubject(result, Collections.emptyList(), credentials);
+     return result;
+ }
+
+ /**
+  * Opens a NimbusClient while running as {@code subject} and calls {@code activate(topoId)} on it.
+  *
+  * @param conf client configuration
+  * @param server server to connect to; only its port is read
+  * @param subject identity the connection attempt runs as
+  * @param topoId value passed to {@code activate}
+  * @throws PrivilegedActionException wrapping any checked exception raised inside the privileged action
+  */
+ public static void tryConnectAs(Map<String, Object> conf, ThriftServer server, Subject subject, String topoId)
+     throws PrivilegedActionException {
+     final PrivilegedExceptionAction<Void> connectAndActivate = () -> {
+         try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+             //Yes this should be a topo name, but it makes this simpler...
+             client.getClient().activate(topoId);
+         }
+         return null;
+     };
+     Subject.doAs(subject, connectAndActivate);
+ }
+
+ /**
+  * Creates or updates a NIMBUS worker token for {@code user} and {@code topoId}, connects with it, and
+  * returns the subject that carries the token.
+  *
+  * @param wtMan manager that issues the token
+  * @param conf client configuration
+  * @param server server to connect to
+  * @param user user the token is issued for
+  * @param topoId topology id the token is issued for; also passed to {@code activate}
+  * @return the subject used for the connection
+  * @throws PrivilegedActionException if the connection attempt fails with a checked exception
+  */
+ public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, Map<String, Object> conf, ThriftServer server,
+                                               String user, String topoId) throws PrivilegedActionException {
+     final WorkerToken token = wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId);
+     final Subject tokenHolder = createSubjectWith(token);
+     tryConnectAs(conf, server, tokenHolder, topoId);
+     return tokenHolder;
+ }
+
+ /**
+  * Asserts that the captured request context belongs to {@code userName} and is not impersonating, then
+  * clears the capture.
+  *
+  * @param user holder of the last captured request context
+  * @param userName principal name the captured context must report
+  */
+ public static void verifyUserIs(AtomicReference<ReqContext> user, String userName) {
+     final ReqContext captured = user.get();
+     assertNotNull(captured);
+
+     final String actualName = captured.principal().getName();
+     assertEquals(userName, actualName);
+     assertFalse(captured.isImpersonating());
+
+     //Reset so a later check cannot pass on this stale value.
+     user.set(null);
+ }
+
+ /**
+  * Exercises worker-token authentication against a digest-SASL server whose JAAS file has no client section.
+  *
+  * <p>Under simulated time it checks that a client without a token is rejected, that newly issued tokens
+  * for bob and alice are accepted, that bob's first token is rejected after 25 simulated hours while
+  * alice's 13-hour-old token still works, and that a newly issued token for bob works again.
+  */
+ @Test
+ public void workerTokenDigestAuthTest() throws Exception {
+     LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
+     Nimbus.Iface impl = mock(Nimbus.Iface.class);
+     //Capture a copy of the request context seen by the handler so verifyUserIs can inspect the caller.
+     final AtomicReference<ReqContext> user = new AtomicReference<>();
+     doAnswer((invocation) -> {
+         user.set(new ReqContext(ReqContext.context()));
+         return null;
+     }).when(impl).activate(anyString());
+
+     Map<String, Object> extraConfs = new HashMap<>();
+     //Let worker tokens work on insecure ZK...
+     extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
+
+     try (InProcessZookeeper zk = new InProcessZookeeper()) {
+         withServer(MISSING_CLIENT,
+             DigestSaslTransportPlugin.class,
+             impl,
+             zk,
+             extraConfs,
+             (ThriftServer server, Map<String, Object> conf) -> {
+                 try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
+                     conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+                     //We cannot connect if there is no client section in the jaas conf...
+                     try (NimbusClient client = new NimbusClient(conf, "localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                         client.getClient().activate("bad_auth_test_topology");
+                         fail("We should not be able to connect without a token...");
+                     } catch (Exception e) {
+                         //assertTrue rather than the assert keyword: the keyword is skipped when the JVM runs without -ea.
+                         assertTrue("Expected an IOException in the cause chain of " + e,
+                             Utils.exceptionCauseIsInstanceOf(IOException.class, e));
+                     }
+
+                     //Now lets create a token and verify that we can connect...
+                     IStormClusterState state =
+                         ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf));
+                     WorkerTokenManager wtMan = new WorkerTokenManager(conf, state);
+                     Subject bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
+                     verifyUserIs(user, "bob");
+
+                     Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
+
+                     //Alice has no digest jaas section at all...
+                     Subject alice = testConnectWithTokenFor(wtMan, conf, server, "alice", "topo-alice");
+                     verifyUserIs(user, "alice");
+
+                     Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
+                     //Verify that bob's token has expired
+
+                     try {
+                         tryConnectAs(conf, server, bob, "bad_auth_test_topology");
+                         fail("We should not be able to connect with bad auth");
+                     } catch (Exception e) {
+                         assertTrue("Expected a TTransportException in the cause chain of " + e,
+                             Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
+                     }
+                     tryConnectAs(conf, server, alice, "topo-alice");
+                     verifyUserIs(user, "alice");
+
+                     //Now see if we can create a new token for bob and try again.
+                     bob = testConnectWithTokenFor(wtMan, conf, server, "bob", "topo-bob");
+                     verifyUserIs(user, "bob");
+
+                     tryConnectAs(conf, server, alice, "topo-alice");
+                     verifyUserIs(user, "alice");
+                 }
+             });
+     }
+     verify(impl, times(2)).activate("topo-bob");
+     verify(impl, times(3)).activate("topo-alice");
+     verify(impl, never()).activate("bad_auth_test_topology");
+     LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
+ }
+
+ /** With the configuration exactly as read from disk, "user" is not permitted to activate. */
+ @Test
+ public void negativeWhitelistAuthroizationTest() {
+     final SimpleWhitelistAuthorizer authorizer = new SimpleWhitelistAuthorizer();
+     final Map<String, Object> stormConf = ConfigUtils.readStormConfig();
+     authorizer.prepare(stormConf);
+
+     final ReqContext caller = new ReqContext(mkSubject("user"));
+     final boolean permitted = authorizer.permit(caller, "activate", stormConf);
+     assertFalse(permitted);
+ }
+
+ /** Once "user" is listed under the whitelist setting, "user" is permitted to activate. */
+ @Test
+ public void positiveWhitelistAuthroizationTest() {
+     final SimpleWhitelistAuthorizer authorizer = new SimpleWhitelistAuthorizer();
+     final Map<String, Object> stormConf = ConfigUtils.readStormConfig();
+     stormConf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, Arrays.asList("user"));
+     authorizer.prepare(stormConf);
+
+     final ReqContext caller = new ReqContext(mkSubject("user"));
+     final boolean permitted = authorizer.permit(caller, "activate", stormConf);
+     assertTrue(permitted);
+ }
+
+ @Test
+ public void simpleAclUserAuthTest() {
+ Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+ clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+ clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS,
Arrays.asList("supervisor"));
+ ReqContext admin = new ReqContext(mkSubject("admin"));
+ ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
+ ReqContext userA = new ReqContext(mkSubject("user-a"));
+ ReqContext userB = new ReqContext(mkSubject("user-b"));
+
+ final Map<String, Object> empty = Collections.emptyMap();
+ final Map<String, Object> aAllowed = new HashMap<>();
+ aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
+
+ SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
+ authorizer.prepare(clusterConf);
+
+ assertTrue(authorizer.permit(userA, "submitTopology", empty));
+ assertTrue(authorizer.permit(userB, "submitTopology", empty));
+ assertTrue(authorizer.permit(admin, "submitTopology", empty));
+ assertFalse(authorizer.permit(supervisor, "submitTopology",
empty));
+
+ assertTrue(authorizer.permit(userA, "fileUpload", null));
+ assertTrue(authorizer.permit(userB, "fileUpload", null));
+ assertTrue(authorizer.permit(admin, "fileUpload", null));
+ assertFalse(authorizer.permit(supervisor, "fileUpload", null));
+
+ assertTrue(authorizer.permit(userA, "getNimbusConf", null));
+ assertTrue(authorizer.permit(userB, "getNimbusConf", null));
+ assertTrue(authorizer.permit(admin, "getNimbusConf", null));
+ assertFalse(authorizer.permit(supervisor, "getNimbusConf", null));
+
+ assertTrue(authorizer.permit(userA, "getClusterInfo", null));
+ assertTrue(authorizer.permit(userB, "getClusterInfo", null));
+ assertTrue(authorizer.permit(admin, "getClusterInfo", null));
+ assertFalse(authorizer.permit(supervisor, "getClusterInfo", null));
+
+ assertFalse(authorizer.permit(userA, "fileDownload", null));
+ assertFalse(authorizer.permit(userB, "fileDownload", null));
+ assertTrue(authorizer.permit(admin, "fileDownload", null));
+ assertTrue(authorizer.permit(supervisor, "fileDownload", null));
+
+ assertTrue(authorizer.permit(userA, "killTopology", aAllowed));
+ assertFalse(authorizer.permit(userB, "killTopology", aAllowed));
+ assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "killTopology",
aAllowed));
+
+ assertTrue(authorizer.permit(userA, "uploadNewCredentials",
aAllowed));
+ assertFalse(authorizer.permit(userB, "uploadNewCredentials",
aAllowed));
+ assertTrue(authorizer.permit(admin, "uploadNewCredentials",
aAllowed));
+ assertFalse(authorizer.permit(supervisor, "uploadNewCredentials",
aAllowed));
+
+ assertTrue(authorizer.permit(userA, "rebalance", aAllowed));
+ assertFalse(authorizer.permit(userB, "rebalance", aAllowed));
+ assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed));
+
+ assertTrue(authorizer.permit(userA, "activate", aAllowed));
+ assertFalse(authorizer.permit(userB, "activate", aAllowed));
+ assertTrue(authorizer.permit(admin, "activate", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "activate", aAllowed));
+
+ assertTrue(authorizer.permit(userA, "deactivate", aAllowed));
+ assertFalse(authorizer.permit(userB, "deactivate", aAllowed));
+ assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed));
+
+ assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed));
+ assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed));
+ assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "getTopologyConf",
aAllowed));
+
+ assertTrue(authorizer.permit(userA, "getTopology", aAllowed));
+ assertFalse(authorizer.permit(userB, "getTopology", aAllowed));
+ assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "getTopology",
aAllowed));
+
+ assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed));
+ assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed));
+ assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "getUserTopology",
aAllowed));
+
+ assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed));
+ assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed));
+ assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
+ assertFalse(authorizer.permit(supervisor, "getTopologyInfo",
aAllowed));
+ }
+
+ /**
+  * With user-a listed under the nimbus users setting, user-a may submit a topology and user-b may not;
+  * the admin may still upload files and the supervisor may still download them.
+  */
+ @Test
+ public void simpleAclNimbusUsersAuthTest() {
+     final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+     clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+     clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
+     clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
+
+     final ReqContext adminCtx = new ReqContext(mkSubject("admin"));
+     final ReqContext supervisorCtx = new ReqContext(mkSubject("supervisor"));
+     final ReqContext userACtx = new ReqContext(mkSubject("user-a"));
+     final ReqContext userBCtx = new ReqContext(mkSubject("user-b"));
+
+     final Map<String, Object> noTopoConf = Collections.emptyMap();
+
+     final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
+     acl.prepare(clusterConf);
+
+     //Submission is limited to the listed nimbus user.
+     assertTrue(acl.permit(userACtx, "submitTopology", noTopoConf));
+     assertFalse(acl.permit(userBCtx, "submitTopology", noTopoConf));
+
+     //Admin and supervisor keep their file permissions.
+     assertTrue(acl.permit(adminCtx, "fileUpload", null));
+     assertTrue(acl.permit(supervisorCtx, "fileDownload", null));
+ }
+
+ /**
+  * Admin rights granted through a group: "admin" is mapped to admin-group by FixedGroupsMapping and may
+  * upload files, while "not-admin" (mapped to another group) and user-b may not.
+  */
+ @Test
+ public void simpleAclNimbusGroupsAuthTest() {
+     //Fixed user -> groups table handed to the group mapping plugin.
+     final Map<String, Object> userToGroups = new HashMap<>();
+     userToGroups.put("admin", Collections.singleton("admin-group"));
+     userToGroups.put("not-admin", Collections.singleton("not-admin-group"));
+     final Map<String, Object> mappingParams = new HashMap<>();
+     mappingParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, userToGroups);
+
+     final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+     clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, Arrays.asList("admin-group"));
+     clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("supervisor"));
+     clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
+     clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, FixedGroupsMapping.class.getName());
+     clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, mappingParams);
+
+     final ReqContext adminCtx = new ReqContext(mkSubject("admin"));
+     final ReqContext notAdminCtx = new ReqContext(mkSubject("not-admin"));
+     final ReqContext supervisorCtx = new ReqContext(mkSubject("supervisor"));
+     final ReqContext userACtx = new ReqContext(mkSubject("user-a"));
+     final ReqContext userBCtx = new ReqContext(mkSubject("user-b"));
+
+     final Map<String, Object> noTopoConf = Collections.emptyMap();
+
+     final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
+     acl.prepare(clusterConf);
+
+     assertTrue(acl.permit(userACtx, "submitTopology", noTopoConf));
+     assertFalse(acl.permit(userBCtx, "submitTopology", noTopoConf));
+
+     assertTrue(acl.permit(adminCtx, "fileUpload", null));
+     assertFalse(acl.permit(notAdminCtx, "fileUpload", null));
+     assertFalse(acl.permit(userBCtx, "fileUpload", null));
+
+     assertTrue(acl.permit(supervisorCtx, "fileDownload", null));
+ }
+
+ /**
+  * When the same user is listed both as a nimbus admin and as a supervisor user, that user is permitted
+  * every operation checked here.
+  */
+ @Test
+ public void simpleAclSameUserAuthTest() {
+     final Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+     clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+     clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, Arrays.asList("admin"));
+     final ReqContext adminCtx = new ReqContext(mkSubject("admin"));
+
+     final Map<String, Object> noTopoConf = Collections.emptyMap();
+     final Map<String, Object> userATopoConf = new HashMap<>();
+     userATopoConf.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
+
+     final SimpleACLAuthorizer acl = new SimpleACLAuthorizer();
+     acl.prepare(clusterConf);
+
+     //Operations checked without a topology configuration.
+     assertTrue(acl.permit(adminCtx, "submitTopology", noTopoConf));
+     assertTrue(acl.permit(adminCtx, "fileUpload", null));
+     assertTrue(acl.permit(adminCtx, "getNimbusConf", null));
+     assertTrue(acl.permit(adminCtx, "getClusterInfo", null));
+     assertTrue(acl.permit(adminCtx, "fileDownload", null));
+
+     //Operations checked against a topology whose topology users list holds only user-a.
+     assertTrue(acl.permit(adminCtx, "killTopology", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "uploadNewCredentials", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "rebalance", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "activate", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "getTopologyConf", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "getTopology", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "getUserTopology", userATopoConf));
+     assertTrue(acl.permit(adminCtx, "getTopologyInfo", userATopoConf));
+ }
+
+ /**
+  * ShellBasedGroupsMapping answers for the current OS user and reports no groups for an unknown user
+  * or a {@code null} user name.
+  */
+ @Test
+ public void shellBaseGroupsMappingTest() throws Exception {
+     Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+     ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
+     groups.prepare(clusterConf);
+
+     String userName = System.getProperty("user.name");
+
+     //Only the fact that the lookup succeeds is asserted; how many groups the current user has depends
+     //on the machine, and a "size() >= 0" check could never fail.
+     assertNotNull(groups.getGroups(userName));
+     assertEquals(0, groups.getGroups("userDoesNotExist").size());
+     assertEquals(0, groups.getGroups(null).size());
+ }
+
+ /** Asking AuthUtils for a transport plugin whose configured class name is bogus must raise a RuntimeException. */
+ @Test(expected = RuntimeException.class)
+ public void getTransportPluginThrowsRunimeTest() {
+     final String bogusPluginClass = "null.invalid";
+     final Map<String, Object> stormConf = ConfigUtils.readStormConfig();
+     stormConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, bogusPluginClass);
+
+     AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, stormConf, null);
+ }
+
+ public static ReqContext mkImpersonatingReqContext(String
impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) {
+ ReqContext ret = new ReqContext(mkSubject(userBeingIUmpersonated));
+ ret.setRemoteAddress(remoteAddress);
--- End diff --
Typo in the parameter name: should `userBeingIUmpersonated` be `userBeingImpersonated`?
---