http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index ca60c02..60f5a1b 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -16,7 +16,6 @@ (ns org.apache.storm.testing (:require [org.apache.storm.daemon - [nimbus :as nimbus] [local-supervisor :as local-supervisor] [common :as common]]) (:import [org.apache.commons.io FileUtils] @@ -33,6 +32,7 @@ (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState StormCommonInstaller]) (:import [org.apache.storm.tuple Fields Tuple TupleImpl]) (:import [org.apache.storm.task TopologyContext]) + (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions]) (:import [org.apache.storm.testing FeederSpout FixedTupleSpout FixedTuple TupleCaptureBolt SpoutTracker BoltTracker NonRichBoltTracker @@ -49,9 +49,10 @@ (:import [org.apache.storm Config]) (:import [org.apache.storm.generated StormTopology]) (:import [org.apache.storm.task TopologyContext] - (org.apache.storm.messaging IContext) + [org.apache.storm.messaging IContext] [org.json.simple JSONValue] - (org.apache.storm.daemon StormCommon Acker DaemonCommon)) + [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus] + [org.apache.storm.daemon StormCommon Acker DaemonCommon]) (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) (:use [org.apache.storm util config log local-state-converter converter]) (:use [org.apache.storm.internal thrift])) @@ -167,11 +168,60 @@ (let [val (atom (dec start-val))] (fn [] (swap! val inc))))) +(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999] + (let [leader-address (NimbusInfo. leader-name leader-port true)] + (reify ILeaderElector + (prepare [this conf] true) + (isLeader [this] is-leader) + (addToLeaderLockQueue [this] true) + (getLeader [this] leader-address) + (getAllNimbuses [this] `(leader-address)) + (close [this] true)))) + +(defn mk-nimbus + [conf inimbus blob-store leader-elector group-mapper cluster-state] + (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper)) + +(defnk mk-mocked-nimbus + [:daemon-conf {} :inimbus nil :blob-store nil :cluster-state nil + :leader-elector nil :group-mapper nil :nimbus-daemon false :mk-nimbus mk-nimbus] + (let [zk-tmp (local-temp-path) + [zk-port zk-handle] (if-not cluster-state + (Zookeeper/mkInprocessZookeeper zk-tmp nil)) + leader-elector (or leader-elector (if zk-handle leader-elector (mock-leader-elector))) + nimbus-tmp (local-temp-path) + daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) + {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true + ZMQ-LINGER-MILLIS 0 + TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false + TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 + STORM-CLUSTER-MODE "local" + BLOBSTORE-SUPERUSER (System/getProperty "user.name") + BLOBSTORE-DIR nimbus-tmp} + (if-not cluster-state + {STORM-ZOOKEEPER-PORT zk-port + STORM-ZOOKEEPER-SERVERS ["localhost"]}) + daemon-conf) + nimbus (mk-nimbus + (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) + (if inimbus inimbus (Nimbus$StandaloneINimbus.)) + blob-store + leader-elector + group-mapper + cluster-state) + _ (.launchServer nimbus) + nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil)] + {:nimbus nimbus + :daemon-conf daemon-conf + :tmp-dirs (atom [nimbus-tmp zk-tmp]) + :nimbus-thrift-server nimbus-thrift-server + :zookeeper (if (not-nil? zk-handle) zk-handle)})) + ;; returns map containing cluster info ;; local dir is always overridden in maps ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter ;; if need to customize amt of ports more, can use add-supervisor calls afterwards -(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false] +(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :group-mapper nil :supervisor-slot-port-min 1024 :nimbus-daemon false] (let [zk-tmp (local-temp-path) [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) (Zookeeper/mkInprocessZookeeper zk-tmp nil)) @@ -189,9 +239,14 @@ STORM-ZOOKEEPER-SERVERS ["localhost"]}) daemon-conf) port-counter (mk-counter supervisor-slot-port-min) - nimbus (nimbus/service-handler - (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) - (if inimbus inimbus (nimbus/standalone-nimbus))) + nimbus (mk-nimbus + (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) + (if inimbus inimbus (Nimbus$StandaloneINimbus.)) + nil + nil + group-mapper + nil) + _ (.launchServer nimbus) context (mk-shared-context daemon-conf) nimbus-thrift-server (if nimbus-daemon (start-nimbus-daemon daemon-conf nimbus) nil) cluster-map {:nimbus nimbus @@ -242,11 +297,12 @@ (.stop (:nimbus-thrift-server cluster-map)) (catch Exception e (log-message "failed to stop thrift"))) )) - (.close (:state cluster-map)) - (.disconnect (:storm-cluster-state cluster-map)) - (doseq [s @(:supervisors cluster-map)] - (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR) - (.close s)) + (if (:state cluster-map) (.close (:state cluster-map))) + (if (:storm-cluster-state cluster-map) (.disconnect (:storm-cluster-state cluster-map))) + (if (:supervisors cluster-map) + (doseq [s @(:supervisors cluster-map)] + (.shutdownAllWorkers s nil ReadClusterState/THREAD_DUMP_ON_ERROR) + (.close s))) (ProcessSimulator/killAllProcesses) (if (not-nil? (:zookeeper cluster-map)) (do @@ -315,6 +371,21 @@ ([cluster-map secs] (advance-cluster-time cluster-map secs 1))) +(defmacro with-mocked-nimbus + [[nimbus-sym & args] & body] + `(let [~nimbus-sym (mk-mocked-nimbus ~@args)] + (try + ~@body + (catch Throwable t# + (log-error t# "Error in cluster") + (throw t#)) + (finally + (let [keep-waiting?# (atom true) + f# (future (while @keep-waiting?# (simulate-wait ~nimbus-sym)))] + (kill-local-storm-cluster ~nimbus-sym) + (reset! keep-waiting?# false) + @f#))))) + (defmacro with-local-cluster [[cluster-sym & args] & body] `(let [~cluster-sym (mk-local-storm-cluster ~@args)] @@ -356,48 +427,6 @@ (throw (IllegalArgumentException. "Topology conf is not json-serializable"))) (.submitTopologyWithOpts nimbus storm-name nil (JSONValue/toJSONString conf) topology submit-opts)) -(defn mocked-convert-assignments-to-worker->resources [storm-cluster-state storm-name worker->resources] - (fn [existing-assignments] - (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name) - existing-assignments (into {} (for [[tid assignment] existing-assignments] - {tid (:worker->resources assignment)})) - new-assignments (assoc existing-assignments topology-id worker->resources)] - new-assignments))) - -(defn mocked-compute-new-topology->executor->node+port [storm-cluster-state storm-name executor->node+port] - (fn [new-scheduler-assignments existing-assignments] - (let [topology-id (StormCommon/getStormId storm-cluster-state storm-name) - existing-assignments (into {} (for [[tid assignment] existing-assignments] - {tid (:executor->node+port assignment)})) - new-assignments (assoc existing-assignments topology-id executor->node+port)] - new-assignments))) - -(defn mocked-compute-new-scheduler-assignments [] - (fn [nimbus existing-assignments topologies scratch-topology-id] - existing-assignments)) - -(defn submit-mocked-assignment - [nimbus storm-cluster-state storm-name conf topology task->component executor->node+port worker->resources] - (let [fake-common (proxy [StormCommon] [] - (stormTaskInfoImpl [_] task->component))] - (with-open [- (StormCommonInstaller. fake-common)] - (with-var-roots [nimbus/compute-new-scheduler-assignments (mocked-compute-new-scheduler-assignments) - nimbus/convert-assignments-to-worker->resources (mocked-convert-assignments-to-worker->resources - storm-cluster-state - storm-name - worker->resources) - nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port - storm-cluster-state - storm-name - executor->node+port)] - (submit-local-topology nimbus storm-name conf topology))))) - -(defn find-worker-id - [supervisor-conf port] - (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf) - worker->port (.getApprovedWorkers ^LocalState supervisor-state)] - (first ((clojurify-structure (Utils/reverseMap worker->port)) port)))) - (defn find-worker-port [supervisor-conf worker-id] (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/testing4j.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing4j.clj b/storm-core/src/clj/org/apache/storm/testing4j.clj index b2acffc..41f8258 100644 --- a/storm-core/src/clj/org/apache/storm/testing4j.clj +++ b/storm-core/src/clj/org/apache/storm/testing4j.clj @@ -18,7 +18,6 @@ (:require [org.apache.storm [LocalCluster :as LocalCluster]]) (:import [org.apache.storm Config ILocalCluster LocalCluster]) (:import [org.apache.storm.generated StormTopology]) - (:import [org.apache.storm.daemon nimbus]) (:import [org.apache.storm.testing TestJob MockedSources TrackedTopology MkClusterParam CompleteTopologyParam MkTupleParam]) (:import [org.apache.storm.utils Utils]) http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 52e4059..5465b92 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -888,9 +888,9 @@ "stream" (.get_streamId s) "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas)) "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas)) - "executed" (Utils/nullToZero (.get_executed bas)) - "acked" (Utils/nullToZero (.get_acked cas)) - "failed" (Utils/nullToZero (.get_failed cas))})) + "executed" (if-let [e (.get_executed bas)] e 0) + "acked" (if-let [a (.get_acked cas)] a 0) + "failed" (if-let [f (.get_failed cas)] f 0)})) (defmulti unpack-comp-output-stat (fn [[_ ^ComponentAggregateStats s]] (.get_type s))) @@ -899,8 +899,8 @@ [[stream-id ^ComponentAggregateStats stats]] (let [^CommonAggregateStats cas (.get_common_stats stats)] {"stream" stream-id - "emitted" (Utils/nullToZero (.get_emitted cas)) - "transferred" (Utils/nullToZero (.get_transferred cas))})) + "emitted" (if-let [e (.get_emitted cas)] e 0) + "transferred" (if-let [t (.get_transferred cas)] t 0)})) (defmethod unpack-comp-output-stat ComponentType/SPOUT [[stream-id ^ComponentAggregateStats stats]] @@ -908,11 +908,11 @@ ^SpecificAggregateStats spec-s (.get_specific_stats stats) ^SpoutAggregateStats spout-s (.get_spout spec-s)] {"stream" stream-id - "emitted" (Utils/nullToZero (.get_emitted cas)) - "transferred" (Utils/nullToZero (.get_transferred cas)) + "emitted" (if-let [e (.get_emitted cas)] e 0) + "transferred" (if-let [t (.get_transferred cas)] t 0) "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s)) - "acked" (Utils/nullToZero (.get_acked cas)) - "failed" (Utils/nullToZero (.get_failed cas))})) + "acked" (if-let [a (.get_acked cas)] a 0) + "failed" (if-let [f (.get_failed cas)] f 0)})) (defmulti unpack-comp-exec-stat (fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas)))) @@ -935,14 +935,14 @@ "uptimeSeconds" uptime "host" host "port" port - "emitted" (Utils/nullToZero (.get_emitted cas)) - "transferred" (Utils/nullToZero (.get_transferred cas)) + "emitted" (if-let [e (.get_emitted cas)] e 0) + "transferred" (if-let [t (.get_transferred cas)] t 0) "capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas))) "executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas)) - "executed" (Utils/nullToZero (.get_executed bas)) + "executed" (if-let [e (.get_executed bas)] e 0) "processLatency" (StatsUtil/floatStr (.get_process_latency_ms bas)) - "acked" (Utils/nullToZero (.get_acked cas)) - "failed" (Utils/nullToZero (.get_failed cas)) + "acked" (if-let [a (.get_acked cas)] a 0) + "failed" (if-let [f (.get_failed cas)] f 0) "workerLogLink" (worker-log-link host port topology-id secure?)})) (defmethod unpack-comp-exec-stat ComponentType/SPOUT @@ -963,11 +963,11 @@ "uptimeSeconds" uptime "host" host "port" port - "emitted" (Utils/nullToZero (.get_emitted cas)) - "transferred" (Utils/nullToZero (.get_transferred cas)) + "emitted" (if-let [em (.get_emitted cas)] em 0) + "transferred" (if-let [t (.get_transferred cas)] t 0) "completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas)) - "acked" (Utils/nullToZero (.get_acked cas)) - "failed" (Utils/nullToZero (.get_failed cas)) + "acked" (if-let [ack (.get_acked cas)] ack 0) + "failed" (if-let [f (.get_failed cas)] f 0) "workerLogLink" (worker-log-link host port topology-id secure?)})) (defmulti unpack-component-page-info http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/util.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj index 016fe55..bd76022 100644 --- a/storm-core/src/clj/org/apache/storm/util.clj +++ b/storm-core/src/clj/org/apache/storm/util.clj @@ -25,7 +25,7 @@ MutableObject]) (:import [org.apache.storm.security.auth NimbusPrincipal]) (:import [javax.security.auth Subject]) - (:import [java.util UUID Random ArrayList List Collections]) + (:import [java.util UUID Random ArrayList List Collections Set]) (:import [java.util.zip ZipFile]) (:import [java.util.concurrent.locks ReentrantReadWriteLock]) (:import [java.util.concurrent Semaphore]) @@ -117,7 +117,9 @@ ~@body false (catch Throwable t# - (Utils/exceptionCauseIsInstanceOf ~klass t#)))) + (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)] + (if (not tc#) (log-error t# "Exception did not match " ~klass)) + tc#)))) (defmacro forcat [[args aseq] & body] @@ -145,14 +147,16 @@ (defn clojurify-structure [s] - (prewalk (fn [x] + (if s + (prewalk (fn [x] (cond (instance? Map x) (into {} x) (instance? List x) (vec x) + (instance? Set x) (into #{} x) ;; (Boolean. false) does not evaluate to false in an if. ;; This fixes that. (instance? Boolean x) (boolean x) true x)) - s)) + s))) ; move this func form convert.clj due to cyclic load dependency (defn clojurify-error [^ErrorInfo error] (if error http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index de96d40..4f05261 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -18,6 +18,7 @@ package org.apache.storm; import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -694,9 +695,9 @@ public class Config extends HashMap<String, Object> { /** * FQCN of a class that implements {@code I} @see org.apache.storm.nimbus.ITopologyActionNotifierPlugin for details. */ + @isImplementationOfClass(implementsClass = ITopologyActionNotifierPlugin.class) public static final String NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN = "nimbus.topology.action.notifier.plugin.class"; - public static final Object NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN_SCHEMA = String.class; - + /** * Storm UI binds to this host/interface. */ http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java index 14879b4..1c10c40 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -31,6 +31,8 @@ import java.util.regex.Pattern; import javax.security.auth.Subject; import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.ReadableBlobMeta; import org.apache.storm.generated.SettableBlobMeta; +import org.apache.storm.generated.StormTopology; /** * Provides a way to store blobs that can be downloaded. @@ -304,6 +307,41 @@ public abstract class BlobStore implements Shutdownable { } /** + * Helper method to read a stored topology + * @param topoId the id of the topology to read + * @param who who to read it as + * @return the deserialized topology. + * @throws IOException on any error while reading the blob. + * @throws AuthorizationException if who is not allowed to read the blob + * @throws KeyNotFoundException if the blob could not be found + */ + public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException { + return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class); + } + + /** + * Helper method to read a stored topology config + * @param topoId the id of the topology whose conf we are reading + * @param who who we are reading this as + * @return the deserialized config + * @throws KeyNotFoundException if the blob could not be found + * @throws AuthorizationException if who is not allowed to read the blob + * @throws IOException on any error while reading the blob. + */ + public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException { + return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who)); + } + + private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key); + /** + * @return a set of all of the topology ids with special data stored in the + * blob store. + */ + public Set<String> storedTopoIds() { + return filterAndListKeys(TO_TOPO_ID); + } + + /** * Output stream implementation used for reading the * metadata and data information. */ http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java index a6f07ed..443e471 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -17,12 +17,24 @@ */ package org.apache.storm.cluster; -import org.apache.storm.generated.*; -import org.apache.storm.nimbus.NimbusInfo; - import java.security.NoSuchAlgorithmException; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.Credentials; +import org.apache.storm.generated.ErrorInfo; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.NimbusSummary; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.StormBase; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.nimbus.NimbusInfo; public interface IStormClusterState { public List<String> assignments(Runnable callback); @@ -35,7 +47,7 @@ public interface IStormClusterState { public List<String> blobstoreInfo(String blobKey); - public List nimbuses(); + public List<NimbusSummary> nimbuses(); public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); @@ -117,10 +129,62 @@ public interface IStormClusterState { public ErrorInfo lastError(String stormId, String componentId); - public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException; + public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException; public Credentials credentials(String stormId, Runnable callback); public void disconnect(); - -} + + /** + * @return All of the supervisors with the ID as the key + */ + default Map<String, SupervisorInfo> allSupervisorInfo() { + return allSupervisorInfo(null); + } + + /** + * @param callback be alerted if the list of supervisors change + * @return All of the supervisors with the ID as the key + */ + default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) { + Map<String, SupervisorInfo> ret = new HashMap<>(); + for (String id: supervisors(callback)) { + ret.put(id, supervisorInfo(id)); + } + return ret; + } + + /** + * Get a topology ID from the name of a topology + * @param topologyName the name of the topology to look for + * @return the id of the topology or null if it is not alive. + */ + default Optional<String> getTopoId(final String topologyName) { + String ret = null; + for (String topoId: activeStorms()) { + String name = stormBase(topoId, null).get_name(); + if (topologyName.equals(name)) { + ret = topoId; + break; + } + } + return Optional.ofNullable(ret); + } + + default Map<String, Assignment> topologyAssignments() { + Map<String, Assignment> ret = new HashMap<>(); + for (String topoId: assignments(null)) { + ret.put(topoId, assignmentInfo(topoId, null)); + } + return ret; + } + + default Map<String, StormBase> topologyBases() { + Map<String, StormBase> stormBases = new HashMap<>(); + for (String topologyId : activeStorms()) { + StormBase base = stormBase(topologyId, null); + stormBases.put(topologyId, base); + } + return stormBases; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 2fbb6c2..971b426 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -194,7 +194,7 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public List nimbuses() { + public List<NimbusSummary> nimbuses() { List<NimbusSummary> nimbusSummaries = new ArrayList<>(); List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false); for (String nimbusId : nimbusIds) { @@ -698,7 +698,7 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException { + public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException { List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf); String path = ClusterUtils.credentialsPath(stormId); stateStorage.set_data(path, Utils.serialize(creds), aclList); http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java index 70a91f3..874b607 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java @@ -56,6 +56,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; @@ -94,31 +95,9 @@ public class StormCommon { public static final String TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE = "expandMapType"; public static final String TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR = "metricNameSeparator"; - @SuppressWarnings("unchecked") + @Deprecated public static String getStormId(final IStormClusterState stormClusterState, final String topologyName) { - List<String> activeTopologys = stormClusterState.activeStorms(); - IPredicate pred = new IPredicate<String>() { - @Override - public boolean test(String obj) { - String name = stormClusterState.stormBase(obj, null).get_name(); - return name.equals(topologyName); - } - }; - return Utils.findOne(pred, activeTopologys); - } - - public static Map<String, StormBase> topologyBases(IStormClusterState stormClusterState) { - return _instance.topologyBasesImpl(stormClusterState); - } - - protected Map<String, StormBase> topologyBasesImpl(IStormClusterState stormClusterState) { - List<String> activeTopologys = stormClusterState.activeStorms(); - Map<String, StormBase> stormBases = new HashMap<>(); - for (String topologyId : activeTopologys) { - StormBase base = stormClusterState.stormBase(topologyId, null); - stormBases.put(topologyId, base); - } - return stormBases; + return stormClusterState.getTopoId(topologyName).get(); } public static void validateDistributedMode(Map conf) { @@ -181,13 +160,13 @@ public class StormCommon { } @SuppressWarnings("unchecked") - public static Map componentConf(Object component) { + public static Map<String, Object> componentConf(Object component) { try { - Map<Object, Object> conf = new HashMap<>(); + Map<String, Object> conf = new HashMap<>(); ComponentCommon common = getComponentCommon(component); String jconf = common.get_json_conf(); if (jconf != null) { - conf.putAll((Map<Object, Object>) JSONValue.parseWithException(jconf)); + conf.putAll((Map<String, Object>) JSONValue.parseWithException(jconf)); } return conf; } catch (Exception e) {