Repository: storm
Updated Branches:
  refs/heads/master ec537fdf5 -> a715e9a5c


http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 81421b2..aba9459 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -161,11 +161,4 @@ public class SupervisorUtils {
     private  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, 
Map<String, Object> conf) {
         return (now - whb.get_time_secs()) > 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
     }
-
-    static List<ACL> supervisorZkAcls() {
-        final List<ACL> acls = new ArrayList<>();
-        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
-        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
-        return acls;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
 
b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
new file mode 100644
index 0000000..dac0c91
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.PrivateWorkerKey;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The WorkerTokenManager manages the life cycle of worker tokens in nimbus.
+ */
+public class WorkerTokenManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(WorkerTokenManager.class);
+
+    /**
+     * The length of the random keys to use in bits.
+     * This should be at least the length of 
WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM.
+     */
+    private static final int KEY_LENGTH = 256;
+
+    /**
+     * Generate a new random secret key.
+     * @return the new key
+     */
+    protected SecretKey generateSecret() {
+        SecretKey key;
+        synchronized (keyGen) {
+            key = keyGen.generateKey();
+        }
+        return key;
+    }
+
+    /**
+     * Get the secret that should be used to sign a token.  This may either 
reuse a secret or generate a new one so any user should
+     * call this once and save the result.
+     * @return the key to use.
+     */
+    protected SecretKey getCurrentSecret() {
+        return generateSecret();
+    }
+
+    /**
+     * Key generator to use.
+     */
+    private final KeyGenerator keyGen;
+    private final IStormClusterState state;
+    private final long tokenLifetimeMillis;
+
+    /**
+     * Constructor.  This assumes that state can store the tokens securely, 
and that they should be enabled at all.
+     * Please use AuthUtils.areWorkerTokensEnabledServer to validate this 
first.
+     * @param daemonConf the config for nimbus.
+     * @param state the state used to store private keys.
+     * @throws IllegalArgumentException if no KeyGenerator is available for 
WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM.
+     */
+    public WorkerTokenManager(Map<String, Object> daemonConf, 
IStormClusterState state) {
+        this.state = state;
+        try {
+            keyGen = 
KeyGenerator.getInstance(WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM);
+            keyGen.init(KEY_LENGTH);
+        } catch (NoSuchAlgorithmException nsa) {
+            //Keep the original exception as the cause so the underlying 
failure is not lost.
+            throw new IllegalArgumentException("Can't find " + 
WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM + " algorithm.", nsa);
+        }
+        this.tokenLifetimeMillis = TimeUnit.MILLISECONDS.convert(
+            
ObjectReader.getLong(daemonConf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS),24L),
+            TimeUnit.HOURS);
+    }
+
+    /**
+     * Create or update an existing key.
+     * @param serviceType the type of service to create a token for
+     * @param user the user the token is for
+     * @param topologyId the topology the token is for
+     * @return a newly generated token that should be good to start using from 
now until it expires.
+     */
+    public WorkerToken createOrUpdateTokenFor(WorkerTokenServiceType 
serviceType, String user, String topologyId) {
+        long nextVersion = state.getNextPrivateWorkerKeyVersion(serviceType, 
topologyId);
+        SecretKey topoSecret = getCurrentSecret();
+        long expirationTimeMillis = Time.currentTimeMillis() + 
tokenLifetimeMillis;
+        WorkerTokenInfo info = new WorkerTokenInfo(user, topologyId, 
nextVersion, expirationTimeMillis);
+        byte[] serializedInfo = AuthUtils.serializeWorkerTokenInfo(info);
+        byte[] signature = WorkerTokenSigner.createPassword(serializedInfo, 
topoSecret);
+        WorkerToken ret = new WorkerToken(serviceType, 
ByteBuffer.wrap(serializedInfo), ByteBuffer.wrap(signature));
+        PrivateWorkerKey key = new 
PrivateWorkerKey(ByteBuffer.wrap(topoSecret.getEncoded()), user, 
expirationTimeMillis);
+        state.addPrivateWorkerKey(serviceType, topologyId, nextVersion, key);
+        LOG.info("Created new WorkerToken for user {} on service {}", user, 
serviceType);
+        return ret;
+    }
+
+    /**
+     * Get the maximum expiration token time that should be renewed.
+     * @return any token with an expiration less than the returned value 
should be renewed.
+     */
+    public long getMaxExpirationTimeForRenewal() {
+        return Time.currentTimeMillis() + (tokenLifetimeMillis / 2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java 
b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
new file mode 100644
index 0000000..052f6cf
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/AuthTest.java
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.security.auth.Subject;
+import org.apache.storm.Config;
+import org.apache.storm.Testing;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.StormClusterStateImpl;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer;
+import org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer;
+import org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer;
+import org.apache.storm.security.auth.digest.DigestSaslTransportPlugin;
+import org.apache.storm.security.auth.workertoken.WorkerTokenManager;
+import org.apache.storm.testing.InProcessZookeeper;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class AuthTest {
+    private static final Logger LOG = LoggerFactory.getLogger(AuthTest.class);
+    private static final File BASE = new File("./src/test/resources/");
+
+    private static final String DIGEST_JAAS_CONF = new 
File(BASE,"jaas_digest.conf").getAbsolutePath();
+    private static final String BAD_PASSWORD_CONF = new File(BASE, 
"jaas_digest_bad_password.conf").getAbsolutePath();
+    private static final String WRONG_USER_CONF = new 
File(BASE,"jaas_digest_unknown_user.conf").getAbsolutePath();
+    private static final String MISSING_CLIENT = new File(BASE, 
"jaas_digest_missing_client.conf").getAbsolutePath();
+
+    //3 seconds in milliseconds
+    public static final int NIMBUS_TIMEOUT = 3_000;
+
+    /**
+     * A two-argument callback like java.util.function.BiConsumer, except 
that accept is allowed to throw a checked Exception.
+     */
+    public interface MyBiConsumer<T, U>  {
+        void accept(T t, U u) throws Exception;
+    }
+
+
+    /**
+     * Create a Principal backed only by the given name.  getName, toString 
and hashCode all derive from the name, and
+     * the principal is equal to any other Principal reporting the same name.
+     * @param name the name the principal should report.
+     * @return the new principal.
+     */
+    public static Principal mkPrincipal(final String name) {
+        return new Principal() {
+            @Override
+            public String getName() {
+                return name;
+            }
+
+            @Override
+            public boolean equals(Object other) {
+                return other instanceof Principal
+                    && name.equals(((Principal) other).getName());
+            }
+
+            @Override
+            public String toString() {
+                return name;
+            }
+
+            @Override
+            public int hashCode() {
+                return name.hashCode();
+            }
+        };
+    }
+
+    /**
+     * Create a Subject, constructed with the read-only flag set, that holds a 
single principal made by mkPrincipal
+     * and empty public and private credential sets.
+     * @param name the name of the only principal in the subject.
+     * @return the new subject.
+     */
+    public static Subject mkSubject(String name) {
+        return new Subject(true, Collections.singleton(mkPrincipal(name)),
+            Collections.emptySet(), Collections.emptySet());
+    }
+
+    /**
+     * Run body against a test thrift server with no login config, no 
zookeeper and no extra configs.
+     * Delegates to the six argument withServer.
+     */
+    public static void withServer(Class<? extends ITransportPlugin> 
transportPluginClass,
+                                  Nimbus.Iface impl,
+                                  MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
+        withServer(null, transportPluginClass, impl, null, null, body);
+    }
+
+    /**
+     * Run body against a test thrift server using the given login config, 
with no zookeeper and no extra configs.
+     * Delegates to the six argument withServer.
+     */
+    public static void withServer(String loginCfg,
+                                  Class<? extends ITransportPlugin> 
transportPluginClass,
+                                  Nimbus.Iface impl,
+                                  MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
+        withServer(loginCfg, transportPluginClass, impl, null, null, body);
+    }
+
+    /**
+     * Start a nimbus ThriftServer on a background thread, wait until it 
reports that it is serving, run body
+     * against it, and always stop the server afterwards.
+     * @param loginCfg the value for java.security.auth.login.config, or null 
to leave it unset.
+     * @param transportPluginClass the transport plugin the server is 
configured with.
+     * @param impl the handler to serve, or null to serve a mockito mock.
+     * @param zk if not null the config is pointed at this zookeeper on 
localhost.
+     * @param extraConfs if not null these are applied on top of everything 
else in the config.
+     * @param body the code to run, it is given the server and the config the 
server was created with.
+     * @throws Exception anything thrown by body.
+     */
+    public static  void withServer(String loginCfg,
+                                   Class<? extends ITransportPlugin> 
transportPluginClass,
+                                   Nimbus.Iface impl,
+                                   InProcessZookeeper zk,
+                                   Map<String, Object> extraConfs,
+                                   MyBiConsumer<ThriftServer, Map<String, 
Object>> body) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        //NOTE(review): port 0 presumably lets the server pick a free port, 
callers read it back with server.getPort() - confirm
+        conf.put(Config.NIMBUS_THRIFT_PORT, 0);
+        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
transportPluginClass.getName());
+
+        if (loginCfg != null) {
+            conf.put("java.security.auth.login.config", loginCfg);
+        }
+
+        if (zk != null) {
+            conf.put(Config.STORM_ZOOKEEPER_SERVERS, 
Arrays.asList("localhost"));
+            conf.put(Config.STORM_ZOOKEEPER_PORT, zk.getPort());
+        }
+
+        if (extraConfs != null) {
+            conf.putAll(extraConfs);
+        }
+
+        Nimbus.Iface handler = impl != null ? impl : mock(Nimbus.Iface.class);
+        final ThriftServer server = new ThriftServer(conf,
+            new Nimbus.Processor<>(handler),
+            ThriftConnectionType.NIMBUS);
+
+        LOG.info("Created Server... {}", server);
+        new Thread(() -> {
+            LOG.info("Starting Serving...");
+            server.serve();
+        }).start();
+        //Poll until the background thread has the server accepting 
connections.
+        Testing.whileTimeout(
+            () -> !server.isServing(),
+            () -> {
+                try {
+                    Time.sleep(100);
+                } catch (InterruptedException e) {
+                    //Ignored
+                }
+            });
+        try {
+            LOG.info("Starting to run {}", body);
+            body.accept(server, conf);
+            LOG.info("{} finished with no exceptions", body);
+        } finally {
+            LOG.info("Stopping server {}", server);
+            server.stop();
+        }
+    }
+
+    /**
+     * Verify that KerberosPrincipalToLocal drops the realm and the host part 
of a principal name and leaves
+     * a plain name untouched.
+     */
+    @Test
+    public void kerbToLocalTest() {
+        KerberosPrincipalToLocal kptol = new KerberosPrincipalToLocal();
+        kptol.prepare(Collections.emptyMap());
+        assertEquals("me", kptol.toLocal(mkPrincipal("me@realm")));
+        assertEquals("simple", kptol.toLocal(mkPrincipal("simple")));
+        assertEquals("someone", 
kptol.toLocal(mkPrincipal("someone/host@realm")));
+    }
+
+    /**
+     * Verify that a client using SimpleTransportPlugin can call a server 
using the same plugin, and that a client
+     * configured for digest authentication is rejected before the call 
reaches the handler.
+     */
+    @Test
+    public void simpleAuthTest() throws Exception {
+        Nimbus.Iface impl = mock(Nimbus.Iface.class);
+        withServer(SimpleTransportPlugin.class,
+            impl,
+            (ThriftServer server, Map<String, Object> conf) -> {
+                try (NimbusClient client = new NimbusClient(conf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
+                    client.getClient().activate("security_auth_test_topology");
+                }
+
+                //Verify digest is rejected...
+                Map<String, Object> badConf = new HashMap<>(conf);
+                badConf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
DigestSaslTransportPlugin.class.getName());
+                badConf.put("java.security.auth.login.config", 
DIGEST_JAAS_CONF);
+                badConf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+                try (NimbusClient client = new NimbusClient(badConf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                    
client.getClient().activate("bad_security_auth_test_topology");
+                    fail("An exception should have been thrown trying to 
connect.");
+                } catch (Exception te) {
+                    LOG.info("Got Exception...", te);
+                    //assertTrue and not the assert keyword, the keyword is 
skipped when the JVM runs without -ea
+                    
assertTrue(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
+                }
+            });
+        verify(impl).activate("security_auth_test_topology");
+        verify(impl, never()).activate("bad_security_auth_test_topology");
+    }
+
+    /**
+     * Try to call the server with the given jaas conf and verify that the 
attempt fails with the expected exception
+     * somewhere in the cause chain.
+     * @param server the server to connect to.
+     * @param conf the base client config, it is copied and not modified.
+     * @param jaas the path to use for java.security.auth.login.config.
+     * @param expectedException the exception type that must be in the cause 
chain of the failure.
+     */
+    public static void verifyIncorrectJaasConf(ThriftServer server, 
Map<String, Object> conf, String jaas,
+                                               Class<? extends Exception> 
expectedException) {
+        Map<String, Object> badConf = new HashMap<>(conf);
+        badConf.put("java.security.auth.login.config", jaas);
+        try (NimbusClient client = new NimbusClient(badConf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
+            client.getClient().activate("bad_auth_test_topology");
+            fail("An exception should have been thrown trying to connect.");
+        } catch (Exception e) {
+            LOG.info("Got Exception...", e);
+            //assertTrue and not the assert keyword, the keyword is skipped 
when the JVM runs without -ea
+            assertTrue(Utils.exceptionCauseIsInstanceOf(expectedException, e));
+        }
+    }
+
+    /**
+     * Verify digest authentication.  A client with the good jaas conf 
connects and is seen by the handler as bob,
+     * while a simple transport client and clients with bad jaas confs are 
rejected before reaching the handler.
+     */
+    @Test
+    public void digestAuthTest() throws Exception {
+        Nimbus.Iface impl = mock(Nimbus.Iface.class);
+        final AtomicReference<ReqContext> user = new AtomicReference<>();
+        doAnswer((invocation) -> {
+            user.set(new ReqContext(ReqContext.context()));
+            return null;
+        }).when(impl).activate(anyString());
+
+        withServer(DIGEST_JAAS_CONF,
+            DigestSaslTransportPlugin.class,
+            impl,
+            (ThriftServer server, Map<String, Object> conf) -> {
+                try (NimbusClient client = new NimbusClient(conf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
+                    client.getClient().activate("security_auth_test_topology");
+                }
+
+                conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+
+                //Verify simple is rejected...
+                Map<String, Object> badTransport = new HashMap<>(conf);
+                badTransport.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, 
SimpleTransportPlugin.class.getName());
+                try (NimbusClient client = new NimbusClient(badTransport, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                    
client.getClient().activate("bad_security_auth_test_topology");
+                    fail("An exception should have been thrown trying to 
connect.");
+                } catch (Exception te) {
+                    LOG.info("Got Exception...", te);
+                    //assertTrue and not the assert keyword, the keyword is 
skipped when the JVM runs without -ea
+                    
assertTrue(Utils.exceptionCauseIsInstanceOf(TTransportException.class, te));
+                }
+                //The user here from the jaas conf is bob.  No impersonation 
is done, so verify that
+                ReqContext found = user.get();
+                assertNotNull(found);
+                assertEquals("bob", found.principal().getName());
+                assertFalse(found.isImpersonating());
+                user.set(null);
+
+                verifyIncorrectJaasConf(server, conf, BAD_PASSWORD_CONF, 
TTransportException.class);
+                verifyIncorrectJaasConf(server, conf, WRONG_USER_CONF, 
TTransportException.class);
+                verifyIncorrectJaasConf(server, conf, "./nonexistent.conf", 
RuntimeException.class);
+                verifyIncorrectJaasConf(server, conf, MISSING_CLIENT, 
IOException.class);
+            });
+        verify(impl).activate("security_auth_test_topology");
+        //The rejected simple transport attempt uses its own topology name, 
so check that one too.
+        verify(impl, never()).activate("bad_security_auth_test_topology");
+        verify(impl, never()).activate("bad_auth_test_topology");
+    }
+
+    /**
+     * Create a new Subject that carries the given worker token.
+     * @param wt the token to place in the subject.
+     * @return the new subject.
+     */
+    public static Subject createSubjectWith(WorkerToken wt) {
+        //This is a bit ugly, but it shows how this would happen in a worker 
so we will use the same APIs
+        Map<String, String> creds = new HashMap<>();
+        AuthUtils.setWorkerToken(creds, wt);
+        Subject subject = new Subject();
+        AuthUtils.updateSubject(subject, Collections.emptyList(), creds);
+        return subject;
+    }
+
+    /**
+     * Connect to the server on localhost while running as the given subject 
and call activate with topoId.
+     * @param conf the client config.
+     * @param server the server to connect to.
+     * @param subject the subject to run the connection attempt as.
+     * @param topoId the value passed to activate.
+     * @throws PrivilegedActionException wrapping any checked exception from 
the connection attempt.
+     */
+    public static void tryConnectAs( Map<String, Object> conf, ThriftServer 
server, Subject subject, String topoId)
+        throws PrivilegedActionException {
+        Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
+            try (NimbusClient client = new NimbusClient(conf, "localhost", 
server.getPort(), NIMBUS_TIMEOUT)) {
+                client.getClient().activate(topoId); //Yes this should be a 
topo name, but it makes this simpler...
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Create a NIMBUS worker token for the user and topology, put it in a 
new Subject and connect to the server as
+     * that subject.
+     * @return the subject holding the new token so it can be used for later 
connection attempts.
+     */
+    public static Subject testConnectWithTokenFor(WorkerTokenManager wtMan, 
Map<String, Object> conf, ThriftServer server,
+                                               String user, String topoId) 
throws PrivilegedActionException {
+        WorkerToken wt = 
wtMan.createOrUpdateTokenFor(WorkerTokenServiceType.NIMBUS, user, topoId);
+        Subject subject = createSubjectWith(wt);
+        tryConnectAs(conf, server, subject, topoId);
+        return subject;
+    }
+
+    /**
+     * Assert that the captured request context belongs to userName and is 
not impersonating, then clear the
+     * reference so the next check starts clean.
+     * @param user holds the request context captured by the handler.
+     * @param userName the principal name that is expected.
+     */
+    public static void verifyUserIs(AtomicReference<ReqContext> user, String 
userName) {
+        //The user comes from the token, so verify that the name was set 
correctly...
+        ReqContext found = user.get();
+        assertNotNull(found);
+        assertEquals(userName, found.principal().getName());
+        assertFalse(found.isImpersonating());
+        user.set(null);
+    }
+
+    /**
+     * Verify worker token authentication against a digest server whose jaas 
conf has no client section.
+     * Connecting without a token must fail.  Tokens are created for bob 
and, after 12 simulated hours, for alice.
+     * After 13 more hours bob's first token must be rejected while alice's 
still works, and a new token for bob works again.
+     */
+    @Test
+    public void workerTokenDigestAuthTest() throws Exception {
+        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - START\n\n");
+        Nimbus.Iface impl = mock(Nimbus.Iface.class);
+        final AtomicReference<ReqContext> user = new AtomicReference<>();
+        doAnswer((invocation) -> {
+            user.set(new ReqContext(ReqContext.context()));
+            return null;
+        }).when(impl).activate(anyString());
+
+        Map<String, Object> extraConfs = new HashMap<>();
+        //Let worker tokens work on insecure ZK...
+        extraConfs.put("TESTING.ONLY.ENABLE.INSECURE.WORKER.TOKENS", true);
+
+        try (InProcessZookeeper zk = new InProcessZookeeper()) {
+            withServer(MISSING_CLIENT,
+                DigestSaslTransportPlugin.class,
+                impl,
+                zk,
+                extraConfs,
+                (ThriftServer server, Map<String, Object> conf) -> {
+                    try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
+                        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 0);
+                        //We cannot connect if there is no client section in 
the jaas conf...
+                        try (NimbusClient client = new NimbusClient(conf, 
"localhost", server.getPort(), NIMBUS_TIMEOUT)) {
+                            
client.getClient().activate("bad_auth_test_topology");
+                            fail("We should not be able to connect without a 
token...");
+                        } catch (Exception e) {
+                            //assertTrue and not the assert keyword, the 
keyword is skipped when the JVM runs without -ea
+                            
assertTrue(Utils.exceptionCauseIsInstanceOf(IOException.class, e));
+                        }
+
+                        //Now lets create a token and verify that we can 
connect...
+                        IStormClusterState state = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
+                        WorkerTokenManager wtMan = new 
WorkerTokenManager(conf, state);
+                        Subject bob = testConnectWithTokenFor(wtMan, conf, 
server, "bob", "topo-bob");
+                        verifyUserIs(user, "bob");
+
+                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(12));
+
+                        //Alice has no digest jaas section at all...
+                        Subject alice = testConnectWithTokenFor(wtMan, conf, 
server, "alice", "topo-alice");
+                        verifyUserIs(user, "alice");
+
+                        Time.advanceTimeSecs(TimeUnit.HOURS.toSeconds(13));
+                        //Verify that bob's token has expired
+
+                        try {
+                            tryConnectAs(conf, server, bob, 
"bad_auth_test_topology");
+                            fail("We should not be able to connect with bad 
auth");
+                        } catch (Exception e) {
+                            
assertTrue(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e));
+                        }
+                        tryConnectAs(conf, server, alice, "topo-alice");
+                        verifyUserIs(user, "alice");
+
+                        //Now see if we can create a new token for bob and try 
again.
+                        bob = testConnectWithTokenFor(wtMan, conf, server, 
"bob", "topo-bob");
+                        verifyUserIs(user, "bob");
+
+                        tryConnectAs(conf, server, alice, "topo-alice");
+                        verifyUserIs(user, "alice");
+                    }
+                });
+        }
+        verify(impl, times(2)).activate("topo-bob");
+        verify(impl, times(3)).activate("topo-alice");
+        verify(impl, never()).activate("bad_auth_test_topology");
+        LOG.info("\n\n\t\tworkerTokenDigestAuthTest - END\n\n");
+    }
+
+    /**
+     * Verify that SimpleWhitelistAuthorizer denies "activate" for "user" 
when the test adds no whitelist entry to the config.
+     */
+    @Test
+    public void negativeWhitelistAuthroizationTest() {
+        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        auth.prepare(conf);
+        ReqContext context = new ReqContext(mkSubject("user"));
+        assertFalse(auth.permit(context, "activate", conf));
+    }
+
+    /**
+     * Verify that SimpleWhitelistAuthorizer permits "activate" for "user" 
once "user" is listed in WHITELIST_USERS_CONF.
+     */
+    @Test
+    public void positiveWhitelistAuthroizationTest() {
+        SimpleWhitelistAuthorizer auth = new SimpleWhitelistAuthorizer();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(SimpleWhitelistAuthorizer.WHITELIST_USERS_CONF, 
Arrays.asList("user"));
+        auth.prepare(conf);
+        ReqContext context = new ReqContext(mkSubject("user"));
+        assertTrue(auth.permit(context, "activate", conf));
+    }
+
+    /**
+     * Verify the decisions of SimpleACLAuthorizer for an admin, a supervisor 
and two regular users when only
+     * NIMBUS_ADMINS and NIMBUS_SUPERVISOR_USERS are set by the test.  
Topology level operations are checked against a
+     * topology conf that lists only user-a in TOPOLOGY_USERS.
+     */
+    @Test
+    public void simpleAclUserAuthTest() {
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
+        ReqContext admin = new ReqContext(mkSubject("admin"));
+        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
+        ReqContext userA = new ReqContext(mkSubject("user-a"));
+        ReqContext userB = new ReqContext(mkSubject("user-b"));
+
+        final Map<String, Object> empty = Collections.emptyMap();
+        final Map<String, Object> aAllowed = new HashMap<>();
+        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
+
+        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
+        authorizer.prepare(clusterConf);
+
+        assertTrue(authorizer.permit(userA, "submitTopology", empty));
+        assertTrue(authorizer.permit(userB, "submitTopology", empty));
+        assertTrue(authorizer.permit(admin, "submitTopology", empty));
+        assertFalse(authorizer.permit(supervisor, "submitTopology", empty));
+
+        assertTrue(authorizer.permit(userA, "fileUpload", null));
+        assertTrue(authorizer.permit(userB, "fileUpload", null));
+        assertTrue(authorizer.permit(admin, "fileUpload", null));
+        assertFalse(authorizer.permit(supervisor, "fileUpload", null));
+
+        assertTrue(authorizer.permit(userA, "getNimbusConf", null));
+        assertTrue(authorizer.permit(userB, "getNimbusConf", null));
+        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
+        assertFalse(authorizer.permit(supervisor, "getNimbusConf", null));
+
+        assertTrue(authorizer.permit(userA, "getClusterInfo", null));
+        assertTrue(authorizer.permit(userB, "getClusterInfo", null));
+        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
+        assertFalse(authorizer.permit(supervisor, "getClusterInfo", null));
+
+        assertFalse(authorizer.permit(userA, "fileDownload", null));
+        assertFalse(authorizer.permit(userB, "fileDownload", null));
+        assertTrue(authorizer.permit(admin, "fileDownload", null));
+        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
+
+        assertTrue(authorizer.permit(userA, "killTopology", aAllowed));
+        assertFalse(authorizer.permit(userB, "killTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "killTopology", aAllowed));
+
+        assertTrue(authorizer.permit(userA, "uploadNewCredentials", aAllowed));
+        assertFalse(authorizer.permit(userB, "uploadNewCredentials", 
aAllowed));
+        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "uploadNewCredentials", 
aAllowed));
+
+        assertTrue(authorizer.permit(userA, "rebalance", aAllowed));
+        assertFalse(authorizer.permit(userB, "rebalance", aAllowed));
+        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "rebalance", aAllowed));
+
+        assertTrue(authorizer.permit(userA, "activate", aAllowed));
+        assertFalse(authorizer.permit(userB, "activate", aAllowed));
+        assertTrue(authorizer.permit(admin, "activate", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "activate", aAllowed));
+
+        assertTrue(authorizer.permit(userA, "deactivate", aAllowed));
+        assertFalse(authorizer.permit(userB, "deactivate", aAllowed));
+        assertTrue(authorizer.permit(admin, "deactivate", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "deactivate", aAllowed));
+
+        assertTrue(authorizer.permit(userA, "getTopologyConf", aAllowed));
+        assertFalse(authorizer.permit(userB, "getTopologyConf", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "getTopologyConf", 
aAllowed));
+
+        assertTrue(authorizer.permit(userA, "getTopology", aAllowed));
+        assertFalse(authorizer.permit(userB, "getTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "getTopology", aAllowed));
+
+        assertTrue(authorizer.permit(userA, "getUserTopology", aAllowed));
+        assertFalse(authorizer.permit(userB, "getUserTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "getUserTopology", 
aAllowed));
+
+        assertTrue(authorizer.permit(userA, "getTopologyInfo", aAllowed));
+        assertFalse(authorizer.permit(userB, "getTopologyInfo", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
+        assertFalse(authorizer.permit(supervisor, "getTopologyInfo", 
aAllowed));
+    }
+
+    /**
+     * Verify that with NIMBUS_USERS set to user-a, SimpleACLAuthorizer lets 
user-a submit a topology but not user-b,
+     * while the admin can still upload files and the supervisor can still 
download them.
+     */
+    @Test
+    public void simpleAclNimbusUsersAuthTest() {
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
+        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
+        ReqContext admin = new ReqContext(mkSubject("admin"));
+        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
+        ReqContext userA = new ReqContext(mkSubject("user-a"));
+        ReqContext userB = new ReqContext(mkSubject("user-b"));
+
+        final Map<String, Object> empty = Collections.emptyMap();
+
+        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
+        authorizer.prepare(clusterConf);
+
+        assertTrue(authorizer.permit(userA, "submitTopology", empty));
+        assertFalse(authorizer.permit(userB, "submitTopology", empty));
+        assertTrue(authorizer.permit(admin, "fileUpload", null));
+        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
+    }
+
+    /**
+     * Checks {@link SimpleACLAuthorizer} when admin rights are configured through
+     * NIMBUS_ADMINS_GROUPS ("admin-group") instead of individual admin users.
+     * {@link FixedGroupsMapping} is installed as the group mapping plugin and maps
+     * "admin" to "admin-group" and "not-admin" to "not-admin-group". The test
+     * asserts that only the user in the admin group may upload files, while the
+     * NIMBUS_USERS and NIMBUS_SUPERVISOR_USERS checks still pass for "user-a" and
+     * "supervisor".
+     */
+    @Test
+    public void simpleAclNimbusGroupsAuthTest() {
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        clusterConf.put(Config.NIMBUS_ADMINS_GROUPS, 
Arrays.asList("admin-group"));
+        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("supervisor"));
+        clusterConf.put(Config.NIMBUS_USERS, Arrays.asList("user-a"));
+        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN, 
FixedGroupsMapping.class.getName());
+        //Static user -> groups table handed to FixedGroupsMapping through the service params.
+        Map<String, Object> groups = new HashMap<>();
+        groups.put("admin", Collections.singleton("admin-group"));
+        groups.put("not-admin", Collections.singleton("not-admin-group"));
+        Map<String, Object> groupsParams = new HashMap<>();
+        groupsParams.put(FixedGroupsMapping.STORM_FIXED_GROUP_MAPPING, groups);
+        clusterConf.put(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS, 
groupsParams);
+
+        ReqContext admin = new ReqContext(mkSubject("admin"));
+        ReqContext notAdmin = new ReqContext(mkSubject("not-admin"));
+        ReqContext supervisor = new ReqContext(mkSubject("supervisor"));
+        ReqContext userA = new ReqContext(mkSubject("user-a"));
+        ReqContext userB = new ReqContext(mkSubject("user-b"));
+
+        final Map<String, Object> empty = Collections.emptyMap();
+
+        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
+        authorizer.prepare(clusterConf);
+
+        assertTrue(authorizer.permit(userA, "submitTopology", empty));
+        assertFalse(authorizer.permit(userB, "submitTopology", empty));
+
+        assertTrue(authorizer.permit(admin, "fileUpload", null));
+        assertFalse(authorizer.permit(notAdmin, "fileUpload", null));
+        assertFalse(authorizer.permit(userB, "fileUpload", null));
+
+        assertTrue(authorizer.permit(supervisor, "fileDownload", null));
+    }
+
+    /**
+     * Checks {@link SimpleACLAuthorizer} when the same principal ("admin") is listed
+     * both under NIMBUS_ADMINS and under NIMBUS_SUPERVISOR_USERS. Every operation
+     * exercised below must be permitted for that principal, including the
+     * topology-level operations on a topology whose TOPOLOGY_USERS only names
+     * "user-a".
+     */
+    @Test
+    public void simpleAclSameUserAuthTest() {
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        clusterConf.put(Config.NIMBUS_ADMINS, Arrays.asList("admin"));
+        clusterConf.put(Config.NIMBUS_SUPERVISOR_USERS, 
Arrays.asList("admin"));
+        ReqContext admin = new ReqContext(mkSubject("admin"));
+
+        final Map<String, Object> empty = Collections.emptyMap();
+        //Topology conf that grants access to "user-a" only; "admin" is not listed in it.
+        final Map<String, Object> aAllowed = new HashMap<>();
+        aAllowed.put(Config.TOPOLOGY_USERS, Arrays.asList("user-a"));
+
+        SimpleACLAuthorizer authorizer = new SimpleACLAuthorizer();
+        authorizer.prepare(clusterConf);
+
+        assertTrue(authorizer.permit(admin, "submitTopology", empty));
+        assertTrue(authorizer.permit(admin, "fileUpload", null));
+        assertTrue(authorizer.permit(admin, "getNimbusConf", null));
+        assertTrue(authorizer.permit(admin, "getClusterInfo", null));
+        assertTrue(authorizer.permit(admin, "fileDownload", null));
+        assertTrue(authorizer.permit(admin, "killTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "uploadNewCredentials", aAllowed));
+        assertTrue(authorizer.permit(admin, "rebalance", aAllowed));
+        assertTrue(authorizer.permit(admin, "activate", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopologyConf", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "getUserTopology", aAllowed));
+        assertTrue(authorizer.permit(admin, "getTopologyInfo", aAllowed));
+    }
+
+    /**
+     * Exercises {@link ShellBasedGroupsMapping} with the user running the test
+     * (taken from the "user.name" system property), with a user name that is
+     * expected not to exist, and with {@code null}. The latter two lookups must
+     * yield an empty group collection.
+     */
+    @Test
+    public void shellBaseGroupsMappingTest() throws Exception {
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        ShellBasedGroupsMapping groups = new ShellBasedGroupsMapping();
+        groups.prepare(clusterConf);
+
+        String userName = System.getProperty("user.name");
+
+        //size() >= 0 holds for any collection, so this line only proves that the lookup
+        //completes without throwing and does not return null.
+        assertTrue(groups.getGroups(userName).size() >= 0);
+        assertEquals(0, groups.getGroups("userDoesNotExist").size());
+        assertEquals(0, groups.getGroups(null).size());
+    }
+
+    /**
+     * Expects {@link AuthUtils#getTransportPlugin} to throw a {@link RuntimeException}
+     * when STORM_THRIFT_TRANSPORT_PLUGIN is set to the value "null.invalid".
+     */
+    @Test(expected = RuntimeException.class)
+    public void getTransportPluginThrowsRunimeTest() {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "null.invalid");
+        AuthUtils.getTransportPlugin(ThriftConnectionType.NIMBUS, conf, null);
+    }
+
+    public static ReqContext mkImpersonatingReqContext(String 
impersonatingUser, String userBeingIUmpersonated, InetAddress remoteAddress) {
+        ReqContext ret = new ReqContext(mkSubject(userBeingIUmpersonated));
+        ret.setRemoteAddress(remoteAddress);
+        ret.setRealPrincipal(mkPrincipal(impersonatingUser));
+        return ret;
+    }
+
+    /**
+     * Checks {@link ImpersonationAuthorizer} against an impersonation ACL that lets
+     * "admin" impersonate from the local host name, restricted to the groups that
+     * {@link ShellBasedGroupsMapping} reports for the user running the test.
+     * Covers a non-impersonating request, an impersonator without an ACL entry, an
+     * unauthorized host, an unauthorized target user and the fully authorized case.
+     */
+    @Test
+    public void impersonationAuthorizerTest() throws Exception {
+        final String impersonatingUser = "admin";
+        final String userBeingImpersonated = System.getProperty("user.name");
+        Map<String, Object> clusterConf = ConfigUtils.readStormConfig();
+        ShellBasedGroupsMapping groupMapper = new ShellBasedGroupsMapping();
+        groupMapper.prepare(clusterConf);
+        Set<String> groups = groupMapper.getGroups(userBeingImpersonated);
+
+        InetAddress localHost = InetAddress.getLocalHost();
+
+        //ACL layout: impersonating user -> {"hosts": allowed host names, "groups": allowed groups}
+        Map<String, Object> acl = new HashMap<>();
+        Map<String, Object> aclConf = new HashMap<>();
+        aclConf.put("hosts", Arrays.asList(localHost.getHostName()));
+        aclConf.put("groups", groups);
+        acl.put(impersonatingUser, aclConf);
+        clusterConf.put(Config.NIMBUS_IMPERSONATION_ACL, acl);
+
+        InetAddress unauthorizedHost = 
com.google.common.net.InetAddresses.forString("10.10.10.10");
+
+        ImpersonationAuthorizer authorizer = new ImpersonationAuthorizer();
+        authorizer.prepare(clusterConf);
+
+        //non impersonating request, should be permitted.
+        //NOTE(review): "fileUplaod" looks like a misspelling of "fileUpload"; presumably harmless here - confirm.
+        assertTrue(authorizer.permit(new ReqContext(mkSubject("anyuser")), 
"fileUplaod", null));
+
+        //user with no impersonation acl should be rejected
+        
assertFalse(authorizer.permit(mkImpersonatingReqContext("user-with-no-acl", 
userBeingImpersonated, localHost),
+            "someOperation",null));
+
+        //request from hosts that are not authorized should be rejected
+        
assertFalse(authorizer.permit(mkImpersonatingReqContext(impersonatingUser, 
userBeingImpersonated, unauthorizedHost),
+            "someOperation", null));
+
+        //request to impersonate users from unauthorized groups should be 
rejected.
+        
assertFalse(authorizer.permit(mkImpersonatingReqContext(impersonatingUser, 
"unauthorized-user", localHost),
+            "someOperation", null));
+
+        //request from authorized hosts and group should be allowed.
+        
assertTrue(authorizer.permit(mkImpersonatingReqContext(impersonatingUser, 
userBeingImpersonated, localHost),
+            "someOperation", null));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
 
b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
new file mode 100644
index 0000000..4adb4d3
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.security.auth.workertoken;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.PrivateWorkerKey;
+import org.apache.storm.generated.WorkerToken;
+import org.apache.storm.generated.WorkerTokenInfo;
+import org.apache.storm.generated.WorkerTokenServiceType;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for creating worker tokens with {@link WorkerTokenManager} and checking them
+ * with {@link WorkerTokenAuthorizer}. The cluster state is a Mockito mock and time is
+ * simulated, so the expiration values asserted below are deterministic.
+ */
+public class WorkerTokenTest {
+    //Expiration asserted for tokens and private keys created with an empty conf.
+    public static final long ONE_DAY_MILLIS = TimeUnit.HOURS.toMillis(24);
+
+    /**
+     * Creates a NIMBUS worker token with secret version 0 and verifies that the
+     * private key is stored in the cluster state, that the token and its embedded
+     * info carry the expected fields, and that {@link WorkerTokenAuthorizer}
+     * computes the same signature that the token carries.
+     */
+    @Test
+    public void testBasicGenerateAndAuthorize() {
+        final AtomicReference<PrivateWorkerKey> privateKey = new 
AtomicReference<>();
+        final String topoId = "topo-1";
+        final String userName = "user";
+        final WorkerTokenServiceType type = WorkerTokenServiceType.NIMBUS;
+        final long versionNumber = 0L;
+        //Simulate time starts out at 0, so we are going to just leave it here.
+        try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
+            IStormClusterState mockState = mock(IStormClusterState.class);
+            Map<String, Object> conf = new HashMap<>();
+            WorkerTokenManager wtm = new WorkerTokenManager(conf, mockState);
+
+            when(mockState.getNextPrivateWorkerKeyVersion(type, 
topoId)).thenReturn(versionNumber);
+            doAnswer((invocation) -> {
+                //Save the private worker key away so we can test it too.
+                privateKey.set(invocation.getArgument(3));
+                return null;
+            }).when(mockState).addPrivateWorkerKey(eq(type), eq(topoId), 
eq(versionNumber), any(PrivateWorkerKey.class));
+            //Answer when we ask for a private key...
+            when(mockState.getPrivateWorkerKey(type, topoId, 
versionNumber)).thenAnswer((invocation) -> privateKey.get());
+
+            WorkerToken wt = wtm.createOrUpdateTokenFor(type, userName, 
topoId);
+            verify(mockState).addPrivateWorkerKey(eq(type), eq(topoId), 
eq(versionNumber), any(PrivateWorkerKey.class));
+            assertTrue(wt.is_set_serviceType());
+            assertEquals(type, wt.get_serviceType());
+            assertTrue(wt.is_set_info());
+            assertTrue(wt.is_set_signature());
+
+            PrivateWorkerKey pwk = privateKey.get();
+            assertNotNull(pwk);
+            assertTrue(pwk.is_set_expirationTimeMillis());
+            assertEquals(ONE_DAY_MILLIS, pwk.get_expirationTimeMillis());
+
+            WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(wt);
+            assertTrue(info.is_set_topologyId());
+            assertTrue(info.is_set_userName());
+            assertTrue(info.is_set_expirationTimeMillis());
+            assertTrue(info.is_set_secretVersion());
+            assertEquals(topoId, info.get_topologyId());
+            assertEquals(userName, info.get_userName());
+            assertEquals(ONE_DAY_MILLIS, info.get_expirationTimeMillis());
+            assertEquals(versionNumber, info.get_secretVersion());
+
+            //Verify the signature...
+            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, 
mockState);
+            byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
+            assertArrayEquals(wt.get_signature(), signature);
+        }
+    }
+
+    /**
+     * Creates a NIMBUS worker token with secret version 5, advances simulated time
+     * to one millisecond past the token's expiration, and expects
+     * {@link WorkerTokenAuthorizer#getSignedPasswordFor} to reject the token with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testExpiration() {
+        final AtomicReference<PrivateWorkerKey> privateKey = new 
AtomicReference<>();
+        final String topoId = "topo-1";
+        final String userName = "user";
+        final WorkerTokenServiceType type = WorkerTokenServiceType.NIMBUS;
+        final long versionNumber = 5L;
+        //Simulate time starts out at 0, so we are going to just leave it here.
+        try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
+            IStormClusterState mockState = mock(IStormClusterState.class);
+            Map<String, Object> conf = new HashMap<>();
+            WorkerTokenManager wtm = new WorkerTokenManager(conf, mockState);
+
+            when(mockState.getNextPrivateWorkerKeyVersion(type, 
topoId)).thenReturn(versionNumber);
+            doAnswer((invocation) -> {
+                //Save the private worker key away so we can test it too.
+                privateKey.set(invocation.getArgument(3));
+                return null;
+            }).when(mockState).addPrivateWorkerKey(eq(type), eq(topoId), 
eq(versionNumber), any(PrivateWorkerKey.class));
+            //Answer when we ask for a private key...
+            when(mockState.getPrivateWorkerKey(type, topoId, 
versionNumber)).thenAnswer((invocation) -> privateKey.get());
+
+            WorkerToken wt = wtm.createOrUpdateTokenFor(type, userName, 
topoId);
+            verify(mockState).addPrivateWorkerKey(eq(type), eq(topoId), 
eq(versionNumber), any(PrivateWorkerKey.class));
+            assertTrue(wt.is_set_serviceType());
+            assertEquals(type, wt.get_serviceType());
+            assertTrue(wt.is_set_info());
+            assertTrue(wt.is_set_signature());
+
+            PrivateWorkerKey pwk = privateKey.get();
+            assertNotNull(pwk);
+            assertTrue(pwk.is_set_expirationTimeMillis());
+            assertEquals(ONE_DAY_MILLIS, pwk.get_expirationTimeMillis());
+
+            WorkerTokenInfo info = AuthUtils.getWorkerTokenInfo(wt);
+            assertTrue(info.is_set_topologyId());
+            assertTrue(info.is_set_userName());
+            assertTrue(info.is_set_expirationTimeMillis());
+            assertTrue(info.is_set_secretVersion());
+            assertEquals(topoId, info.get_topologyId());
+            assertEquals(userName, info.get_userName());
+            assertEquals(ONE_DAY_MILLIS, info.get_expirationTimeMillis());
+            assertEquals(versionNumber, info.get_secretVersion());
+
+            //Expire the token
+            Time.advanceTime(ONE_DAY_MILLIS + 1);
+
+            //Verify the signature...
+            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, 
mockState);
+            try {
+                wta.getSignedPasswordFor(wt.get_info(), info);
+                fail("Expected an expired token to not be signed!!!");
+            } catch (IllegalArgumentException ia) {
+                //What we want...
+            }
+        }
+    }
+
+    /**
+     * Verifies that, with an empty conf, {@link WorkerTokenManager#getMaxExpirationTimeForRenewal()}
+     * returns half of {@link #ONE_DAY_MILLIS}.
+     */
+    @Test
+    public void testRenewalTimeDefault() {
+        try (Time.SimulatedTime sim = new Time.SimulatedTime()) {
+            IStormClusterState mockState = mock(IStormClusterState.class);
+            Map<String, Object> conf = new HashMap<>();
+            WorkerTokenManager wtm = new WorkerTokenManager(conf, mockState);
+
+            assertEquals(ONE_DAY_MILLIS/2, 
wtm.getMaxExpirationTimeForRenewal());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/resources/jaas_digest.conf
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/jaas_digest.conf 
b/storm-server/src/test/resources/jaas_digest.conf
new file mode 100644
index 0000000..2a6d618
--- /dev/null
+++ b/storm-server/src/test/resources/jaas_digest.conf
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This sample file illustrates how Digest authentication should be configured
+*/
+StormServer {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_super="adminsecret"
+       user_bob="bobsecret";
+};
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="bob"
+       password="bobsecret";
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/resources/jaas_digest_bad_password.conf
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/jaas_digest_bad_password.conf 
b/storm-server/src/test/resources/jaas_digest_bad_password.conf
new file mode 100644
index 0000000..149db3f
--- /dev/null
+++ b/storm-server/src/test/resources/jaas_digest_bad_password.conf
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This sample file contains an incorrect password for a user.
+   We use this file for negative tests.
+*/
+StormServer {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_super="adminsecret"
+       user_bob="bobsecret";
+};
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="bob"
+       password="bad_password";
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/resources/jaas_digest_missing_client.conf
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/jaas_digest_missing_client.conf 
b/storm-server/src/test/resources/jaas_digest_missing_client.conf
new file mode 100644
index 0000000..f4f2b64
--- /dev/null
+++ b/storm-server/src/test/resources/jaas_digest_missing_client.conf
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+StormServer {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_super="adminsecret"
+       user_bob="bobsecret";
+};

http://git-wip-us.apache.org/repos/asf/storm/blob/9566a882/storm-server/src/test/resources/jaas_digest_unknown_user.conf
----------------------------------------------------------------------
diff --git a/storm-server/src/test/resources/jaas_digest_unknown_user.conf 
b/storm-server/src/test/resources/jaas_digest_unknown_user.conf
new file mode 100644
index 0000000..e03a333
--- /dev/null
+++ b/storm-server/src/test/resources/jaas_digest_unknown_user.conf
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This sample file contains an unauthorized user.
+   We use this file for negative tests.
+*/
+StormServer {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_super="adminsecret"
+       user_bob="bobsecret";
+};
+StormClient {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="unknown_user"
+       password="some_password";
+};
\ No newline at end of file

Reply via email to