Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2531#discussion_r164031325
  
    --- Diff: storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java ---
    @@ -0,0 +1,637 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.security.auth;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.security.Principal;
    +import java.security.PrivilegedActionException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicReference;
    +import javax.security.auth.Subject;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.cluster.StormClusterStateImpl;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.WorkerToken;
    +import org.apache.storm.generated.WorkerTokenServiceType;
    +import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
    +import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
    +import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
    +import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
    +import org.apache.storm.testing.InProcessZookeeper;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.apache.thrift.transport.TTransportException;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class AuthTest {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(AuthTest.class);
    +    private static final File BASE = new File("./src/test/resources/");
    +
    +    private static final String DIGEST_JAAS_CONF = new 
File(BASE,"jaas_digest.conf").getAbsolutePath();
    +    private static final String BAD_PASSWORD_CONF = new File(BASE, 
"jaas_digest_bad_password.conf").getAbsolutePath();
    +    private static final String WRONG_USER_CONF = new 
File(BASE,"jaas_digest_unknown_user.conf").getAbsolutePath();
    +    private static final String MISSING_CLIENT = new File(BASE, 
"jaas_digest_missing_client.conf").getAbsolutePath();
    +
    +    //3 seconds in milliseconds
    +    public static final int NIMBUS_TIMEOUT = 3_000;
    +
    +    public interface MyBiConsumer<T, U>  {
    +        void accept(T t, U u) throws Exception;
    +    }
    +
    +
    +    public static Principal mkPrincipal(final String name) {
    +        return new Principal() {
    +            @Override
    +            public String getName() {
    +                return name;
    +            }
    +
    +            @Override
    +            public boolean equals(Object other) {
    +                return other instanceof Principal
    +                    && name.equals(((Principal) other).getName());
    +            }
    +
    +            @Override
    +            public String toString() {
    +                return name;
    +            }
    +
    +            @Override
    +            public int hashCode() {
    +                return name.hashCode();
    +            }
    +        };
    +    }
    +
    +    public static Subject mkSubject(String name) {
    +        return new Subject(true, Collections.singleton(mkPrincipal(name)),
    +            Collections.emptySet(), Collections.emptySet());
    +    }
    +
    +    public static void withServer(Class<? extends ITransportPlugin> 
transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
    +        withServer(null, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static void withServer(String loginCfg,
    +                                  Class<? extends ITransportPlugin> 
transportPluginClass,
    +                                  Nimbus.Iface impl,
    +                                  MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
    +        withServer(loginCfg, transportPluginClass, impl, null, null, body);
    +    }
    +
    +    public static  void withServer(String loginCfg,
    +                                   Class<? extends ITransportPlugin> 
transportPluginClass,
    +                                   Nimbus.Iface impl,
    +                                   InProcessZookeeper zk,
    +                                   Map<String, Object> extraConfs,
    +                                   MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.NIMBUS_THRIFT_PORT, 0);
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
transportPluginClass.getName());
    +
    +        if (loginCfg != null) {
    +            conf.put("java.security.auth.login.config", loginCfg);
    +        }
    +
    +        if (zk != null) {
    +            conf.put(Config.STORM_ZOOKEEPER_SERVERS, 
Arrays.asList("localhost"));
    +            conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
    +        }
    +
    +        if (extraConfs != null) {
    +            conf.putAll(extraConfs);
    +        }
    +
    +        Nimbus.Iface handler = impl != null ? impl : 
mock(Nimbus.Iface.class);
    +        final ThriftServer server = new ThriftServer(conf,
    +            new Nimbus.Processor<>(handler),
    +            ThriftConnectionType.NIMBUS);
    +
    +        LOG.info("Created Server... {}", server);
    +        new Thread(() -> {
    +            LOG.info("Starting Serving...");
    +            server.serve();
    +        }).start();
    +        Testing.whileTimeout(
    +            () -> !server.isServing(),
    +            () -> {
    +                try {
    +                    Time.sleep(100);
    +                } catch (InterruptedException e) {
    +                    //Ignored
    +                }
    +            });
    +        try {
    +            LOG.info("Starting to run {}", body);
    +            body.accept(server, conf);
    +            LOG.info("{} finished with no exceptions", body);
    +        } finally {
    +            LOG.info("Stopping server {}", server);
    +            server.stop();
    +        }
    +    }
    +
    +    @Test
    +    public void kerbToLocalTest() {
    +        KerberosPrincipalToLocal kptol = new KerberosPrincipalToLocal();
    +        kptol.prepare(Collections.emptyMap());
    +        assertEquals("me", kptol.toLocal(mkPrincipal("me@realm")));
    +        assertEquals("simple", kptol.toLocal(mkPrincipal("simple")));
    +        assertEquals("someone", 
kptol.toLocal(mkPrincipal("someone/host@realm")));
    +    }
    +
    +    @Test
    +    public void simpleAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        withServer(SimpleTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    
client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                //Verify digest is rejected...
    +                Map<String, Object> badConf = new HashMap<>(conf);
    +                badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
DigestSaslTransportPlugin.class.getName());
    +                badConf.put("java.security.auth.login.config", 
DIGEST_JAAS_CONF);
    +                badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                try (NimbusClient client = new NimbusClient(badConf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    
client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to 
connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    
assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_security_auth_test_topology");
    +    }
    +
    +    public static void verifyIncorrectJaasConf(ThriftServer server, 
Map<String, Object> conf, String jaas,
    +                                               Class<? extends Exception> 
expectedException) {
    +        Map<String, Object> badConf = new HashMap<>(conf);
    +        badConf.put("java.security.auth.login.config", jaas);
    +        try (NimbusClient client = new NimbusClient(badConf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
    +            client.getClient().activate("bad_auth_test_topology");
    +            fail("An exception should have been thrown trying to 
connect.");
    +        } catch (Exception e) {
    +            LOG.info("Got Exception...", e);
    +            assert(Utils.exceptionCauseIsInstanceOf(expectedException, e));
    +        }
    +    }
    +
    +    @Test
    +    public void digestAuthTest() throws Exception {
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        withServer(DIGEST_JAAS_CONF,
    +            DigestSaslTransportPlugin.class,
    +            impl,
    +            (ThriftServer server, Map<String, Object> conf) -> {
    +                try (NimbusClient client = new NimbusClient(conf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    
client.getClient().activate("security_auth_test_topology");
    +                }
    +
    +                conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +
    +                //Verify simple is rejected...
    +                Map<String, Object> badTransport = new HashMap<>(conf);
    +                badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
SimpleTransportPlugin.class.getName());
    +                try (NimbusClient client = new NimbusClient(badTransport, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                    
client.getClient().activate("bad_security_auth_test_topology");
    +                    fail("An exception should have been thrown trying to 
connect.");
    +                } catch (Exception te) {
    +                    LOG.info("Got Exception...", te);
    +                    
assert(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
    +                }
    +                //The user here from the jaas conf is bob.  No 
impersonation is done, so verify that the recorded principal is bob and that it is not impersonating.
    +                ReqContext found = user.get();
    +                assertNotNull(found);
    +                assertEquals("bob", found.principal().getName());
    +                assertFalse(found.isImpersonating());
    +                user.set(null);
    +
    +                verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, 
TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, 
TTransportException.class);
    +                verifyIncorrectJaasConf(server, conf, 
"./nonexistent.conf", RuntimeException.class);
    +                verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, 
IOException.class);
    +            });
    +        verify(impl).activate("security_auth_test_topology");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +    }
    +
    +    public static Subject createSubjectWith(WorkerToken wt) {
    +        //This is a bit ugly, but it shows how this would happen in a 
worker so we will use the same APIs
    +        Map<String, String> creds = new HashMap<>();
    +        AuthUtils.setWorkerToken(creds, wt);
    +        Subject subject = new Subject();
    +        AuthUtils.updateSubject(subject, Collections.emptyList(), creds);
    +        return subject;
    +    }
    +
    +    public static void tryConnectAs( Map<String, Object> conf, 
ThriftServer server, Subject subject, String topoId)
    +        throws PrivilegedActionException {
    +        Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
    +            try (NimbusClient client = new NimbusClient(conf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
    +                client.getClient().activate(topoId); //Yes this should be 
a topo name, but it makes this simpler...
    +            }
    +            return null;
    +        });
    +    }
    +
    +    public static Subject testConnectWithTokenFor(WorkerTokenManager 
wtMan, Map<String, Object> conf, ThriftServer server,
    +                                               String user, String topoId) 
throws PrivilegedActionException {
    +        WorkerToken wt = 
wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId);
    +        Subject subject = createSubjectWith(wt);
    +        tryConnectAs(conf, server, subject, topoId);
    +        return subject;
    +    }
    +
    +    public static void verifyUserIs(AtomicReference<ReqContext> user, 
String userName) {
    +        //The user recorded from the token should be userName, so verify that the name was set 
correctly and that no impersonation took place...
    +        ReqContext found = user.get();
    +        assertNotNull(found);
    +        assertEquals(userName, found.principal().getName());
    +        assertFalse(found.isImpersonating());
    +        user.set(null);
    +    }
    +
    +    @Test
    +    public void workerTokenDigestAuthTest() throws Exception {
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
    +        Nimbus.Iface impl = mock(Nimbus.Iface.class);
    +        final AtomicReference<ReqContext> user = new AtomicReference<>();
    +        doAnswer((invocation) -> {
    +            user.set(new ReqContext(ReqContext.context()));
    +            return null;
    +        }).when(impl).activate(anyString());
    +
    +        Map<String, Object> extraConfs = new HashMap<>();
    +        //Let worker tokens work on insecure ZK...
    +        extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
    +
    +        try (InProcessZookeeper zk = new InProcessZookeeper()) {
    +            withServer(MISSING_CLIENT,
    +                DigestSaslTransportPlugin.class,
    +                impl,
    +                zk,
    +                extraConfs,
    +                (ThriftServer server, Map<String, Object> conf) -> {
    +                    try (Time.SimulatedTime sim = new 
Time.SimulatedTime()) {
    +                        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
    +                        //We cannot connect if there is no client section 
in the jaas conf...
    +                        try (NimbusClient client = new NimbusClient(conf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
    +                            
client.getClient().activate("bad_auth_test_topology");
    +                            fail("We should not be able to connect without 
a token...");
    +                        } catch (Exception e) {
    +                            assert 
(Utils.exceptionCauseIsInstanceOf(IOException.class, e));
    +                        }
    +
    +                        //Now let's create a token and verify that we can 
connect...
    +                        IStormClusterState state = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
    +                        WorkerTokenManager wtMan = new 
WorkerTokenManager(conf, state);
    +                        Subject bob = testConnectWithTokenFor(wtMan, conf, 
server, "bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
    +
    +                        //Alice has no digest jaas section at all...
    +                        Subject alice = testConnectWithTokenFor(wtMan, 
conf, server, "alice", "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
    +                        //Verify that bob's token has expired
    +
    +                        try {
    +                            tryConnectAs(conf, server, bob, 
"bad_auth_test_topology");
    +                            fail("We should not be able to connect with 
bad auth");
    +                        } catch (Exception e) {
    +                            assert 
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
    +                        }
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +
    +                        //Now see if we can create a new token for bob and 
try again.
    +                        bob = testConnectWithTokenFor(wtMan, conf, server, 
"bob", "topo-bob");
    +                        verifyUserIs(user, "bob");
    +
    +                        tryConnectAs(conf, server, alice, "topo-alice");
    +                        verifyUserIs(user, "alice");
    +                    }
    +                });
    +        }
    +        verify(impl, times(2)).activate("topo-bob");
    +        verify(impl, times(3)).activate("topo-alice");
    +        verify(impl, never()).activate("bad_auth_test_topology");
    +        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
    +    }
    +
    +    @Test
    +    public void negativeWhitelistAuthroizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertFalse(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void positiveWhitelistAuthroizationTest() {
    +        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, 
Arrays.asList("user"));
    +        auth.prepare(conf);
    +        ReqContext context = new ReqContext(mkSubject("user"));
    +        assertTrue(auth.permit(context, "activate", conf));
    +    }
    +
    +    @Test
    +    public void simpleAclUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertTrue(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertFalse(authorizer.permit(supervisor, "submitTopology", 
empty));
    +
    +        assertTrue(authorizer.permit(userA, "fileUpload", null));
    +        assertTrue(authorizer.permit(userB, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(supervisor, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(userA, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(userB, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertFalse(authorizer.permit(supervisor, "getNimbusConf", null));
    +
    +        assertTrue(authorizer.permit(userA, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(userB, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertFalse(authorizer.permit(supervisor, "getClusterInfo", null));
    +
    +        assertFalse(authorizer.permit(userA, "fileDownload", null));
    +        assertFalse(authorizer.permit(userB, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +
    +        assertTrue(authorizer.permit(userA, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "killTopology", 
aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "uploadNewCredentials", 
aAllowed));
    +        assertFalse(authorizer.permit(userB, "uploadNewCredentials", 
aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", 
aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "uploadNewCredentials", 
aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(userB, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "activate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "activate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(userB, "deactivate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyConf", 
aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopology", 
aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getUserTopology", 
aAllowed));
    +
    +        assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +        assertFalse(authorizer.permit(supervisor, "getTopologyInfo", 
aAllowed));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusUsersAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclNimbusGroupsAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, 
Arrays.asList("admin-group"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
    +        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
    +        
clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, 
FixedGroupsMapping.class.getName());
    +        Map<String, Object> groups = new HashMap<>();
    +        groups.put("admin", Collections.singleton("admin-group"));
    +        groups.put("not-admin", Collections.singleton("not-admin-group"));
    +        Map<String, Object> groupsParams = new HashMap<>();
    +        groupsParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, 
groups);
    +        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, 
groupsParams);
    +
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +        ReqContext notAdmin = new ReqContext(mkSubject("not-admin"));
    +        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
    +        ReqContext userA = new ReqContext(mkSubject("user-a"));
    +        ReqContext userB = new ReqContext(mkSubject("user-b"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(userA, "submitTopology", empty));
    +        assertFalse(authorizer.permit(userB, "submitTopology", empty));
    +
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertFalse(authorizer.permit(notAdmin, "fileUpload", null));
    +        assertFalse(authorizer.permit(userB, "fileUpload", null));
    +
    +        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
    +    }
    +
    +    @Test
    +    public void simpleAclSameUserAuthTest() {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
    +        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("admin"));
    +        ReqContext admin = new ReqContext(mkSubject("admin"));
    +
    +        final Map<String, Object> empty = Collections.emptyMap();
    +        final Map<String, Object> aAllowed = new HashMap<>();
    +        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
    +
    +        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
    +        authorizer.prepare(clusterConf);
    +
    +        assertTrue(authorizer.permit(admin, "submitTopology", empty));
    +        assertTrue(authorizer.permit(admin, "fileUpload", null));
    +        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
    +        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
    +        assertTrue(authorizer.permit(admin, "fileDownload", null));
    +        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "uploadNewCredentials", 
aAllowed));
    +        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
    +        assertTrue(authorizer.permit(admin, "activate", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
    +        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
    +    }
    +
    +    @Test
    +    public void shellBaseGroupsMappingTest() throws Exception {
    +        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
    +        ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
    +        groups.prepare(clusterConf);
    +
    +        String userName = System.getProperty("user.name");
    +
    +        assertTrue(groups.getGroups(userName).size() >= 0);
    +        assertEquals(0, groups.getGroups("userDoesNotExist").size());
    +        assertEquals(0, groups.getGroups(null).size());
    +    }
    +
    +    @Test(expected = RuntimeException.class)
    +    public void getTransportPluginThrowsRunimeTest() {
    +        Map<String, Object> conf = ConfigUtils.readStormConfig();
    +        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
    +        AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, 
null);
    +    }
    +
    +    public static ReqContext mkImpersonatingReqContext(String 
impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) {
    +        ReqContext ret = new ReqContext(mkSubject(userBeingIUmpersonated));
    +        ret.setRemoteAddress(remoteAddress);
    --- End diff --
    
    Typo in the parameter name: should `userBeingIUmpersonated` be `userBeingImpersonated`?


---

Reply via email to