http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj 
b/storm-core/src/clj/org/apache/storm/testing.clj
index ca60c02..60f5a1b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -16,7 +16,6 @@
 
 (ns org.apache.storm.testing
   (:require [org.apache.storm.daemon
-             [nimbus :as nimbus]
              [local-supervisor :as local-supervisor]
              [common :as common]])
   (:import [org.apache.commons.io FileUtils]
@@ -33,6 +32,7 @@
   (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState 
ConfigUtils LocalState StormCommonInstaller])
   (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
   (:import [org.apache.storm.task TopologyContext])
+  (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
   (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
   (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple
             TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker
@@ -49,9 +49,10 @@
   (:import [org.apache.storm Config])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext]
-           (org.apache.storm.messaging IContext)
+           [org.apache.storm.messaging IContext]
            [org.json.simple JSONValue]
-           (org.apache.storm.daemon StormCommon Acker DaemonCommon))
+           [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
+           [org.apache.storm.daemon StormCommon Acker DaemonCommon])
   (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
StormClusterStateImpl ClusterUtils])
   (:use [org.apache.storm util config log local-state-converter converter])
   (:use [org.apache.storm.internal thrift]))
@@ -167,11 +168,60 @@
     (let [val (atom (dec start-val))]
       (fn [] (swap! val inc)))))
 
+;; Builds a stand-in ILeaderElector for tests; no real election takes place.
+;;   :is-leader   - value reported by isLeader (default true)
+;;   :leader-name - host of the reported leader address (default "test-host")
+;;   :leader-port - port of the reported leader address (default 9999)
+(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999]
+  (let [leader-address (NimbusInfo. leader-name leader-port true)]
+    (reify ILeaderElector
+      (prepare [this conf] true)
+      (isLeader [this] is-leader)
+      (addToLeaderLockQueue [this] true)
+      (getLeader [this] leader-address)
+      ;; Build the list from the bound value. The earlier syntax-quoted form
+      ;; `(leader-address) produced a list holding a namespace-qualified
+      ;; symbol instead of the NimbusInfo instance.
+      (getAllNimbuses [this] (list leader-address))
+      (close [this] true))))
+
+;; Default Nimbus factory for the test helpers. Accepts the collaborators in
+;; the order the helpers supply them and hands them to the Nimbus constructor
+;; in the order it is called with here; nil is passed in the fourth position.
+(defn mk-nimbus
+  [conf inimbus blob-store leader-elector group-mapper cluster-state]
+  (Nimbus. conf
+           inimbus
+           cluster-state
+           nil
+           blob-store
+           leader-elector
+           group-mapper))
+
+;; Starts a single Nimbus for tests and returns a map describing it.
+;; Unless :cluster-state is supplied, an in-process ZooKeeper is started and
+;; the daemon conf is pointed at it on localhost.
+;; Entries passed in :daemon-conf are merged last, so they override the test
+;; defaults set here.
+;; :mk-nimbus is the factory used to build the Nimbus (defaults to mk-nimbus).
+;; The returned map has :nimbus, :daemon-conf, :tmp-dirs (an atom of the temp
+;; paths), :nimbus-thrift-server (nil unless :nimbus-daemon is truthy) and
+;; :zookeeper (nil when no in-process ZooKeeper was started).
+(defnk mk-mocked-nimbus
+  [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil
+   :leader-elector nil :group-mapper nil :nimbus-daemon false :mk-nimbus mk-nimbus]
+  (let [zk-tmp (local-temp-path)
+        ;; both bindings are nil when the caller supplies its own cluster state
+        [zk-port zk-handle] (when-not cluster-state
+                              (Zookeeper/mkInprocessZookeeper zk-tmp nil))
+        ;; an explicit elector is kept as-is; the mock is substituted only
+        ;; when no in-process ZooKeeper was started
+        leader-elector (cond
+                         leader-elector leader-elector
+                         zk-handle leader-elector
+                         :else (mock-leader-elector))
+        nimbus-tmp (local-temp-path)
+        base-conf (clojurify-structure (ConfigUtils/readStormConfig))
+        test-overrides {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
+                        ZMQ-LINGER-MILLIS 0
+                        TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
+                        TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
+                        STORM-CLUSTER-MODE "local"
+                        BLOBSTORE-SUPERUSER (System/getProperty "user.name")
+                        BLOBSTORE-DIR nimbus-tmp}
+        zk-overrides (when-not cluster-state
+                       {STORM-ZOOKEEPER-PORT zk-port
+                        STORM-ZOOKEEPER-SERVERS ["localhost"]})
+        daemon-conf (merge base-conf test-overrides zk-overrides daemon-conf)
+        ;; only the conf handed to Nimbus carries the local dir; the returned
+        ;; :daemon-conf does not
+        nimbus-conf (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+        ;; mk-nimbus here is the keyword argument, which shadows the top-level fn
+        nimbus (mk-nimbus nimbus-conf
+                          (or inimbus (Nimbus$StandaloneINimbus.))
+                          blob-store
+                          leader-elector
+                          group-mapper
+                          cluster-state)
+        _ (.launchServer nimbus)
+        nimbus-thrift-server (when nimbus-daemon
+                               (start-nimbus-daemon daemon-conf nimbus))]
+    {:nimbus nimbus
+     :daemon-conf daemon-conf
+     :tmp-dirs (atom [nimbus-tmp zk-tmp])
+     :nimbus-thrift-server nimbus-thrift-server
+     :zookeeper (when (not-nil? zk-handle) zk-handle)}))
+
 ;; returns map containing cluster info
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for 
:supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls 
afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 
:daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon 
false]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 
:daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 
:nimbus-daemon false]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (if-not (contains? daemon-conf 
STORM-ZOOKEEPER-SERVERS)
                               (Zookeeper/mkInprocessZookeeper zk-tmp nil))
@@ -189,9 +239,14 @@
                               STORM-ZOOKEEPER-SERVERS ["localhost"]})
                            daemon-conf)
         port-counter (mk-counter supervisor-slot-port-min)
-        nimbus (nimbus/service-handler
-                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
-                (if inimbus inimbus (nimbus/standalone-nimbus)))
+        nimbus (mk-nimbus
+                 (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
+                 (if inimbus inimbus (Nimbus$StandaloneINimbus.))
+                 nil
+                 nil
+                 group-mapper
+                 nil)
+        _ (.launchServer nimbus)
         context (mk-shared-context daemon-conf)
         nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon 
daemon-conf nimbus) nil)
         cluster-map {:nimbus nimbus
@@ -242,11 +297,12 @@
         (.stop (:nimbus-thrift-server cluster-map))
         (catch Exception e (log-message "failed to stop thrift")))
       ))
-  (.close (:state cluster-map))
-  (.disconnect (:storm-cluster-state cluster-map))
-  (doseq [s @(:supervisors cluster-map)]
-    (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
-    (.close s))
+  (if (:state cluster-map) (.close (:state cluster-map)))
+  (if (:storm-cluster-state cluster-map) (.disconnect (:storm-cluster-state 
cluster-map)))
+  (if (:supervisors cluster-map)
+    (doseq [s @(:supervisors cluster-map)]
+      (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR)
+      (.close s)))
   (ProcessSimulator/killAllProcesses)
   (if (not-nil? (:zookeeper cluster-map))
     (do
@@ -315,6 +371,21 @@
   ([cluster-map secs]
    (advance-cluster-time cluster-map secs 1)))
 
+;; Binds nimbus-sym to the map returned by (mk-mocked-nimbus args...), runs
+;; body, and always tears the mocked nimbus down afterwards.
+;; Any Throwable raised by body is logged and rethrown.
+(defmacro with-mocked-nimbus
+  [[nimbus-sym & args] & body]
+  `(let [~nimbus-sym (mk-mocked-nimbus ~@args)]
+     (try
+       ~@body
+       (catch Throwable t#
+         (log-error t# "Error in cluster")
+         (throw t#))
+       (finally
+         ;; A background future keeps calling simulate-wait while the cluster
+         ;; is being killed; presumably this keeps simulated time moving so
+         ;; shutdown does not stall -- TODO confirm against simulate-wait.
+         (let [keep-waiting?# (atom true)
+               f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))]
+           (kill-local-storm-cluster ~nimbus-sym)
+           ;; stop the background loop, then block until it has exited
+           (reset! keep-waiting?# false)
+            @f#)))))
+
 (defmacro with-local-cluster
   [[cluster-sym & args] & body]
   `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
@@ -356,48 +427,6 @@
     (throw (IllegalArgumentException. "Topology conf is not 
json-serializable")))
   (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) 
topology submit-opts))
 
-(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state 
storm-name worker->resources]
-  (fn [existing-assignments]
-    (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
-          existing-assignments (into {} (for [[tid assignment] 
existing-assignments]
-                                          {tid (:worker->resources 
assignment)}))
-          new-assignments (assoc existing-assignments topology-id 
worker->resources)]
-      new-assignments)))
-
-(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state 
storm-name executor->node+port]
-  (fn [new-scheduler-assignments existing-assignments]
-    (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name)
-          existing-assignments (into {} (for [[tid assignment] 
existing-assignments]
-                                          {tid (:executor->node+port 
assignment)}))
-          new-assignments (assoc existing-assignments topology-id 
executor->node+port)]
-      new-assignments)))
-
-(defn mocked-compute-new-scheduler-assignments []
-  (fn [nimbus existing-assignments topologies scratch-topology-id]
-    existing-assignments))
-
-(defn submit-mocked-assignment
-  [nimbus storm-cluster-state storm-name conf topology task->component 
executor->node+port worker->resources]
-  (let [fake-common (proxy [StormCommon] []
-                      (stormTaskInfoImpl [_] task->component))]
-    (with-open [- (StormCommonInstaller. fake-common)]
-      (with-var-roots [nimbus/compute-new-scheduler-assignments 
(mocked-compute-new-scheduler-assignments)
-                       nimbus/convert-assignments-to-worker->resources 
(mocked-convert-assignments-to-worker->resources
-                                                              
storm-cluster-state
-                                                              storm-name
-                                                              
worker->resources)
-                       nimbus/compute-new-topology->executor->node+port 
(mocked-compute-new-topology->executor->node+port
-                                                                          
storm-cluster-state
-                                                                          
storm-name
-                                                                          
executor->node+port)]
-        (submit-local-topology nimbus storm-name conf topology)))))
-
-(defn find-worker-id
-  [supervisor-conf port]
-  (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
-        worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
-    (first ((clojurify-structure (Utils/reverseMap worker->port)) port))))
-
 (defn find-worker-port
   [supervisor-conf worker-id]
   (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing4j.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj 
b/storm-core/src/clj/org/apache/storm/testing4j.clj
index b2acffc..41f8258 100644
--- a/storm-core/src/clj/org/apache/storm/testing4j.clj
+++ b/storm-core/src/clj/org/apache/storm/testing4j.clj
@@ -18,7 +18,6 @@
   (:require [org.apache.storm [LocalCluster :as LocalCluster]])
   (:import [org.apache.storm Config ILocalCluster LocalCluster])
   (:import [org.apache.storm.generated StormTopology])
-  (:import [org.apache.storm.daemon nimbus])
   (:import [org.apache.storm.testing TestJob MockedSources TrackedTopology
             MkClusterParam CompleteTopologyParam MkTupleParam])
   (:import [org.apache.storm.utils Utils])

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj 
b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 52e4059..5465b92 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -888,9 +888,9 @@
      "stream" (.get_streamId s)
      "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
      "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
-     "executed" (Utils/nullToZero (.get_executed bas))
-     "acked" (Utils/nullToZero (.get_acked cas))
-     "failed" (Utils/nullToZero (.get_failed cas))}))
+     "executed" (if-let [e (.get_executed bas)] e 0)
+     "acked" (if-let [a (.get_acked cas)] a 0)
+     "failed" (if-let [f (.get_failed cas)] f 0)}))
 
 (defmulti unpack-comp-output-stat
   (fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
@@ -899,8 +899,8 @@
   [[stream-id ^ComponentAggregateStats stats]]
   (let [^CommonAggregateStats cas (.get_common_stats stats)]
     {"stream" stream-id
-     "emitted" (Utils/nullToZero (.get_emitted cas))
-     "transferred" (Utils/nullToZero (.get_transferred cas))}))
+     "emitted" (if-let [e (.get_emitted cas)] e 0)
+     "transferred" (if-let [t (.get_transferred cas)] t 0)}))
 
 (defmethod unpack-comp-output-stat ComponentType/SPOUT
   [[stream-id ^ComponentAggregateStats stats]]
@@ -908,11 +908,11 @@
         ^SpecificAggregateStats spec-s (.get_specific_stats stats)
         ^SpoutAggregateStats spout-s (.get_spout spec-s)]
     {"stream" stream-id
-     "emitted" (Utils/nullToZero (.get_emitted cas))
-     "transferred" (Utils/nullToZero (.get_transferred cas))
+     "emitted" (if-let [e (.get_emitted cas)] e 0)
+     "transferred" (if-let [t (.get_transferred cas)] t 0)
      "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))
-     "acked" (Utils/nullToZero (.get_acked cas))
-     "failed" (Utils/nullToZero (.get_failed cas))}))
+     "acked" (if-let [a (.get_acked cas)] a 0)
+     "failed" (if-let [f (.get_failed cas)] f 0)}))
 
 (defmulti unpack-comp-exec-stat
   (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats 
^ExecutorAggregateStats cas))))
@@ -935,14 +935,14 @@
      "uptimeSeconds" uptime
      "host" host
      "port" port
-     "emitted" (Utils/nullToZero (.get_emitted cas))
-     "transferred" (Utils/nullToZero (.get_transferred cas))
+     "emitted" (if-let [e (.get_emitted cas)] e 0)
+     "transferred" (if-let [t (.get_transferred cas)] t 0)
      "capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas)))
      "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
-     "executed" (Utils/nullToZero (.get_executed bas))
+     "executed" (if-let [e (.get_executed bas)] e 0)
      "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas))
-     "acked" (Utils/nullToZero (.get_acked cas))
-     "failed" (Utils/nullToZero (.get_failed cas))
+     "acked" (if-let [a (.get_acked cas)] a 0)
+     "failed" (if-let [f (.get_failed cas)] f 0)
      "workerLogLink" (worker-log-link host port topology-id secure?)}))
 
 (defmethod unpack-comp-exec-stat ComponentType/SPOUT
@@ -963,11 +963,11 @@
      "uptimeSeconds" uptime
      "host" host
      "port" port
-     "emitted" (Utils/nullToZero (.get_emitted cas))
-     "transferred" (Utils/nullToZero (.get_transferred cas))
+     "emitted" (if-let [em (.get_emitted cas)] em 0)
+     "transferred" (if-let [t (.get_transferred cas)] t 0)
      "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas))
-     "acked" (Utils/nullToZero (.get_acked cas))
-     "failed" (Utils/nullToZero (.get_failed cas))
+     "acked" (if-let [ack (.get_acked cas)] ack 0)
+     "failed" (if-let [f (.get_failed cas)] f 0)
      "workerLogLink" (worker-log-link host port topology-id secure?)}))
 
 (defmulti unpack-component-page-info

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/util.clj 
b/storm-core/src/clj/org/apache/storm/util.clj
index 016fe55..bd76022 100644
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ b/storm-core/src/clj/org/apache/storm/util.clj
@@ -25,7 +25,7 @@
             MutableObject])
   (:import [org.apache.storm.security.auth NimbusPrincipal])
   (:import [javax.security.auth Subject])
-  (:import [java.util UUID Random ArrayList List Collections])
+  (:import [java.util UUID Random ArrayList List Collections Set])
   (:import [java.util.zip ZipFile])
   (:import [java.util.concurrent.locks ReentrantReadWriteLock])
   (:import [java.util.concurrent Semaphore])
@@ -117,7 +117,9 @@
      ~@body
      false
      (catch Throwable t#
-       (Utils/exceptionCauseIsInstanceOf ~klass t#))))
+       (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
+         (if (not tc#) (log-error t# "Exception did not match " ~klass))
+         tc#))))
 
 (defmacro forcat
   [[args aseq] & body]
@@ -145,14 +147,16 @@
 
 (defn clojurify-structure
   [s]
-  (prewalk (fn [x]
+  (if s
+    (prewalk (fn [x]
              (cond (instance? Map x) (into {} x)
                    (instance? List x) (vec x)
+                   (instance? Set x) (into #{} x)
                    ;; (Boolean. false) does not evaluate to false in an if.
                    ;; This fixes that.
                    (instance? Boolean x) (boolean x)
                    true x))
-           s))
+           s)))
 ; move this func form convert.clj due to cyclic load dependency
 (defn clojurify-error [^ErrorInfo error]
   (if error

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index de96d40..4f05261 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -18,6 +18,7 @@
 package org.apache.storm;
 
 import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
@@ -694,9 +695,9 @@ public class Config extends HashMap<String, Object> {
     /**
      * FQCN of a class that implements {@code I} @see 
org.apache.storm.nimbus.ITopologyActionNotifierPlugin for details.
      */
+    @isImplementationOfClass(implementsClass = 
ITopologyActionNotifierPlugin.class)
     public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = 
"nimbus.topology.action.notifier.plugin.class";
-    public static final Object NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN_SCHEMA = 
String.class;
-
+    
     /**
      * Storm UI binds to this host/interface.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
index 14879b4..1c10c40 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -31,6 +31,8 @@ import java.util.regex.Pattern;
 import javax.security.auth.Subject;
 
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
 
 /**
  * Provides a way to store blobs that can be downloaded.
@@ -304,6 +307,41 @@ public abstract class BlobStore implements Shutdownable {
     }
 
     /**
+     * Helper method to read a stored topology
+     * @param topoId the id of the topology to read
+     * @param who who to read it as
+     * @return the deserialized topology.
+     * @throws IOException on any error while reading the blob.
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws KeyNotFoundException if the blob could not be found
+     */
+    public StormTopology readTopology(String topoId, Subject who) throws 
KeyNotFoundException, AuthorizationException, IOException {
+        return 
Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), 
StormTopology.class);
+    }
+    
+    /**
+     * Helper method to read a stored topology config
+     * @param topoId the id of the topology whose conf we are reading
+     * @param who who we are reading this as
+     * @return the deserialized config
+     * @throws KeyNotFoundException if the blob could not be found
+     * @throws AuthorizationException if who is not allowed to read the blob
+     * @throws IOException on any error while reading the blob.
+     */
+    public Map<String, Object> readTopologyConf(String topoId, Subject who) 
throws KeyNotFoundException, AuthorizationException, IOException {
+        return 
Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), 
who));
+    }
+    
+    // Key filter that maps a blob key to the topology id extracted from it by
+    // ConfigUtils.getIdFromBlobKey.
+    private static final KeyFilter<String> TO_TOPO_ID = ConfigUtils::getIdFromBlobKey;
+
+    /**
+     * @return a set of all of the topology ids with special data stored in the
+     * blob store.
+     */
+    public Set<String> storedTopoIds() {
+        final Set<String> topoIds = filterAndListKeys(TO_TOPO_ID);
+        return topoIds;
+    }
+    
+    /**
      * Output stream implementation used for reading the
      * metadata and data information.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java 
b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index a6f07ed..443e471 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -17,12 +17,24 @@
  */
 package org.apache.storm.cluster;
 
-import org.apache.storm.generated.*;
-import org.apache.storm.nimbus.NimbusInfo;
-
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ErrorInfo;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.nimbus.NimbusInfo;
 
 public interface IStormClusterState {
     public List<String> assignments(Runnable callback);
@@ -35,7 +47,7 @@ public interface IStormClusterState {
 
     public List<String> blobstoreInfo(String blobKey);
 
-    public List nimbuses();
+    public List<NimbusSummary> nimbuses();
 
     public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 
@@ -117,10 +129,62 @@ public interface IStormClusterState {
 
     public ErrorInfo lastError(String stormId, String componentId);
 
-    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) throws NoSuchAlgorithmException;
+    public void setCredentials(String stormId, Credentials creds, Map<String, 
Object> topoConf) throws NoSuchAlgorithmException;
 
     public Credentials credentials(String stormId, Runnable callback);
 
     public void disconnect();
-
-}
+    
+    /**
+     * @return All of the supervisors with the ID as the key
+     */
+    default Map<String, SupervisorInfo> allSupervisorInfo() {
+        return allSupervisorInfo(null);
+    }
+
+    /**
+     * @param callback be alerted if the list of supervisors change
+     * @return All of the supervisors with the ID as the key
+     */
+    default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
+        Map<String, SupervisorInfo> ret = new HashMap<>();
+        for (String id: supervisors(callback)) {
+            ret.put(id, supervisorInfo(id));
+        }
+        return ret;
+    }
+    
+    /**
+     * Get a topology ID from the name of a topology
+     * @param topologyName the name of the topology to look for
+     * @return the id of the first active topology with that name, or an empty
+     * Optional if no active topology has it.
+     */
+    default Optional<String> getTopoId(final String topologyName) {
+        for (String topoId: activeStorms()) {
+            StormBase base = stormBase(topoId, null);
+            // NOTE(review): stormBase may yield null (for example if the topology
+            // is removed after activeStorms() was read) -- skip such entries
+            // instead of failing the whole lookup with a NullPointerException.
+            if (base != null && topologyName.equals(base.get_name())) {
+                return Optional.ofNullable(topoId);
+            }
+        }
+        return Optional.empty();
+    }
+    
+    default Map<String, Assignment> topologyAssignments() {
+        Map<String, Assignment> ret = new HashMap<>();
+        for (String topoId: assignments(null)) {
+            ret.put(topoId, assignmentInfo(topoId, null));
+        }
+        return ret;
+    }
+    
+    default Map<String, StormBase> topologyBases() {
+        Map<String, StormBase> stormBases = new HashMap<>();
+        for (String topologyId : activeStorms()) {
+            StormBase base = stormBase(topologyId, null);
+            stormBases.put(topologyId, base);
+        }
+        return stormBases;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 2fbb6c2..971b426 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -194,7 +194,7 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public List nimbuses() {
+    public List<NimbusSummary> nimbuses() {
         List<NimbusSummary> nimbusSummaries = new ArrayList<>();
         List<String> nimbusIds = 
stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
         for (String nimbusId : nimbusIds) {
@@ -698,7 +698,7 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) throws NoSuchAlgorithmException {
+    public void setCredentials(String stormId, Credentials creds, Map<String, 
Object> topoConf) throws NoSuchAlgorithmException {
         List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
         String path = ClusterUtils.credentialsPath(stormId);
         stateStorage.set_data(path, Utils.serialize(creds), aclList);

http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java 
b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 70a91f3..874b607 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -56,6 +56,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -94,31 +95,9 @@ public class StormCommon {
     public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = 
"expandMapType";
     public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR 
= "metricNameSeparator";
 
-    @SuppressWarnings("unchecked")
+    /**
+     * Look up the id of the active topology with the given name.
+     * @param stormClusterState the cluster state to search
+     * @param topologyName the name of the topology to look for
+     * @return the topology id, or null if no active topology has that name
+     * @deprecated use {@link IStormClusterState#getTopoId(String)} instead.
+     */
+    @Deprecated
     public static String getStormId(final IStormClusterState 
stormClusterState, final String topologyName) {
-        List<String> activeTopologys = stormClusterState.activeStorms();
-        IPredicate pred = new IPredicate<String>() {
-            @Override
-            public boolean test(String obj) {
-                String name = stormClusterState.stormBase(obj, 
null).get_name();
-                return name.equals(topologyName);
-            }
-        };
-        return Utils.findOne(pred, activeTopologys);
-    }
-
-    public static Map<String, StormBase> topologyBases(IStormClusterState 
stormClusterState) {
-        return _instance.topologyBasesImpl(stormClusterState);
-    }
-
-    protected Map<String, StormBase> topologyBasesImpl(IStormClusterState 
stormClusterState) {
-        List<String> activeTopologys = stormClusterState.activeStorms();
-        Map<String, StormBase> stormBases = new HashMap<>();
-        for (String topologyId : activeTopologys) {
-            StormBase base = stormClusterState.stormBase(topologyId, null);
-            stormBases.put(topologyId, base);
-        }
-        return stormBases;
+        // Optional.get() would throw NoSuchElementException for an unknown name;
+        // unwrap with orElse(null) so a missing topology yields null instead.
+        return stormClusterState.getTopoId(topologyName).orElse(null);
     }
 
     public static void validateDistributedMode(Map conf) {
@@ -181,13 +160,13 @@ public class StormCommon {
     }
 
     @SuppressWarnings("unchecked")
-    public static Map componentConf(Object component) {
+    public static Map<String, Object> componentConf(Object component) {
         try {
-            Map<Object, Object> conf = new HashMap<>();
+            Map<String, Object> conf = new HashMap<>();
             ComponentCommon common = getComponentCommon(component);
             String jconf = common.get_json_conf();
             if (jconf != null) {
-                conf.putAll((Map<Object, Object>) 
JSONValue.parseWithException(jconf));
+                conf.putAll((Map<String, Object>) 
JSONValue.parseWithException(jconf));
             }
             return conf;
         } catch (Exception e) {

Reply via email to