http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java
new file mode 100644
index 0000000..6a4a67a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+/**
+ * Actions that can be done to a topology in nimbus
+ */
+public enum TopologyActions {
+    STARTUP,
+    INACTIVATE,
+    ACTIVATE,
+    REBALANCE,
+    KILL,
+    DO_REBALANCE,
+    REMOVE
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java
new file mode 100644
index 0000000..ab9b212
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+public final class TopologyResources {
+    private final Double requestedMemOnHeap;
+    private final Double requestedMemOffHeap;
+    private final Double requestedCpu;
+    private final Double assignedMemOnHeap;
+    private final Double assignedMemOffHeap;
+    private final Double assignedCpu;
+    
+    public TopologyResources(Double requestedMemOnHeap, Double requestedMemOffHeap,
+            Double requestedCpu, Double assignedMemOnHeap, Double assignedMemOffHeap,
+            Double assignedCpu) {
+                this.requestedMemOnHeap = requestedMemOnHeap;
+                this.requestedMemOffHeap = requestedMemOffHeap;
+                this.requestedCpu = requestedCpu;
+                this.assignedMemOnHeap = assignedMemOnHeap;
+                this.assignedMemOffHeap = assignedMemOffHeap;
+                this.assignedCpu = assignedCpu;
+        
+    }
+
+    public Double getRequestedMemOnHeap() {
+        return requestedMemOnHeap;
+    }
+
+    public Double getRequestedMemOffHeap() {
+        return requestedMemOffHeap;
+    }
+
+    public Double getRequestedCpu() {
+        return requestedCpu;
+    }
+
+    public Double getAssignedMemOnHeap() {
+        return assignedMemOnHeap;
+    }
+
+    public Double getAssignedMemOffHeap() {
+        return assignedMemOffHeap;
+    }
+
+    public Double getAssignedCpu() {
+        return assignedCpu;
+    }
+}
\ No newline at end of file
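
For illustration, a minimal sketch of how the new TopologyResources value class might be used; the figures and the wrapping main() are hypothetical, not part of the patch:

    import org.apache.storm.daemon.nimbus.TopologyResources;

    public class TopologyResourcesExample {
        public static void main(String[] args) {
            // Constructor order per the patch: requested on-heap mem, requested off-heap mem,
            // requested CPU, then the three assigned counterparts.
            TopologyResources resources =
                    new TopologyResources(1024.0, 256.0, 50.0, 2048.0, 512.0, 100.0);
            double requestedTotalMem =
                    resources.getRequestedMemOnHeap() + resources.getRequestedMemOffHeap();
            System.out.println("requested mem: " + requestedTotalMem
                    + ", assigned cpu: " + resources.getAssignedCpu());
        }
    }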

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
new file mode 100644
index 0000000..39151e4
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+import org.apache.storm.generated.StormBase;
+
+/**
+ * A transition from one state to another
+ */
+interface TopologyStateTransition {
+    StormBase transition(Object argument, Nimbus nimbus, String topoId, StormBase base) throws Exception;
+}
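
Because TopologyStateTransition has a single abstract method, a transition can be written as a lambda. A rough sketch, assuming the generated StormBase/TopologyStatus setters behave as used elsewhere in this patch; the KILL_TRANSITION name is made up:

    package org.apache.storm.daemon.nimbus; // the interface is package-private

    import org.apache.storm.generated.StormBase;
    import org.apache.storm.generated.TopologyStatus;

    class TransitionExample {
        // Hypothetical transition that marks the topology as killed and hands the
        // updated base back to Nimbus; the extra argument is ignored here.
        static final TopologyStateTransition KILL_TRANSITION = (argument, nimbus, topoId, base) -> {
            base.set_status(TopologyStatus.KILLED);
            return base;
        };
    }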

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
index d02cfa1..b2af336 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.daemon.supervisor;
 
+import static org.apache.storm.utils.Utils.OR;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -610,16 +612,6 @@ public class BasicContainer extends Container {
         return workerProfilerChildopts;
     }
     
-    /**
-     * a or b the first one that is not null
-     * @param a something
-     * @param b something else
-     * @return a or b the first one that is not null
-     */
-    private <V> V OR(V a, V b) {
-        return a == null ? b : a;
-    }
-    
     protected String javaCmd(String cmd) {
         String ret = null;
         String javaHome = System.getenv().get("JAVA_HOME");
@@ -663,7 +655,7 @@ public class BasicContainer extends Container {
         commandList.addAll(commonParams);
         commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
         commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
-        commandList.addAll(substituteChildopts(OR(
+        commandList.addAll(substituteChildopts(Utils.OR(
                 _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
                 _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
         commandList.addAll(getWorkerProfilerChildOpts(memOnheap));

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 9d31c87..12709ac 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -142,8 +142,12 @@ public class Worker implements Shutdownable, DaemonCommon {
         IStormClusterState stormClusterState =
             ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
         Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
+        Map<String, String> initCreds = new HashMap<>();
+        if (initialCredentials != null) {
+            initCreds.putAll(initialCredentials.get_creds());
+        }
         autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
-        subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds());
+        subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
 
         Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
             @Override public Object run() throws Exception {
@@ -187,11 +191,11 @@ public class Worker implements Shutdownable, DaemonCommon {
                 for (List<Long> e : workerState.getExecutors()) {
                     if (ConfigUtils.isLocalMode(topologyConf)) {
                         newExecutors.add(
-                            LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds())
+                            LocalExecutor.mkExecutor(workerState, e, initCreds)
                                 .execute());
                     } else {
                         newExecutors.add(
-                            Executor.mkExecutor(workerState, e, initialCredentials.get_creds())
+                            Executor.mkExecutor(workerState, e, initCreds)
                                 .execute());
                     }
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java b/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
index c933c31..1530239 100644
--- a/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
+++ b/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
@@ -18,6 +18,7 @@
 package org.apache.storm.nimbus;
 
 import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +28,7 @@ import java.net.UnknownHostException;
 import java.util.Map;
 
 public class NimbusInfo implements Serializable {
+    private static final long serialVersionUID = 2161446155116099333L;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusInfo.class);
     private static final String DELIM = ":";
 
@@ -49,17 +51,17 @@ public class NimbusInfo implements Serializable {
         }
     }
 
-    public static NimbusInfo fromConf(Map conf) {
+    public static NimbusInfo fromConf(Map<String, Object> conf) {
         try {
             String host = InetAddress.getLocalHost().getCanonicalHostName();
             if (conf.containsKey(Config.STORM_LOCAL_HOSTNAME)) {
-                host = conf.get(Config.STORM_LOCAL_HOSTNAME).toString();
+                host = (String) conf.get(Config.STORM_LOCAL_HOSTNAME);
                 LOG.info("Overriding nimbus host to storm.local.hostname -> {}", host);
             } else {
                 LOG.info("Nimbus figures out its name to {}", host);
             }
 
-            int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
+            int port = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT), 6627);
             return new NimbusInfo(host, port, false);
 
         } catch (UnknownHostException e) {
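
With the typed conf map and the Utils.getInt fallback, fromConf no longer throws a NullPointerException when nimbus.thrift.port is unset. A small sketch (the hostname value is made up):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;
    import org.apache.storm.nimbus.NimbusInfo;

    public class NimbusInfoExample {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.STORM_LOCAL_HOSTNAME, "nimbus1.example.com");
            // NIMBUS_THRIFT_PORT intentionally left unset: the port now defaults to 6627.
            NimbusInfo info = NimbusInfo.fromConf(conf);
            System.out.println(info.getHost() + ":" + info.getPort());
        }
    }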

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index dffeb47..1b7d7f8 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -46,6 +46,22 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     }
 
     @Override
+    public String toString() {
+        return this.getClass().getSimpleName() + " topo: " + topologyId + " execToSlots: " + executorToSlot;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) return true;
+        if (other instanceof SchedulerAssignmentImpl) {
+            SchedulerAssignmentImpl sother = (SchedulerAssignmentImpl) other;
+            return topologyId.equals(sother.topologyId) &&
+                    executorToSlot.equals(sother.executorToSlot);
+        }
+        return false;
+    }
+    
+    @Override
     public Set<WorkerSlot> getSlots() {
         return new HashSet<>(executorToSlot.values());
     }    

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
index 1f4ebd7..0ed6f8d 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java
@@ -49,7 +49,7 @@ public class SupervisorDetails {
     private Map<String, Double> _total_resources;
 
     public SupervisorDetails(String id, String host, Object meta, Object schedulerMeta,
-                             Collection<Number> allPorts, Map<String, Double> total_resources){
+                             Collection<? extends Number> allPorts, Map<String, Double> total_resources){
 
         this.id = id;
         this.host = host;
@@ -72,23 +72,29 @@ public class SupervisorDetails {
         this(id, null, meta, null, null, total_resources);
     }
 
-    public SupervisorDetails(String id, Object meta, Collection<Number> allPorts){
+    public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts){
         this(id, null, meta, null, allPorts, null);
     }
 
-    public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts) {
+    public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<? extends Number> allPorts) {
         this(id, host, null, schedulerMeta, allPorts, null);
     }
 
     public SupervisorDetails(String id, String host, Object schedulerMeta,
-                             Collection<Number> allPorts, Map<String, Double> total_resources) {
+                             Collection<? extends Number> allPorts, Map<String, Double> total_resources) {
         this(id, host, null, schedulerMeta, allPorts, total_resources);
     }
+    
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta +
+                " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts;
+    }
 
-    private void setAllPorts(Collection<Number> allPorts) {
+    private void setAllPorts(Collection<? extends Number> allPorts) {
         this.allPorts = new HashSet<>();
-        if(allPorts!=null) {
-            for(Number n: allPorts) {
+        if (allPorts!=null) {
+            for (Number n: allPorts) {
                 this.allPorts.add(n.intValue());
             }
         }
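
The widened Collection<? extends Number> parameters let callers hand over a List<Integer> (or List<Long>) of ports directly; the old Collection<Number> signature forced a copy or an unchecked cast. A sketch with made-up values:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.storm.scheduler.SupervisorDetails;

    public class SupervisorDetailsExample {
        public static void main(String[] args) {
            List<Integer> ports = Arrays.asList(6700, 6701, 6702);
            // Uses the (id, host, schedulerMeta, allPorts) constructor shown in this diff.
            SupervisorDetails details = new SupervisorDetails("super-1", "host1.example.com", null, ports);
            System.out.println(details); // the new toString() prints ID, HOST, META, SCHED_META and PORTS
        }
    }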

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
index 3c6e961..353f679 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
@@ -153,12 +153,20 @@ public class AuthUtils {
      * @param storm_conf storm configuration
      * @return the plugin
      */
-    public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
-        IPrincipalToLocal ptol;
+    public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map<String, Object> storm_conf) {
+        IPrincipalToLocal ptol = null;
         try {
             String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
-            ptol = Utils.newInstance(ptol_klassName);
-            ptol.prepare(storm_conf);
+            if (ptol_klassName == null) {
+                LOG.warn("No principal to local given {}", Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
+            } else {
+                ptol = Utils.newInstance(ptol_klassName);
+                //TODO this can only ever be null if someone is doing something odd with mocking
+                // We should really fix the mocking and remove this
+                if (ptol != null) {
+                    ptol.prepare(storm_conf);
+                }
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -167,15 +175,21 @@ public class AuthUtils {
 
     /**
      * Construct a group mapping service provider plugin
-     * @param storm_conf storm configuration
+     * @param conf daemon configuration
      * @return the plugin
      */
-    public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) {
-        IGroupMappingServiceProvider gmsp;
+    public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map conf) {
+        IGroupMappingServiceProvider gmsp = null;
         try {
-            String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
-            gmsp = Utils.newInstance(gmsp_klassName);
-            gmsp.prepare(storm_conf);
+            String gmsp_klassName = (String) conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
+            if (gmsp_klassName == null) {
+                LOG.warn("No group mapper given {}", Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
+            } else {
+                gmsp = Utils.newInstance(gmsp_klassName);
+                if (gmsp != null) {
+                    gmsp.prepare(conf);
+                }
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
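
With these null checks the plugin getters now log a warning and return null when the corresponding key is missing from the configuration, instead of failing with a NullPointerException wrapped in a RuntimeException. A sketch, assuming an otherwise empty daemon conf:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.security.auth.AuthUtils;
    import org.apache.storm.security.auth.IPrincipalToLocal;

    public class AuthUtilsExample {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>(); // no STORM_PRINCIPAL_TO_LOCAL_PLUGIN configured
            IPrincipalToLocal ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
            if (ptol == null) {
                // callers are now responsible for handling the "no plugin configured" case
                System.out.println("no principal-to-local plugin configured");
            }
        }
    }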

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java b/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java
index 5d7f702..8ea42d5 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java
@@ -56,12 +56,17 @@ public class ShellBasedGroupsMapping implements
      */
     @Override
     public Set<String> getGroups(String user) throws IOException {
-        if(cachedGroups.containsKey(user)) {
-            return cachedGroups.get(user);
+        synchronized(this) {
+            if (cachedGroups.containsKey(user)) {
+                return cachedGroups.get(user);
+            }
         }
         Set<String> groups = getUnixGroups(user);
-        if(!groups.isEmpty())
-            cachedGroups.put(user,groups);
+        if(!groups.isEmpty()) {
+            synchronized (this) {
+                cachedGroups.put(user,groups);
+            }
+        }
         return groups;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 0524dc8..bd6f32d 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -36,6 +36,7 @@ import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.ExecutorSummary;
 import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.SpecificAggregateStats;
 import org.apache.storm.generated.SpoutAggregateStats;
 import org.apache.storm.generated.SpoutStats;
@@ -51,11 +52,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 @SuppressWarnings("unchecked")
@@ -1353,7 +1356,7 @@ public class StatsUtil {
         }
         return new ArrayList<WorkerSummary>(workerSummaryMap.values());
     }
-
+    
     /**
      * Aggregate statistics per worker for a topology. Optionally filtering on specific supervisors
      * 
@@ -1505,19 +1508,18 @@ public class StatsUtil {
      * @return a list of host+port
      */
     public static List<Map<String, Object>> extractNodeInfosFromHbForComp(
-            Map exec2hostPort, Map task2component, boolean includeSys, String compId) {
+            Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId) {
         List<Map<String, Object>> ret = new ArrayList<>();
 
         Set<List> hostPorts = new HashSet<>();
-        for (Object o : exec2hostPort.entrySet()) {
-            Map.Entry entry = (Map.Entry) o;
-            List key = (List) entry.getKey();
-            List value = (List) entry.getValue();
+        for (Entry<List<? extends Number>, List<Object>> entry : exec2hostPort.entrySet()) {
+            List<? extends Number> key = entry.getKey();
+            List<Object> value = entry.getValue();
 
-            Integer start = ((Number) key.get(0)).intValue();
+            Integer start = key.get(0).intValue();
             String host = (String) value.get(0);
             Integer port = (Integer) value.get(1);
-            String comp = (String) task2component.get(start);
+            String comp = task2component.get(start);
             if ((compId == null || compId.equals(comp)) && (includeSys || !Utils.isSystemId(comp))) {
                 hostPorts.add(Lists.newArrayList(host, port));
             }
@@ -1548,10 +1550,10 @@ public class StatsUtil {
      * @param timeout       timeout
      * @return a HashMap of updated executor heart beats
      */
-    public static Map<List<Integer>, Object> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
+    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                                   Map<List<Integer>, Map<String, Object>> executorBeats,
                                                                   Set<List<Integer>> executors, Integer timeout) {
-        Map<List<Integer>, Object> ret = new HashMap<>();
+        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
         if (cache == null && executorBeats == null) {
             return ret;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java b/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java
index 7420b85..40dd3b7 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 
 
-public class BufferInputStream {
+public class BufferInputStream implements AutoCloseable {
     byte[] buffer;
     InputStream stream;
 
@@ -47,6 +47,7 @@ public class BufferInputStream {
         }
     }
 
+    @Override
     public void close() throws IOException {
         stream.close();
     }
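
Implementing AutoCloseable lets callers manage the stream with try-with-resources. A sketch, assuming the class keeps its existing InputStream-based constructor and byte[]-returning read() helper (neither is shown in this hunk):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.storm.utils.BufferInputStream;

    public class BufferInputStreamExample {
        public static void main(String[] args) throws IOException {
            byte[] data = "hello".getBytes();
            // close() is now invoked automatically when the try block exits.
            try (BufferInputStream in = new BufferInputStream(new ByteArrayInputStream(data))) {
                byte[] chunk = in.read();
                System.out.println(chunk.length + " bytes read");
            }
        }
    }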

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index e2be8a7..1c5dca4 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -168,8 +168,8 @@ public class ConfigUtils {
         return conf;
     }
 
-    public static Map readYamlConfig(String name, boolean mustExist) {
-        Map conf = Utils.findAndReadConfigFile(name, mustExist);
+    public static Map<String, Object> readYamlConfig(String name, boolean mustExist) {
+        Map<String, Object> conf = Utils.findAndReadConfigFile(name, mustExist);
         ConfigValidation.validateFields(conf);
         return conf;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 8577c22..9d3b1d4 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1665,16 +1665,18 @@ public class Utils {
 
     /**
      * Creates a new map with a string value in the map replaced with an
-     * equivalently-lengthed string of '#'.
+     * equivalently-lengthed string of '#'.  (If the value is not a String,
+     * toString() is called on it and the result is redacted.)
      * @param m The map that a value will be redacted from
      * @param key The key pointing to the value to be redacted
      * @return a new map with the value redacted. The original map will not be modified.
      */
-    public static Map<Object, String> redactValue(Map<Object, String> m, Object key) {
-        if(m.containsKey(key)) {
-            HashMap<Object, String> newMap = new HashMap<>(m);
-            String value = newMap.get(key);
-            String redacted = new String(new char[value.length()]).replace("\0", "#");
+    public static Map<String, Object> redactValue(Map<String, Object> m, String key) {
+        if (m.containsKey(key)) {
+            HashMap<String, Object> newMap = new HashMap<>(m);
+            Object value = newMap.get(key);
+            String v = value.toString();
+            String redacted = new String(new char[v.length()]).replace("\0", "#");
             newMap.put(key, redacted);
             return newMap;
         }
@@ -1962,7 +1964,7 @@ public class Utils {
         return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
     }
 
-    public static Object nullToZero (Object v) {
+    public static double nullToZero (Double v) {
         return (v != null ? v : 0);
     }
 
@@ -2071,6 +2073,16 @@ public class Utils {
     }
 
     /**
+     * a or b the first one that is not null
+     * @param a something
+     * @param b something else
+     * @return a or b the first one that is not null
+     */
+    public static <V> V OR(V a, V b) {
+        return a == null ? b : a;
+    }
+
+    /**
      * Writes a posix shell script file to be executed in its own process.
      * @param dir the directory under which the script is to be written
      * @param command the command the script is to execute
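
Quick sketches of the retyped helpers; the conf keys and values below are made up for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.utils.Utils;

    public class UtilsHelpersExample {
        public static void main(String[] args) {
            Map<String, Object> conf = new HashMap<>();
            conf.put("topology.name", "word-count");
            conf.put("some.secret.key", "hunter2");

            // redactValue: the value is replaced by a same-length run of '#' in a copy;
            // the original map is left untouched.
            Map<String, Object> safeToLog = Utils.redactValue(conf, "some.secret.key");
            System.out.println(safeToLog.get("some.secret.key")); // #######

            // OR: first-non-null helper, now shared via Utils instead of being private
            // to BasicContainer.
            String gcOpts = Utils.OR((String) conf.get("topology.worker.gc.childopts"), "-XX:+UseG1GC");
            System.out.println(gcOpts);

            // nullToZero: now takes a Double and returns a primitive double.
            double assignedCpu = Utils.nullToZero(null); // 0.0
            System.out.println(assignedCpu);
        }
    }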

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 13198fa..2be4a7a 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -17,7 +17,7 @@
   (:import [java.util Arrays]
            [org.apache.storm.nimbus NimbusInfo])
   (:import [org.apache.storm.daemon.common Assignment StormBase SupervisorInfo])
-  (:import [org.apache.storm.generated NimbusSummary])
+  (:import [org.apache.storm.generated NimbusSummary TopologyStatus])
   (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType])
   (:import [org.mockito Mockito])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
@@ -187,8 +187,8 @@
           nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false)
          nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (Time/currentTimeSecs) false "v1")
          nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (Time/currentTimeSecs) false "v2")
-          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {})
-          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {})]
+          base1 (StormBase. "/tmp/storm1" 1 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})
+          base2 (StormBase. "/tmp/storm2" 2 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})]
       (is (= [] (.assignments state nil)))
       (.setAssignment state "storm1" (thriftify-assignment assignment1))
       (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
@@ -336,4 +336,4 @@
           cluster-utils (Mockito/mock ClusterUtils)]
       (with-open [mocked-cluster (MockedCluster. cluster-utils)]
         (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) 
(Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn 
distributed-state-storage))
-        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
\ No newline at end of file
+        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index a43ad8e..cc760a4 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -16,14 +16,15 @@
 (ns org.apache.storm.nimbus-test
   (:use [clojure test])
   (:require [org.apache.storm [util :as util]])
-  (:require [org.apache.storm.daemon [nimbus :as nimbus]])
-  (:require [org.apache.storm [converter :as converter]])
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
+           [org.apache.storm.blobstore BlobStore]
            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
-           [org.apache.storm.generated GlobalStreamId]
+           [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
+           [org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
            [org.apache.storm Thrift MockAutoCred]
-           [org.apache.storm.stats BoltExecutorStats StatsUtil])
+           [org.apache.storm.stats BoltExecutorStats StatsUtil]
+           [org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.scheduler INimbus])
   (:import [org.mockito Mockito Matchers])
@@ -34,8 +35,9 @@
            TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
             InvalidTopologyException AuthorizationException
             LogConfig LogLevel LogLevelAction Assignment NodeInfo])
-  (:import [java.util HashMap])
+  (:import [java.util HashMap HashSet Optional])
   (:import [java.io File])
+  (:import [javax.security.auth Subject])
   (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller]
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
   (:import [org.apache.storm.zookeeper Zookeeper])
@@ -44,7 +46,7 @@
   (:import [org.apache.storm.daemon StormCommon])
   (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
   (:use [org.apache.storm testing util config log converter])
-    (:require [conjure.core] [org.apache.storm.daemon.nimbus :as nimbus])
+  (:require [conjure.core])
 
   (:use [conjure core]))
 
@@ -277,7 +279,7 @@
       )))
 
 (defn isolation-nimbus []
-  (let [standalone (nimbus/standalone-nimbus)]
+  (let [standalone (Nimbus$StandaloneINimbus.)]
     (reify INimbus
       (prepare [this conf local-dir]
         (.prepare standalone conf local-dir)
@@ -499,14 +501,15 @@
     )))
 
 (deftest test-topo-history
-  (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
-                                      :daemon-conf {SUPERVISOR-ENABLE false
-                                                    NIMBUS-ADMINS ["admin-user"]
-                                                    NIMBUS-TASK-TIMEOUT-SECS 30
-                                                    NIMBUS-MONITOR-FREQ-SECS 10
-                                                    TOPOLOGY-ACKER-EXECUTORS 0}]
-
-    (stubbing [nimbus/user-groups ["alice-group"]]
+  (let [group-mapper (Mockito/mock IGroupMappingServiceProvider)]
+    (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5
+                                        :group-mapper group-mapper
+                                        :daemon-conf {SUPERVISOR-ENABLE false
+                                                      NIMBUS-ADMINS ["admin-user"]
+                                                      NIMBUS-TASK-TIMEOUT-SECS 30
+                                                      NIMBUS-MONITOR-FREQ-SECS 10
+                                                      TOPOLOGY-ACKER-EXECUTORS 0}]
+      (.thenReturn (Mockito/when (.getGroups group-mapper (Mockito/anyObject))) #{"alice-group"})
       (letlocals
         (bind conf (:daemon-conf cluster))
         (bind topology (Thrift/buildTopology
@@ -521,7 +524,7 @@
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
         (.killTopology (:nimbus cluster) "test")
         ;; check that storm is deactivated but alive
-        (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
+        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
         (advance-cluster-time cluster 35)
         ;; kill topology read on group
@@ -532,7 +535,7 @@
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
         (.killTopology (:nimbus cluster) "killgrouptest")
         ;; check that storm is deactivated but alive
-        (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type)))
+        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type)))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil))))
         (advance-cluster-time cluster 35)
         ;; kill topology can't read
@@ -543,7 +546,7 @@
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
         (.killTopology (:nimbus cluster) "killnoreadtest")
         ;; check that storm is deactivated but alive
-        (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type)))
+        (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type)))
         (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil))))
         (advance-cluster-time cluster 35)
 
@@ -617,7 +620,7 @@
       (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
       (.killTopology (:nimbus cluster) "test")
       ;; check that storm is deactivated but alive
-      (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
+      (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type)))
       (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil))))
       (advance-cluster-time cluster 18)
       ;; check that storm is deactivated but alive
@@ -961,7 +964,7 @@
       (is (thrown? InvalidTopologyException
                    (.rebalance (:nimbus cluster) "test"
                      (doto (RebalanceOptions.)
-                       (.set_num_executors {"1" 0})
+                       (.set_num_executors {"1" (int 0)})
                        ))))
       )))
 
@@ -993,7 +996,7 @@
 
       (.rebalance (:nimbus cluster) "test"
                   (doto (RebalanceOptions.)
-                    (.set_num_workers 6)
+                    (.set_num_workers (int 6))
                     ))
       (advance-cluster-time cluster 29)
       (checker [2 2 2])
@@ -1002,7 +1005,7 @@
 
       (.rebalance (:nimbus cluster) "test"
                   (doto (RebalanceOptions.)
-                    (.set_num_executors {"1" 1})
+                    (.set_num_executors {"1" (int 1)})
                     ))
       (advance-cluster-time cluster 29)
       (checker [1 1 1 1 1 1])
@@ -1011,7 +1014,7 @@
 
       (.rebalance (:nimbus cluster) "test"
                   (doto (RebalanceOptions.)
-                    (.set_num_executors {"1" 8})
+                    (.set_num_executors {"1" (int 8)})
                     (.set_num_workers 4)
                     ))
       (advance-cluster-time cluster 32)
@@ -1137,24 +1140,6 @@
                                        {TOPOLOGY-WORKERS 8}
                                        topology))))))
 
-(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
-  (let [leader-address (NimbusInfo. leader-name leader-port true)]
-    (reify ILeaderElector
-      (prepare [this conf] true)
-      (isLeader [this] is-leader)
-      (addToLeaderLockQueue [this] true)
-      (getLeader [this] leader-address)
-      (getAllNimbuses [this] `(leader-address))
-      (close [this] true))))
-
-;(deftest test-no-overlapping-slots
-;  ;; test that same node+port never appears across 2 assignments
-;  )
-
-;(deftest test-stateless
-;  ;; test that nimbus can die and restart without any problems
-;  )
-
 (deftest test-clean-inbox
   "Tests that the inbox correctly cleans jar files."
   (with-simulated-time
@@ -1176,15 +1161,15 @@
         (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]]
           (apply mk-file fs))
         (assert-files-in-dir ["a.jar" "b.jar" "c.jar"])
-        (nimbus/clean-inbox dir-location 10)
+        (Nimbus/cleanInbox dir-location 10)
         (assert-files-in-dir ["c.jar"])
         ;; Cleanit again, c.jar should stay
         (advance-time-secs! 5)
-        (nimbus/clean-inbox dir-location 10)
+        (Nimbus/cleanInbox dir-location 10)
         (assert-files-in-dir ["c.jar"])
         ;; Advance time, clean again, c.jar should be deleted.
         (advance-time-secs! 5)
-        (nimbus/clean-inbox dir-location 10)
+        (Nimbus/cleanInbox dir-location 10)
         (assert-files-in-dir [])
         ))))
 
@@ -1209,7 +1194,8 @@
                         STORM-ZOOKEEPER-PORT zk-port
                         STORM-LOCAL-DIR nimbus-dir}))
          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+          (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+          (.launchServer nimbus)
           (bind topology (Thrift/buildTopology
                            {"1" (Thrift/prepareSpoutDetails
                                   (TestPlannerSpout. true) (Integer. 3))}
@@ -1220,7 +1206,8 @@
 
             (letlocals
              (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-              (bind non-leader-nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+              (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+              (.launchServer non-leader-nimbus)
 
               ;first we verify that the master nimbus can perform all actions, even with another nimbus present.
               (submit-local-topology nimbus "t1" {} topology)
@@ -1274,136 +1261,122 @@
 )
 
 (deftest test-nimbus-iface-methods-check-authorization
-  (with-local-cluster [cluster
-                       :daemon-conf {NIMBUS-AUTHORIZER
-                          "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
-    (let [
-          nimbus (:nimbus cluster)
-          topology (Thrift/buildTopology {} {})
-         ]
-      ; Fake good authorization as part of setup.
-      (mocking [nimbus/check-authorization!]
-          (submit-local-topology-with-opts nimbus "test" {} topology
-              (SubmitOptions. TopologyInitialStatus/INACTIVE))
-      )
-      (stubbing [nimbus/storm-active? true]
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)]
+    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
+                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}]
+      (let [nimbus (:nimbus cluster)
+            topology-name "test"
+            topology-id "test-id"]
+        (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
         (is (thrown? AuthorizationException
-          (.rebalance nimbus "test" (RebalanceOptions.))
-          ))
-      )
-      (is (thrown? AuthorizationException
-        (.activate nimbus "test")
-        ))
-      (is (thrown? AuthorizationException
-        (.deactivate nimbus "test")
-        ))
-    )
-  )
-)
+          (.rebalance nimbus topology-name (RebalanceOptions.))))
+        (is (thrown? AuthorizationException
+          (.activate nimbus topology-name)))
+        (is (thrown? AuthorizationException
+          (.deactivate nimbus topology-name)))))))
 
 (deftest test-nimbus-check-authorization-params
-  (with-local-cluster [cluster
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)
+        mk-nimbus (fn
+                    [conf inimbus blob-store leader-elector group-mapper cluster-state]
+                    (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))] 
+  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
                       :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
     (let [nimbus (:nimbus cluster)
           topology-name "test-nimbus-check-autho-params"
-          topology (Thrift/buildTopology {} {})]
-
-      (submit-local-topology-with-opts nimbus topology-name {} topology
-          (SubmitOptions. TopologyInitialStatus/INACTIVE))
-
-      (let [expected-name topology-name
-            expected-conf {TOPOLOGY-NAME expected-name
-                           "foo" "bar"}]
-
-        (testing "getTopologyConf calls check-authorization! with the correct parameters."
-          (let [expected-operation "getTopologyConf"
-                expected-conf-json (JSONValue/toJSONString expected-conf)]
-            (stubbing [nimbus/check-authorization! nil
-                       nimbus/try-read-storm-conf expected-conf]
-              (try
-                (is (= expected-conf
-                       (->> (.getTopologyConf nimbus "fake-id")
-                            JSONValue/parse 
-                            clojurify-structure)))
-                (catch NotAliveException e)
-                (finally
-                  (verify-first-call-args-for-indices
-                    nimbus/check-authorization!
-                      [1 2 3] expected-name expected-conf expected-operation))))))
-
-        (testing "getTopology calls check-authorization! with the correct parameters."
-          (let [expected-operation "getTopology"
-                common-spy (->>
-                             (proxy [StormCommon] []
-                                    (systemTopologyImpl [conf topology] nil))
-                           Mockito/spy)]
-            (with-open [- (StormCommonInstaller. common-spy)]
-              (stubbing [nimbus/check-authorization! nil
-                         nimbus/try-read-storm-conf expected-conf
-                         nimbus/try-read-storm-topology nil]
-                (try
-                  (.getTopology nimbus "fake-id")
-                  (catch NotAliveException e)
-                  (finally
-                    (verify-first-call-args-for-indices
-                      nimbus/check-authorization!
-                        [1 2 3] expected-name expected-conf expected-operation)
-                    (. (Mockito/verify common-spy)
-                      (systemTopologyImpl (Matchers/eq expected-conf)
-                                          (Matchers/any)))))))))
-
-        (testing "getUserTopology calls check-authorization with the correct parameters."
-          (let [expected-operation "getUserTopology"]
-            (stubbing [nimbus/check-authorization! nil
-                       nimbus/try-read-storm-conf expected-conf
-                       nimbus/try-read-storm-topology nil]
-              (try
-                (.getUserTopology nimbus "fake-id")
-                (catch NotAliveException e)
-                (finally
-                  (verify-first-call-args-for-indices
-                    nimbus/check-authorization!
-                      [1 2 3] expected-name expected-conf expected-operation)
-                  (verify-first-call-args-for-indices
-                    nimbus/try-read-storm-topology [0] "fake-id"))))))
-
-        (testing "getSupervisorPageInfo only calls check-authorization as getTopology"
-          (let [expected-operation "getTopology"
-                assignment (doto (Assignment.)
-                             (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}),
-                                                       [2 2] (NodeInfo. "super2" #{2})}))
-                clojurified-assignment (clojurify-assignment assignment)
-                topo-assignment {expected-name assignment}
-                check-auth-state (atom [])
-                mock-check-authorization (fn [nimbus storm-name storm-conf operation] 
-                                           (swap! check-auth-state conj {:nimbus nimbus
-                                                                         :storm-name storm-name
-                                                                         :storm-conf storm-conf
-                                                                         :operation operation}))]
-            (stubbing [nimbus/check-authorization! mock-check-authorization
-                       nimbus/try-read-storm-conf expected-conf
-                       nimbus/try-read-storm-topology nil
-                       nimbus/get-clojurified-task-info {}
-                       nimbus/all-supervisor-info {"super1" {:hostname "host1", :meta [1234], :uptime-secs 123}
-                                                   "super2" {:hostname "host2", :meta [1234], :uptime-secs 123}}
-                       clojurify-assignment clojurified-assignment
-                       nimbus/topology-assignments topo-assignment
-                       nimbus/get-launch-time-secs 0]
-              ;; not called yet
-              (verify-call-times-for nimbus/check-authorization! 0)
-              (.getSupervisorPageInfo nimbus "super1" nil true)
- 
-              ;; afterwards, it should get called twice
-              (verify-call-times-for nimbus/check-authorization! 2)
-              (let [first-call (nth @check-auth-state 0)
-                    second-call (nth @check-auth-state 1)]
-                 (is (= expected-name (:storm-name first-call)))
-                 (is (= expected-conf (:storm-conf first-call)))
-                 (is (= "getTopology" (:operation first-call)))
+          topology-id "fake-id"
+          topology (Thrift/buildTopology {} {})
+          expected-name topology-name
+          expected-conf {TOPOLOGY-NAME expected-name
+                         "foo" "bar"}] 
+      (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
+      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
+      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
+      (testing "getTopologyConf calls check-authorization! with the correct parameters."
+      (let [expected-operation "getTopologyConf"
+            expected-conf-json (JSONValue/toJSONString expected-conf)]
+          (try
+            (is (= expected-conf
+                   (->> (.getTopologyConf nimbus topology-id)
+                        JSONValue/parse 
+                        clojurify-structure)))
+            (catch NotAliveException e)
+            (finally
+              (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)))))
+
+      (testing "getTopology calls check-authorization! with the correct parameters."
+        (let [expected-operation "getTopology"
+              common-spy (->>
+                           (proxy [StormCommon] []
+                                  (systemTopologyImpl [conf topology] nil))
+                         Mockito/spy)]
+          (with-open [- (StormCommonInstaller. common-spy)]
+            (try
+              (.getTopology nimbus topology-id)
+              (catch NotAliveException e)
+              (finally
+                (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+                (. (Mockito/verify common-spy)
+                  (systemTopologyImpl (Matchers/eq expected-conf)
+                                      (Matchers/any))))))))
+
+      (testing "getUserTopology calls check-authorization with the correct parameters."
+        (let [expected-operation "getUserTopology"]
+          (try
+            (.getUserTopology nimbus topology-id)
+            (catch NotAliveException e)
+            (finally
+              (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation)
+              ;;One for this time and one for getTopology call
+              (.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
+
+(deftest test-check-authorization-getSupervisorPageInfo
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)
+        mk-nimbus (fn
+                    [conf inimbus blob-store leader-elector group-mapper cluster-state]
+                    (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))]
+  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus
+                       :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
+    (let [nimbus (:nimbus cluster)
+          expected-name "test-nimbus-check-autho-params"
+          expected-conf {TOPOLOGY-NAME expected-name
+                         TOPOLOGY-WORKERS 1
+                         TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                         "foo" "bar"}
+          expected-operation "getTopology"
+          assignment (doto (Assignment.)
+                       (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}),
+                                                 [2 2] (NodeInfo. "super2" #{2})}))
+          topology (doto (StormTopology. )
+                     (.set_spouts {})
+                     (.set_bolts {})
+                     (.set_state_spouts {}))
+          clojurified-assignment (clojurify-assignment assignment)
+          topo-assignment {expected-name assignment}
+          check-auth-state (atom [])
+          mock-check-authorization (fn [nimbus storm-name storm-conf operation] 
+                                     (swap! check-auth-state conj {:nimbus nimbus
+                                                                   :storm-name storm-name
+                                                                   :storm-conf storm-conf
+                                                                   :operation operation}))
+          all-supervisors (doto (HashMap.)
+                            (.put "super1" (doto (SupervisorInfo.) (.set_hostname "host1") (.set_meta [(long 1234)])
+                                             (.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {})))
+                            (.put "super2" (doto (SupervisorInfo.) (.set_hostname "host2") (.set_meta [(long 1234)])
+                                             (.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))))]
+      (.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
+      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf)
+      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+      (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
+      (stubbing [clojurify-assignment clojurified-assignment]
+        (.getSupervisorPageInfo nimbus "super1" nil true)
  
-                 (is (= expected-name (:storm-name second-call)))
-                 (is (= expected-conf (:storm-conf second-call)))
-                 (is (= "getSupervisorPageInfo" (:operation second-call)))))))))))
+        ;; afterwards, it should get called twice
+        (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getSupervisorPageInfo")
+        (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getTopology"))))))
 
 (deftest test-nimbus-iface-getTopology-methods-throw-correctly
   (with-local-cluster [cluster]
@@ -1444,52 +1417,56 @@
 )
 
 (deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
-  (with-local-cluster [cluster]
-    (let [
-          nimbus (:nimbus cluster)
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)]
+  (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store]
+    (let [nimbus (:nimbus cluster)
           bogus-secs 42
-          bogus-type "bogusType"
+          bogus-type TopologyStatus/ACTIVE
           bogus-bases {
                  "1" nil
-                 "2" {:launch-time-secs bogus-secs
+                 "2" (thriftify-storm-base {:launch-time-secs bogus-secs
                         :storm-name "id2-name"
-                        :status {:type bogus-type}}
+                        :status {:type bogus-type}})
                  "3" nil
-                 "4" {:launch-time-secs bogus-secs
+                 "4" (thriftify-storm-base {:launch-time-secs bogus-secs
                         :storm-name "id4-name"
-                        :status {:type bogus-type}}
+                        :status {:type bogus-type}})
                 }
+          topo-name "test-topo"
+          topo-conf {TOPOLOGY-NAME topo-name
+                     TOPOLOGY-WORKERS 1
+                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
+          storm-base (StormBase. )
+          topology (doto (StormTopology. )
+                     (.set_spouts {})
+                     (.set_bolts {})
+                     (.set_state_spouts {}))
         ]
-      (stubbing [nimbus/get-resources-for-topology nil
-                 nimbus/nimbus-topology-bases bogus-bases
-                 nimbus/get-blob-replication-count 1]
-        (let [topos (.get_topologies (.getClusterInfo nimbus))]
-          ; The number of topologies in the summary is correct.
-          (is (= (count
-            (filter (fn [b] (second b)) bogus-bases)) (count topos)))
-          ; Each topology present has a valid name.
-          (is (empty?
-            (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
-          ; The topologies are those with valid bases.
-          (is (empty?
-            (filter (fn [t]
-              (or
-                (nil? t)
-                (not (number? (read-string (.get_id t))))
-                (odd? (read-string (.get_id t)))
-              )) topos)))
-        )
+      (.thenReturn (Mockito/when (.stormBase cluster-state (Mockito/any String) (Mockito/anyObject))) storm-base)
+      (.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases)
+      (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf)
+      (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+ 
+      (let [topos (.get_topologies (.getClusterInfo nimbus))]
+        ; The number of topologies in the summary is correct.
+        (is (= (count
+          (filter (fn [b] (second b)) bogus-bases)) (count topos)))
+        ; Each topology present has a valid name.
+        (is (empty?
+          (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos)))
+        ; The topologies are those with valid bases.
+        (is (empty?
+          (filter (fn [t]
+            (or
+              (nil? t)
+              (not (number? (read-string (.get_id t))))
+              (odd? (read-string (.get_id t)))
+            )) topos)))
       )
     )
   )
-)
-
-(deftest test-defserverfn-numbus-iface-instance
-  (test-nimbus-iface-submitTopologyWithOpts-checks-authorization)
-  (test-nimbus-iface-methods-check-authorization)
-  (test-nimbus-iface-getTopology-methods-throw-correctly)
-  (test-nimbus-iface-getClusterInfo-filters-topos-without-bases)
-)
+))
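The assertions above boil down to one invariant: only topology ids that still have a StormBase may appear in the cluster summary. A small, self-contained Java sketch of that filtering invariant, using plain java.util types in place of the thrift StormBase objects, might look like this:

import java.util.*;
import java.util.stream.*;

public class FilterBasesExample {
    public static void main(String[] args) {
        // Mirror of bogus-bases in the test: ids "1" and "3" have no base.
        Map<String, Object> bases = new HashMap<>();
        bases.put("1", null);
        bases.put("2", new Object());
        bases.put("3", null);
        bases.put("4", new Object());

        // The invariant the assertions encode: only ids with a non-null base
        // should surface in the cluster summary.
        Set<String> summarized = bases.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());

        System.out.println(summarized.equals(Set.of("2", "4")));  // true
    }
}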
 
 (deftest test-nimbus-data-acls
   (testing "nimbus-data uses correct ACLs"
@@ -1500,8 +1477,8 @@
                      STORM-ZOOKEEPER-AUTH-PAYLOAD digest
                     STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal"
                      NIMBUS-THRIFT-PORT 6666})
-          expected-acls nimbus/NIMBUS-ZK-ACLS
-          fake-inimbus (reify INimbus (getForcedScheduler [this] nil))
+          expected-acls Nimbus/ZK_ACLS
+          fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil))
           fake-cu (proxy [ConfigUtils] []
                     (nimbusTopoHistoryStateImpl [conf] nil))
           fake-utils (proxy [Utils] []
@@ -1517,13 +1494,9 @@
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
                           (zkLeaderElectorImpl [conf blob-store] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
-        (stubbing [nimbus/file-cache-map nil
-                   nimbus/mk-blob-cache-map nil
-                   nimbus/mk-bloblist-cache-map nil
-                   nimbus/mk-scheduler nil]
-          (nimbus/nimbus-data auth-conf fake-inimbus)
+          (Nimbus. auth-conf fake-inimbus)
          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
-          )))))
+          ))))
 
 (deftest test-file-bogus-download
  (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
@@ -1534,12 +1507,14 @@
       )))
 
 (deftest test-validate-topo-config-on-submit
-  (with-local-cluster [cluster]
-    (let [nimbus (:nimbus cluster)
-          topology (Thrift/buildTopology {} {})
-          bad-config {"topology.isolate.machines" "2"}]
-      ; Fake good authorization as part of setup.
-      (mocking [nimbus/check-authorization!]
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)]
+    (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
+    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
+                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
+      (let [nimbus (:nimbus cluster)
+            topology (Thrift/buildTopology {} {})
+            bad-config {"topology.isolate.machines" "2"}]
         (is (thrown-cause? InvalidTopologyException
           (submit-local-topology-with-opts nimbus "test" bad-config topology
                                            (SubmitOptions.))))))))
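The test above expects submission to fail because "topology.isolate.machines" is supplied as the string "2" rather than a number. As a rough illustration of that failure mode (this is not Storm's actual ConfigValidation code, just a hypothetical type check of the same shape):

import java.util.*;

public class ConfigTypeCheckSketch {
    // Hypothetical validator: "topology.isolate.machines" is expected to be numeric,
    // so a string value such as "2" should be rejected.
    static void validate(Map<String, Object> conf) {
        Object v = conf.get("topology.isolate.machines");
        if (v != null && !(v instanceof Number)) {
            throw new IllegalArgumentException(
                    "topology.isolate.machines must be a number, got: " + v.getClass().getSimpleName());
        }
    }

    public static void main(String[] args) {
        Map<String, Object> badConfig = Map.of("topology.isolate.machines", "2");
        try {
            validate(badConfig);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected as expected: " + expected.getMessage());
        }
    }
}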
@@ -1555,7 +1530,8 @@
                       STORM-ZOOKEEPER-PORT zk-port
                       STORM-LOCAL-DIR nimbus-dir}))
        (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-        (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+        (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+        (.launchServer nimbus)
         (Time/sleepSecs 1)
         (bind topology (Thrift/buildTopology
                          {"1" (Thrift/prepareSpoutDetails
@@ -1570,7 +1546,8 @@
        ; on startup nimbus reads the cluster state and takes the proper actions
        ; in this case nimbus registers the topology transition event with the scheduler again
        ; before applying STORM-856 nimbus was killed with an NPE
-        (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+        (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+        (.launchServer nimbus)
         (.shutdown nimbus)
         (.disconnect cluster-state)
         ))))
@@ -1588,7 +1565,8 @@
                         STORM-LOCAL-DIR nimbus-dir
                        NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
+          (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil))
+          (.launchServer nimbus)
           (bind notifier (InMemoryTopologyActionNotifier.))
           (Time/sleepSecs 1)
           (bind topology (Thrift/buildTopology
@@ -1638,55 +1616,73 @@
 ;; if the user sends an empty log config, nimbus will say that all 
 ;; log configs it contains are LogLevelAction/UNCHANGED
 (deftest empty-save-config-results-in-all-unchanged-actions
-  (with-local-cluster [cluster]
-    (let [nimbus (:nimbus cluster)
-          previous-config (LogConfig.)
-          level (LogLevel.)
-          mock-config (LogConfig.)]
-      ;; send something with content to nimbus beforehand
-      (.set_target_log_level level "ERROR")
-      (.set_action level LogLevelAction/UPDATE)
-      (.put_to_named_logger_level previous-config "test" level)
-      (stubbing [nimbus/check-storm-active! nil
-                 nimbus/try-read-storm-conf {}]
-        (.setLogConfig nimbus "foo" previous-config)
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)]
+    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
+                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
+      (let [nimbus (:nimbus cluster)
+            previous-config (LogConfig.)
+            mock-config (LogConfig.)
+            expected-config (LogConfig.)]
+        ;; send something with content to nimbus beforehand
+        (.put_to_named_logger_level previous-config "test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "ERROR")
+            (.set_action LogLevelAction/UPDATE)))
+
+        (.put_to_named_logger_level expected-config "test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "ERROR")
+            (.set_action LogLevelAction/UNCHANGED)))
+
+        (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+        (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
+ 
         (.setLogConfig nimbus "foo" mock-config)
-        (let [saved-config (.getLogConfig nimbus "foo")
-              levels (.get_named_logger_level saved-config)]
-           (is (= (.get_action (.get levels "test")) LogLevelAction/UNCHANGED)))))))
+        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config))))))
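The expected-config built above encodes the rule under test: loggers that exist in the stored LogConfig but are absent from the (empty) request come back flagged LogLevelAction/UNCHANGED. A hypothetical Java sketch of that merge rule, with a local enum and record standing in for the thrift LogLevel types (not Storm's implementation), could read:

import java.util.*;

public class LogConfigMergeSketch {
    // Hypothetical stand-ins for the thrift LogLevelAction/LogLevel types.
    enum Action { UPDATE, UNCHANGED, REMOVE }
    record Level(String target, Action action) {}

    // Sketch of the rule the test asserts: loggers present in the stored config
    // but missing from the request are returned flagged UNCHANGED.
    static Map<String, Level> merge(Map<String, Level> stored, Map<String, Level> requested) {
        Map<String, Level> out = new HashMap<>(requested);
        for (Map.Entry<String, Level> e : stored.entrySet()) {
            out.putIfAbsent(e.getKey(), new Level(e.getValue().target(), Action.UNCHANGED));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Level> stored = Map.of("test", new Level("ERROR", Action.UPDATE));
        Map<String, Level> merged = merge(stored, Map.of());   // empty request
        System.out.println(merged.get("test").action() == Action.UNCHANGED);  // true
    }
}

The next test exercises the same rule with a non-empty request: the logger named in the request keeps its UPDATE action, while untouched loggers come back UNCHANGED.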
 
 (deftest log-level-update-merges-and-flags-existent-log-level
-  (with-local-cluster [cluster]
-    (stubbing [nimbus/check-storm-active! nil
-               nimbus/try-read-storm-conf {}]
+  (let [cluster-state (Mockito/mock IStormClusterState)
+        blob-store (Mockito/mock BlobStore)]
+    (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store
+                         :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}]
       (let [nimbus (:nimbus cluster)
             previous-config (LogConfig.)
-            level (LogLevel.)
-            other-level (LogLevel.)
-            mock-config (LogConfig.)]
+            mock-config (LogConfig.)
+            expected-config (LogConfig.)]
         ;; send something with content to nimbus beforehand
-        (.set_target_log_level level "ERROR")
-        (.set_action level LogLevelAction/UPDATE)
-        (.put_to_named_logger_level previous-config "test" level)
+        (.put_to_named_logger_level previous-config "test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "ERROR")
+            (.set_action LogLevelAction/UPDATE)))
+
+        (.put_to_named_logger_level previous-config "other-test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "DEBUG")
+            (.set_action LogLevelAction/UPDATE)))
 
-        (.set_target_log_level other-level "DEBUG")
-        (.set_action other-level LogLevelAction/UPDATE)
-        (.put_to_named_logger_level previous-config "other-test" other-level)
-        (.setLogConfig nimbus "foo" previous-config)
 
         ;; only change "test"
-        (.set_target_log_level level "INFO")
-        (.set_action level LogLevelAction/UPDATE)
-        (.put_to_named_logger_level mock-config "test" level)
+        (.put_to_named_logger_level mock-config "test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "INFO")
+            (.set_action LogLevelAction/UPDATE)))
+
+        (.put_to_named_logger_level expected-config "test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "INFO")
+            (.set_action LogLevelAction/UPDATE)))
+
+        (.put_to_named_logger_level expected-config "other-test" 
+          (doto (LogLevel.)
+            (.set_target_log_level "DEBUG")
+            (.set_action LogLevelAction/UNCHANGED)))
+
+        (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+        (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
+ 
         (.setLogConfig nimbus "foo" mock-config)
-
-        (let [saved-config (.getLogConfig nimbus "foo")
-              levels (.get_named_logger_level saved-config)]
-           (is (= (.get_action (.get levels "test")) LogLevelAction/UPDATE))
-           (is (= (.get_target_log_level (.get levels "test")) "INFO"))
-
-           (is (= (.get_action (.get levels "other-test")) LogLevelAction/UNCHANGED))
-           (is (= (.get_target_log_level (.get levels "other-test")) "DEBUG")))))))
+        (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config))))))
 
 (defn teardown-heartbeats [id])
 (defn teardown-topo-errors [id])
@@ -1708,120 +1704,106 @@
       (backpressureTopologies [this] bp-topos))))
 
 (deftest cleanup-storm-ids-returns-inactive-topos
-         (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))]
-              (stubbing [nimbus/is-leader true
-                         nimbus/code-ids {}]
-                        (is (= (nimbus/cleanup-storm-ids mock-state nil) #{"topo2" "topo3"})))))
+         (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))
+               store (Mockito/mock BlobStore)]
+              (.thenReturn (Mockito/when (.storedTopoIds store)) #{})
+              (is (= (Nimbus/topoIdsToClean mock-state store) #{"topo2" "topo3"}))))
 
 (deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes
   (let [active-topos (list "hb1" "e2" "bp3")
         hb-topos (list "hb1" "hb2" "hb3")
         error-topos (list "e1" "e2" "e3")
         bp-topos (list "bp1" "bp2" "bp3")
-        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)]
-    (stubbing [nimbus/is-leader true
-               nimbus/code-ids {}] 
-    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
-           #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"})))))
+        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)
+        store (Mockito/mock BlobStore)]
+    (.thenReturn (Mockito/when (.storedTopoIds store)) #{})
+    (is (= (Nimbus/topoIdsToClean mock-state store)
+           #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"}))))
 
 (deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active
   (let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3")
         hb-topos (list "hb1" "hb2" "hb3")
         error-topos (list "e1" "e2" "e3")
         bp-topos (list "bp1" "bp2" "bp3")
-        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)]
-    (stubbing [nimbus/is-leader true
-               nimbus/code-ids {}] 
-    (is (= (nimbus/cleanup-storm-ids mock-state nil) 
-           #{})))))
+        mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)
+        store (Mockito/mock BlobStore)]
+    (.thenReturn (Mockito/when (.storedTopoIds store)) #{})
+    (is (= (Nimbus/topoIdsToClean mock-state store) 
+           #{}))))
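Taken together, the assertions in these three tests suggest that Nimbus/topoIdsToClean is a set difference: every topology id seen in heartbeats, errors, backpressure znodes, or the blob store, minus the ids that are still active. Assuming that reading is right, the arithmetic looks like this in plain Java:

import java.util.*;

public class TopoIdsToCleanSketch {
    // Sketch of the set arithmetic the assertions above imply:
    // (heartbeats ∪ errors ∪ backpressure ∪ storedBlobs) minus active.
    static Set<String> idsToClean(Set<String> active, Set<String> heartbeats,
                                  Set<String> errors, Set<String> backpressure,
                                  Set<String> storedBlobs) {
        Set<String> seen = new HashSet<>();
        seen.addAll(heartbeats);
        seen.addAll(errors);
        seen.addAll(backpressure);
        seen.addAll(storedBlobs);
        seen.removeAll(active);
        return seen;
    }

    public static void main(String[] args) {
        // Same inputs as the union test above; the blob store is stubbed empty.
        Set<String> result = idsToClean(
                Set.of("hb1", "e2", "bp3"),
                Set.of("hb1", "hb2", "hb3"),
                Set.of("e1", "e2", "e3"),
                Set.of("bp1", "bp2", "bp3"),
                Set.of());
        System.out.println(result.equals(Set.of("hb2", "hb3", "e1", "e3", "bp1", "bp2")));  // true
    }
}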
 
 (deftest do-cleanup-removes-inactive-znodes
   (let [inactive-topos (list "topo2" "topo3")
-        hb-cache (atom (into {}(map vector inactive-topos '(nil nil))))
+        hb-cache (into {}(map vector inactive-topos '(nil nil)))
         mock-state (mock-cluster-state)
-        mock-blob-store {}
-        conf {}
-        nimbus {:conf conf
-                :submit-lock mock-blob-store 
-                :blob-store {}
-                :storm-cluster-state mock-state
-                :heartbeats-cache hb-cache}]
-
-    (stubbing [nimbus/is-leader true
-               nimbus/blob-rm-topology-keys nil
-               nimbus/cleanup-storm-ids inactive-topos]
-      (mocking
-        [teardown-heartbeats 
-         teardown-topo-errors 
-         teardown-backpressure-dirs
-         nimbus/force-delete-topo-dist-dir
-         nimbus/blob-rm-topology-keys
-         nimbus/blob-rm-dependency-jars-in-topology]
-
-        (nimbus/do-cleanup nimbus)
-
-        ;; removed heartbeats znode
-        (verify-nth-call-args-for 1 teardown-heartbeats "topo2")
-        (verify-nth-call-args-for 2 teardown-heartbeats "topo3")
-
-        ;; removed topo errors znode
-        (verify-nth-call-args-for 1 teardown-topo-errors "topo2")
-        (verify-nth-call-args-for 2 teardown-topo-errors "topo3")
-
-        ;; removed backpressure znodes
-        (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2")
-        (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
-
-        ;; removed topo directories
-        (verify-nth-call-args-for 1 nimbus/force-delete-topo-dist-dir conf "topo2")
-        (verify-nth-call-args-for 2 nimbus/force-delete-topo-dist-dir conf "topo3")
-
-        ;; removed blob store topo keys
-        (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" mock-blob-store mock-state)
-        (verify-nth-call-args-for 2 nimbus/blob-rm-topology-keys "topo3" mock-blob-store mock-state)
-
-        ;; removed topology dependencies
-        (verify-nth-call-args-for 1 nimbus/blob-rm-dependency-jars-in-topology "topo2" mock-blob-store mock-state)
-        (verify-nth-call-args-for 2 nimbus/blob-rm-dependency-jars-in-topology "topo3" mock-blob-store mock-state)
-
-        ;; remove topos from heartbeat cache
-        (is (= (count @hb-cache) 0))))))
+        mock-blob-store (Mockito/mock BlobStore)
+        conf {}]
+    (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
+                    (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+      (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
+        (.set (.getHeartbeatsCache nimbus) hb-cache)
+        (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
+        (mocking
+          [teardown-heartbeats 
+           teardown-topo-errors 
+           teardown-backpressure-dirs]
+
+          (.doCleanup nimbus)
+
+          ;; removed heartbeats znode
+          (verify-nth-call-args-for 1 teardown-heartbeats "topo2")
+          (verify-nth-call-args-for 2 teardown-heartbeats "topo3")
+
+          ;; removed topo errors znode
+          (verify-nth-call-args-for 1 teardown-topo-errors "topo2")
+          (verify-nth-call-args-for 2 teardown-topo-errors "topo3")
+
+          ;; removed backpressure znodes
+          (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2")
+          (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3")
+
+          ;; removed topo directories
+          (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo2")
+          (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo3")
+
+          ;; removed blob store topo keys
+          (.rmTopologyKeys (Mockito/verify nimbus) "topo2")
+          (.rmTopologyKeys (Mockito/verify nimbus) "topo3")
+
+          ;; removed topology dependencies
+          (.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo2")
+          (.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo3")
+
+          ;; remove topos from heartbeat cache
+          (is (= (count (.get (.getHeartbeatsCache nimbus))) 0)))))))
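This test swaps the old var stubbing for a Mockito spy on a real Nimbus instance, so doCleanup runs for real while each per-topology step can still be verified. A minimal Java sketch of that spy-and-verify pattern, using a hypothetical Cleaner class rather than Storm's Nimbus:

import static org.mockito.Mockito.*;
import java.util.*;

public class SpyCleanupExample {
    // Hypothetical cleaner, standing in for the spied Nimbus; not Storm code.
    static class Cleaner {
        void removeTopoDir(String id) { /* side effect elided */ }
        void cleanup(List<String> inactive) {
            for (String id : inactive) {
                removeTopoDir(id);
            }
        }
    }

    public static void main(String[] args) {
        // spy() wraps a real object: cleanup() executes its real logic, while
        // every call is still recorded so individual per-topology steps can be
        // verified, much like the Mockito/spy on Nimbus above.
        Cleaner cleaner = spy(new Cleaner());
        cleaner.cleanup(Arrays.asList("topo2", "topo3"));

        verify(cleaner).removeTopoDir("topo2");
        verify(cleaner).removeTopoDir("topo3");
        verify(cleaner, times(2)).removeTopoDir(anyString());
    }
}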
 
 (deftest do-cleanup-does-not-teardown-active-topos
   (let [inactive-topos ()
-        hb-cache (atom {"topo1" nil "topo2" nil})
+        hb-cache {"topo1" nil "topo2" nil}
         mock-state (mock-cluster-state)
-        mock-blob-store {}
-        conf {}
-        nimbus {:conf conf
-                :submit-lock mock-blob-store 
-                :blob-store {}
-                :storm-cluster-state mock-state
-                :heartbeats-cache hb-cache}]
-
-    (stubbing [nimbus/is-leader true
-               nimbus/blob-rm-topology-keys nil
-               nimbus/cleanup-storm-ids inactive-topos]
-      (mocking
-        [teardown-heartbeats 
-         teardown-topo-errors 
-         teardown-backpressure-dirs
-         nimbus/force-delete-topo-dist-dir
-         nimbus/blob-rm-topology-keys] 
-
-        (nimbus/do-cleanup nimbus)
-
-        (verify-call-times-for teardown-heartbeats 0)
-        (verify-call-times-for teardown-topo-errors 0)
-        (verify-call-times-for teardown-backpressure-dirs 0)
-        (verify-call-times-for nimbus/force-delete-topo-dist-dir 0)
-        (verify-call-times-for nimbus/blob-rm-topology-keys 0)
-
-        ;; hb-cache goes down to 1 because only one topo was inactive
-        (is (= (count @hb-cache) 2))
-        (is (contains? @hb-cache "topo1"))
-        (is (contains? @hb-cache "topo2"))))))
-
+        mock-blob-store (Mockito/mock BlobStore)
+        conf {}]
+    (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
+                    (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
+      (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
+        (.set (.getHeartbeatsCache nimbus) hb-cache)
+        (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
+        (mocking
+          [teardown-heartbeats 
+           teardown-topo-errors 
+           teardown-backpressure-dirs]
+
+          (.doCleanup nimbus)
+
+          (verify-call-times-for teardown-heartbeats 0)
+          (verify-call-times-for teardown-topo-errors 0)
+          (verify-call-times-for teardown-backpressure-dirs 0)
+          (.forceDeleteTopoDistDir (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
+          (.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
+
+          ;; hb-cache is unchanged because no topos were inactive
+          (is (= (count (.get (.getHeartbeatsCache nimbus))) 2))
+          (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo1"))
+          (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo2")))))))
 
 (deftest user-topologies-for-supervisor
   (let [assignment (doto (Assignment.)
@@ -1830,18 +1812,18 @@
         assignment2 (doto (Assignment.)
                      (.set_executor_node_port {[1 1] (NodeInfo. "super2" #{2}),
                                               [2 2] (NodeInfo. "super2" #{2})}))
-        assignments {"topo1" assignment, "topo2" assignment2}]
-    (stubbing [nimbus/is-authorized? true]
-      (let [topos1 (nimbus/user-and-supervisor-topos nil nil nil assignments "super1")
-            topos2 (nimbus/user-and-supervisor-topos nil nil nil assignments "super2")]
-        (is (= (list "topo1") (:supervisor-topologies topos1)))
-        (is (= #{"topo1"} (:user-topologies topos1))) 
-        (is (= (list "topo1" "topo2") (:supervisor-topologies topos2)))
-        (is (= #{"topo1" "topo2"} (:user-topologies topos2)))))))
-
-(defn- mock-check-auth 
-  [nimbus conf blob-store op topo-name]
-  (= topo-name "authorized"))
+        assignments {"topo1" assignment, "topo2" assignment2}
+        mock-state (mock-cluster-state)
+        mock-blob-store (Mockito/mock BlobStore)
+        nimbus (Nimbus. {} nil mock-state nil mock-blob-store (mock-leader-elector) nil)]
+    (let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
+          user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies))
+          supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
+          user2-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor2-topologies))]
+      (is (= (list "topo1") supervisor1-topologies))
+      (is (= #{"topo1"} user1-topologies)) 
+      (is (= (list "topo1" "topo2") supervisor2-topologies))
+      (is (= #{"topo1" "topo2"} user2-topologies)))))
 
 (deftest user-topologies-for-supervisor-with-unauthorized-user
   (let [assignment (doto (Assignment.)
@@ -1850,8 +1832,15 @@
         assignment2 (doto (Assignment.)
                      (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{2}),
                                               [2 2] (NodeInfo. "super2" #{2})}))
-        assignments {"topo1" assignment, "authorized" assignment2}] 
-    (stubbing [nimbus/is-authorized? mock-check-auth]
-      (let [topos (nimbus/user-and-supervisor-topos nil nil nil assignments "super1")]
-        (is (= (list "topo1" "authorized") (:supervisor-topologies topos)))
-        (is (= #{"authorized"} (:user-topologies topos)))))))
+        assignments {"topo1" assignment, "authorized" assignment2}
+        mock-state (mock-cluster-state)
+        mock-blob-store (Mockito/mock BlobStore)
+        nimbus (Nimbus. {} nil mock-state nil mock-blob-store (mock-leader-elector) nil)]
+    (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
+    (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
+    (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))
+    (let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
+          user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))]
+ 
+      (is (= (list "topo1" "authorized") supervisor-topologies))
+      (is (= #{"authorized"} user-topologies)))))
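Both supervisor-page tests above rely on the same behaviour: topologiesOnSupervisor lists every topology assigned to the supervisor, and filterAuthorized then keeps only the ids the configured IAuthorizer permits for the requested operation. A hypothetical sketch of that filtering step, where a BiPredicate stands in for IAuthorizer.permit:

import java.util.*;
import java.util.function.BiPredicate;
import java.util.stream.*;

public class FilterAuthorizedSketch {
    // Given the topologies assigned to a supervisor, keep only those the
    // authorizer permits for the requested operation.
    static Set<String> filterAuthorized(Collection<String> topoIds, String operation,
                                        BiPredicate<String, String> permit) {
        return topoIds.stream()
                .filter(id -> permit.test(operation, id))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Mirrors the unauthorized-user test: only "authorized" passes the check.
        List<String> onSupervisor = Arrays.asList("topo1", "authorized");
        Set<String> visible = filterAuthorized(onSupervisor, "getTopology",
                (op, id) -> id.equals("authorized"));
        System.out.println(visible.equals(Set.of("authorized")));  // true
    }
}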
