Merge branch 'master' into ClusterUtils
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cc1f6d77 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cc1f6d77 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cc1f6d77 Branch: refs/heads/master Commit: cc1f6d77962ec96c69cf5af85e4016a9e08ccdcc Parents: afcd0c6 d0cd52b Author: xiaojian.fxj <xiaojian....@alibaba-inc.com> Authored: Fri Feb 19 16:22:50 2016 +0800 Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com> Committed: Fri Feb 19 16:47:42 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 7 + README.markdown | 1 + bin/storm.cmd | 2 +- bin/storm.py | 4 +- conf/cgconfig.conf.example | 41 +++ conf/defaults.yaml | 16 +- examples/storm-starter/pom.xml | 10 + .../org/apache/storm/starter/clj/word_count.clj | 3 +- .../starter/ResourceAwareExampleTopology.java | 2 +- pom.xml | 10 + storm-clojure/pom.xml | 74 ++++ .../src/clj/org/apache/storm/clojure.clj | 207 +++++++++++ .../src/clj/org/apache/storm/thrift.clj | 286 +++++++++++++++ storm-clojure/src/test/clj/clojure_test.clj | 158 +++++++++ storm-core/src/clj/org/apache/storm/clojure.clj | 207 ----------- .../org/apache/storm/command/dev_zookeeper.clj | 28 -- .../clj/org/apache/storm/command/get_errors.clj | 3 +- .../org/apache/storm/command/healthcheck.clj | 90 ----- .../clj/org/apache/storm/command/monitor.clj | 2 +- .../clj/org/apache/storm/command/rebalance.clj | 3 +- .../org/apache/storm/command/set_log_level.clj | 3 +- .../apache/storm/command/shell_submission.clj | 2 +- .../src/clj/org/apache/storm/daemon/common.clj | 119 ++++--- .../clj/org/apache/storm/daemon/executor.clj | 51 +-- .../clj/org/apache/storm/daemon/logviewer.clj | 19 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 122 +++---- .../clj/org/apache/storm/daemon/supervisor.clj | 217 ++++++++---- .../src/clj/org/apache/storm/daemon/task.clj | 4 +- .../src/clj/org/apache/storm/daemon/worker.clj | 116 +++--- .../clj/org/apache/storm/internal/clojure.clj | 201 +++++++++++ .../clj/org/apache/storm/internal/thrift.clj | 96 +++++ .../src/clj/org/apache/storm/local_state.clj | 134 ------- .../org/apache/storm/local_state_converter.clj | 24 ++ storm-core/src/clj/org/apache/storm/testing.clj | 37 +- storm-core/src/clj/org/apache/storm/thrift.clj | 286 --------------- storm-core/src/clj/org/apache/storm/timer.clj | 128 ------- storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +- storm-core/src/jvm/org/apache/storm/Config.java | 88 +++++ .../src/jvm/org/apache/storm/StormTimer.java | 241 +++++++++++++ storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++ .../org/apache/storm/command/DevZookeeper.java | 35 ++ .../org/apache/storm/command/HealthCheck.java | 125 +++++++ .../container/ResourceIsolationInterface.java | 51 +++ .../storm/container/cgroup/CgroupCenter.java | 216 ++++++++++++ .../storm/container/cgroup/CgroupCommon.java | 270 ++++++++++++++ .../container/cgroup/CgroupCommonOperation.java | 81 +++++ .../container/cgroup/CgroupCoreFactory.java | 74 ++++ .../storm/container/cgroup/CgroupManager.java | 210 +++++++++++ .../storm/container/cgroup/CgroupOperation.java | 79 +++++ .../storm/container/cgroup/CgroupUtils.java | 118 +++++++ .../apache/storm/container/cgroup/Device.java | 75 ++++ .../storm/container/cgroup/Hierarchy.java | 130 +++++++ .../storm/container/cgroup/SubSystem.java | 81 +++++ .../storm/container/cgroup/SubSystemType.java | 36 ++ .../storm/container/cgroup/SystemOperation.java | 75 ++++ .../storm/container/cgroup/core/BlkioCore.java | 213 +++++++++++ .../storm/container/cgroup/core/CgroupCore.java | 26 ++ .../storm/container/cgroup/core/CpuCore.java | 135 +++++++ .../container/cgroup/core/CpuacctCore.java | 71 ++++ .../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++ .../container/cgroup/core/DevicesCore.java | 189 ++++++++++ .../container/cgroup/core/FreezerCore.java | 66 ++++ .../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++ .../storm/container/cgroup/core/NetClsCore.java | 69 ++++ .../container/cgroup/core/NetPrioCore.java | 65 ++++ .../jvm/org/apache/storm/testing/NGrouping.java | 4 +- .../storm/testing/PythonShellMetricsBolt.java | 14 +- .../storm/testing/PythonShellMetricsSpout.java | 8 +- .../jvm/org/apache/storm/utils/LocalState.java | 112 +++++- .../src/jvm/org/apache/storm/utils/Utils.java | 17 +- .../org/apache/storm/integration_test.clj | 259 +++++++------- .../org/apache/storm/testing4j_test.clj | 72 ++-- .../test/clj/org/apache/storm/clojure_test.clj | 64 ++-- .../test/clj/org/apache/storm/cluster_test.clj | 3 +- .../test/clj/org/apache/storm/drpc_test.clj | 23 +- .../test/clj/org/apache/storm/grouping_test.clj | 56 +-- .../storm/messaging/netty_integration_test.clj | 19 +- .../clj/org/apache/storm/messaging_test.clj | 14 +- .../test/clj/org/apache/storm/metrics_test.clj | 85 +++-- .../test/clj/org/apache/storm/nimbus_test.clj | 259 +++++++++----- .../scheduler/resource_aware_scheduler_test.clj | 3 +- .../clj/org/apache/storm/supervisor_test.clj | 55 +-- .../clj/org/apache/storm/tick_tuple_test.clj | 15 +- .../clj/org/apache/storm/transactional_test.clj | 3 +- .../test/jvm/org/apache/storm/TestCgroups.java | 130 +++++++ .../resource/TestResourceAwareScheduler.java | 3 + 86 files changed, 5939 insertions(+), 1561 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/conf/defaults.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-clojure/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --cc storm-clojure/src/clj/org/apache/storm/thrift.clj index 0000000,bf13d23..45aa740 mode 000000,100644..100644 --- a/storm-clojure/src/clj/org/apache/storm/thrift.clj +++ b/storm-clojure/src/clj/org/apache/storm/thrift.clj @@@ -1,0 -1,286 +1,286 @@@ + ;; Licensed to the Apache Software Foundation (ASF) under one + ;; or more contributor license agreements. See the NOTICE file + ;; distributed with this work for additional information + ;; regarding copyright ownership. The ASF licenses this file + ;; to you under the Apache License, Version 2.0 (the + ;; "License"); you may not use this file except in compliance + ;; with the License. You may obtain a copy of the License at + ;; + ;; http://www.apache.org/licenses/LICENSE-2.0 + ;; + ;; Unless required by applicable law or agreed to in writing, software + ;; distributed under the License is distributed on an "AS IS" BASIS, + ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ;; See the License for the specific language governing permissions and + ;; limitations under the License. + + (ns org.apache.storm.thrift + (:import [java.util HashMap] + [java.io Serializable] + [org.apache.storm.generated NodeInfo Assignment]) + (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology + StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface + ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo + GlobalStreamId ComponentObject ComponentObject$_Fields + ShellComponent SupervisorInfo]) + (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils]) + (:import [org.apache.storm Constants]) + (:import [org.apache.storm.security.auth ReqContext]) + (:import [org.apache.storm.grouping CustomStreamGrouping]) + (:import [org.apache.storm.topology TopologyBuilder]) + (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) + (:import [org.apache.storm.thrift.transport TTransport] + (org.json.simple JSONValue)) - (:use [org.apache.storm util config log zookeeper])) ++ (:use [org.apache.storm util config log])) + + (defn instantiate-java-object + [^JavaObject obj] + (let [name (symbol (.get_full_class_name obj)) + args (map (memfn getFieldValue) (.get_args_list obj))] + (eval `(new ~name ~@args)))) + + (def grouping-constants + {Grouping$_Fields/FIELDS :fields + Grouping$_Fields/SHUFFLE :shuffle + Grouping$_Fields/ALL :all + Grouping$_Fields/NONE :none + Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized + Grouping$_Fields/CUSTOM_OBJECT :custom-object + Grouping$_Fields/DIRECT :direct + Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) + + (defn grouping-type + [^Grouping grouping] + (grouping-constants (.getSetField grouping))) + + (defn field-grouping + [^Grouping grouping] + (when-not (= (grouping-type grouping) :fields) + (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))) + (.get_fields grouping)) + + (defn global-grouping? + [^Grouping grouping] + (and (= :fields (grouping-type grouping)) + (empty? (field-grouping grouping)))) + + (defn parallelism-hint + [^ComponentCommon component-common] + (let [phint (.get_parallelism_hint component-common)] + (if-not (.is_set_parallelism_hint component-common) 1 phint))) + + (defn nimbus-client-and-conn + ([host port] + (nimbus-client-and-conn host port nil)) + ([host port as-user] + (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) + (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) + nimbusClient (NimbusClient. conf host port nil as-user) + client (.getClient nimbusClient) + transport (.transport nimbusClient)] + [client transport] ))) + + (defmacro with-nimbus-connection + [[client-sym host port] & body] + `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] + (try + ~@body + (finally (.close conn#))))) + + (defmacro with-configured-nimbus-connection + [client-sym & body] + `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig)) + context# (ReqContext/context) + user# (if (.principal context#) (.getName (.principal context#))) + nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) + ~client-sym (.getClient nimbusClient#) + conn# (.transport nimbusClient#) + ] + (try + ~@body + (finally (.close conn#))))) + + (defn direct-output-fields + [fields] + (StreamInfo. fields true)) + + (defn output-fields + [fields] + (StreamInfo. fields false)) + + ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE + (defn mk-output-spec + [output-spec] + (let [output-spec (if (map? output-spec) + output-spec + {Utils/DEFAULT_STREAM_ID output-spec})] + (map-val + (fn [out] + (if (instance? StreamInfo out) + out + (StreamInfo. out false))) + output-spec))) + + (defnk mk-plain-component-common + [inputs output-spec parallelism-hint :conf nil] + (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec output-spec)))] + (when parallelism-hint + (.set_parallelism_hint ret parallelism-hint)) + (when conf + (.set_json_conf ret (JSONValue/toJSONString conf))) + ret)) + + (defnk mk-spout-spec* + [spout outputs :p nil :conf nil] + (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout)) + (mk-plain-component-common {} outputs p :conf conf))) + + (defn mk-shuffle-grouping + [] + (Grouping/shuffle (NullStruct.))) + + (defn mk-local-or-shuffle-grouping + [] + (Grouping/local_or_shuffle (NullStruct.))) + + (defn mk-fields-grouping + [fields] + (Grouping/fields fields)) + + (defn mk-global-grouping + [] + (mk-fields-grouping [])) + + (defn mk-direct-grouping + [] + (Grouping/direct (NullStruct.))) + + (defn mk-all-grouping + [] + (Grouping/all (NullStruct.))) + + (defn mk-none-grouping + [] + (Grouping/none (NullStruct.))) + + (defn deserialized-component-object + [^ComponentObject obj] + (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA) + (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))) + (Utils/javaDeserialize (.get_serialized_java obj) Serializable)) + + (defn serialize-component-object + [obj] + (ComponentObject/serialized_java (Utils/javaSerialize obj))) + + (defn- mk-grouping + [grouping-spec] + (cond (nil? grouping-spec) + (mk-none-grouping) + + (instance? Grouping grouping-spec) + grouping-spec + + (instance? CustomStreamGrouping grouping-spec) + (Grouping/custom_serialized (Utils/javaSerialize grouping-spec)) + + (instance? JavaObject grouping-spec) + (Grouping/custom_object grouping-spec) + + (sequential? grouping-spec) + (mk-fields-grouping grouping-spec) + + (= grouping-spec :shuffle) + (mk-shuffle-grouping) + + (= grouping-spec :local-or-shuffle) + (mk-local-or-shuffle-grouping) + (= grouping-spec :none) + (mk-none-grouping) + + (= grouping-spec :all) + (mk-all-grouping) + + (= grouping-spec :global) + (mk-global-grouping) + + (= grouping-spec :direct) + (mk-direct-grouping) + + true + (throw (IllegalArgumentException. + (str grouping-spec " is not a valid grouping"))))) + + (defn- mk-inputs + [inputs] + (into {} (for [[stream-id grouping-spec] inputs] + [(if (sequential? stream-id) + (GlobalStreamId. (first stream-id) (second stream-id)) + (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID)) + (mk-grouping grouping-spec)]))) + + (defnk mk-bolt-spec* + [inputs bolt outputs :p nil :conf nil] + (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf conf)] + (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt)) + common))) + + (defnk mk-spout-spec + [spout :parallelism-hint nil :p nil :conf nil] + (let [parallelism-hint (if p p parallelism-hint)] + {:obj spout :p parallelism-hint :conf conf})) + + (defn- shell-component-params + [command script-or-output-spec kwargs] + (if (string? script-or-output-spec) + [(into-array String [command script-or-output-spec]) + (first kwargs) + (rest kwargs)] + [(into-array String command) + script-or-output-spec + kwargs])) + + (defnk mk-bolt-spec + [inputs bolt :parallelism-hint nil :p nil :conf nil] + (let [parallelism-hint (if p p parallelism-hint)] + {:obj bolt :inputs inputs :p parallelism-hint :conf conf})) + + (defn mk-shell-bolt-spec + [inputs command script-or-output-spec & kwargs] + (let [[command output-spec kwargs] + (shell-component-params command script-or-output-spec kwargs)] + (apply mk-bolt-spec inputs + (RichShellBolt. command (mk-output-spec output-spec)) kwargs))) + + (defn mk-shell-spout-spec + [command script-or-output-spec & kwargs] + (let [[command output-spec kwargs] + (shell-component-params command script-or-output-spec kwargs)] + (apply mk-spout-spec + (RichShellSpout. command (mk-output-spec output-spec)) kwargs))) + + (defn- add-inputs + [declarer inputs] + (doseq [[id grouping] (mk-inputs inputs)] + (.grouping declarer id grouping))) + + (defn mk-topology + ([spout-map bolt-map] + (let [builder (TopologyBuilder.)] + (doseq [[name {spout :obj p :p conf :conf}] spout-map] + (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) (.addConfigurations conf))) + (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map] + (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) (.addConfigurations conf) (add-inputs inputs))) + (.createTopology builder))) + ([spout-map bolt-map state-spout-map] + (mk-topology spout-map bolt-map))) + + ;; clojurify-structure is needed or else every element becomes the same after successive calls + ;; don't know why this happens + (def STORM-TOPOLOGY-FIELDS + (-> StormTopology/metaDataMap clojurify-structure keys)) + + (def SPOUT-FIELDS + [StormTopology$_Fields/SPOUTS + StormTopology$_Fields/STATE_SPOUTS]) + http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 870e7f6,0d29376..8d193e6 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@@ -17,7 -17,7 +17,7 @@@ (:import [org.apache.storm StormSubmitter] [org.apache.storm.utils Utils] [org.apache.storm.zookeeper Zookeeper]) - (:use [org.apache.storm thrift util config log]) - (:use [org.apache.storm util config log zookeeper]) ++ (:use [org.apache.storm util config log]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj index db342d2,db7fd40..99b34dc --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@@ -15,8 -15,8 +15,8 @@@ ;; limitations under the License. (ns org.apache.storm.daemon.common (:use [org.apache.storm log config util]) - (:import [org.apache.storm.generated StormTopology + (:import [org.apache.storm.generated StormTopology NodeInfo - InvalidTopologyException GlobalStreamId] + InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields] [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils] [org.apache.storm.daemon.metrics.reporters PreparableReporter] [com.codahale.metrics MetricRegistry]) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 902650c,92cc003..9ff93f8 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -39,8 -39,9 +39,9 @@@ (:import [java.lang Thread Thread$UncaughtExceptionHandler] [java.util.concurrent ConcurrentLinkedQueue] [org.json.simple JSONValue] - [com.lmax.disruptor.dsl ProducerType]) - (:require [org.apache.storm [thrift :as thrift] [stats :as stats]]) + [com.lmax.disruptor.dsl ProducerType] + [org.apache.storm StormTimer]) - (:require [org.apache.storm [cluster :as cluster] [stats :as stats]]) ++ (:require [org.apache.storm [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index beb6639,28a6fb8..7d28075 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@@ -46,13 -46,14 +46,13 @@@ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction - ProfileRequest ProfileAction NodeInfo]) + ProfileRequest ProfileAction NodeInfo LSTopoHistory]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.validation ConfigValidation]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:use [org.apache.storm util config log zookeeper]) - (:require [org.apache.storm [cluster :as cluster] - [converter :as converter] - [stats :as stats]]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) - (:use [org.apache.storm util config log timer local-state converter]) ++ (:use [org.apache.storm util config log converter]) + (:require [org.apache.storm [converter :as converter] - [stats :as stats]]) ++ [stats :as stats]]) (:require [clojure.set :as set]) (:import [org.apache.storm.daemon.common StormBase Assignment]) (:import [org.apache.storm.zookeeper Zookeeper]) @@@ -270,17 -279,17 +274,17 @@@ {:status {:type :rebalancing} :prev-status status :topology-action-options (-> {:delay-secs delay :action :rebalance} -- (assoc-non-nil :num-workers num-workers) -- (assoc-non-nil :component->executors executor-overrides)) ++ (converter/assoc-non-nil :num-workers num-workers) ++ (converter/assoc-non-nil :component->executors executor-overrides)) }))) (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] - (.update-storm! (:storm-cluster-state nimbus) + (.updateStorm (:storm-cluster-state nimbus) storm-id - (thriftify-storm-base (-> {:topology-action-options nil} - (-> {:topology-action-options nil} -- (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options)))))) - (assoc-non-nil :num-workers (:num-workers rebalance-options))))) ++ (converter/thriftify-storm-base (-> {:topology-action-options nil} ++ (converter/assoc-non-nil :component->executors (:component->executors rebalance-options)) ++ (converter/assoc-non-nil :num-workers (:num-workers rebalance-options)))))) (mk-assignments nimbus :scratch-topology-id storm-id)) (defn state-transitions [nimbus storm-id status storm-base] @@@ -363,7 -372,7 +367,7 @@@ storm-base-updates)] (when storm-base-updates - (.updateStorm (:storm-cluster-state nimbus) storm-id (thriftify-storm-base storm-base-updates)))))) - (.update-storm! (:storm-cluster-state nimbus) storm-id storm-base-updates))))) ++ (.updateStorm (:storm-cluster-state nimbus) storm-id (converter/thriftify-storm-base storm-base-updates)))))) ))) (defn transition-name! [nimbus storm-name event & args] @@@ -1002,9 -1005,9 +1005,9 @@@ topology (system-topology! storm-conf (read-storm-topology storm-id blob-store)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) - (.activate-storm! storm-cluster-state + (.activateStorm storm-cluster-state storm-id - (thriftify-storm-base (StormBase. storm-name - (StormBase. storm-name ++ (converter/thriftify-storm-base (StormBase. storm-name (Time/currentTimeSecs) {:type topology-initial-status} (storm-conf TOPOLOGY-WORKERS) @@@ -1448,41 -1446,39 +1454,39 @@@ (setup-blobstore nimbus)) (when (is-leader nimbus :throw-exception false) - (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] + (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup))) - (schedule-recurring (:timer nimbus) - 0 - (conf NIMBUS-MONITOR-FREQ-SECS) - (fn [] - (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN) - (locking (:submit-lock nimbus) - (mk-assignments nimbus))) - (do-cleanup nimbus))) + + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-MONITOR-FREQ-SECS) + (fn [] + (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN) + (locking (:submit-lock nimbus) + (mk-assignments nimbus))) + (do-cleanup nimbus))) ;; Schedule Nimbus inbox cleaner - (schedule-recurring (:timer nimbus) - 0 - (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) - (fn [] - (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))) + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) + (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)))) ;; Schedule nimbus code sync thread to sync code from other nimbuses. (if (instance? LocalFsBlobStore blob-store) - (schedule-recurring (:timer nimbus) - 0 - (conf NIMBUS-CODE-SYNC-FREQ-SECS) - (fn [] - (blob-sync conf nimbus)))) + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CODE-SYNC-FREQ-SECS) + (fn [] (blob-sync conf nimbus)))) ;; Schedule topology history cleaner (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] - (schedule-recurring (:timer nimbus) + (.scheduleRecurring (:timer nimbus) 0 (conf LOGVIEWER-CLEANUP-INTERVAL-SECS) - (fn [] - (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))) - (schedule-recurring (:timer nimbus) - 0 - (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS) - (fn [] - (renew-credentials nimbus))) + (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus)))) + (.scheduleRecurring (:timer nimbus) + 0 + (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS) + (fn [] + (renew-credentials nimbus))) (defgauge nimbus:num-supervisors (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))) @@@ -1650,7 -1646,7 +1654,7 @@@ (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'" (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'"))) (locking (:submit-lock nimbus) - (.updateStorm storm-cluster-state storm-id (thriftify-storm-base storm-base-updates))))) - (.update-storm! storm-cluster-state storm-id storm-base-updates)))) ++ (.updateStorm storm-cluster-state storm-id (converter/thriftify-storm-base storm-base-updates))))) (^void setWorkerProfiler [this ^String id ^ProfileRequest profileRequest] http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 58f6291,fad5b1a..f429d09 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@@ -24,17 -24,17 +24,17 @@@ [java.net JarURLConnection] [java.net URI URLDecoder] [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log timer local-state converter]) - (:use [org.apache.storm config util log local-state-converter]) ++ (:use [org.apache.storm config util log converter local-state-converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) (:import [org.apache.storm Config]) - (:import [org.apache.storm.generated WorkerResources ProfileAction]) + (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment]) (:import [org.apache.storm.localizer LocalResource]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm.command [healthcheck :as healthcheck]]) + (:import [org.apache.storm.command HealthCheck]) (:require [org.apache.storm.daemon [worker :as worker]] - [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]] + [org.apache.storm [process-simulator :as psim] [event :as event]] [clojure.set :as set]) (:import [org.apache.thrift.transport TTransportException]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj index 395be23,8f9becd..a75dc35 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@@ -15,7 -15,7 +15,7 @@@ ;; limitations under the License. (ns org.apache.storm.daemon.worker (:use [org.apache.storm.daemon common]) - (:use [org.apache.storm config log util timer local-state converter]) - (:use [org.apache.storm config log util local-state-converter]) ++ (:use [org.apache.storm config log util converter local-state-converter]) (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) @@@ -238,13 -243,14 +243,14 @@@ {}) (defn mk-halting-timer [timer-name] - (mk-timer :kill-fn (fn [t] - (log-error t "Error when processing event") - (Utils/exitProcess 20 "Error when processing an event") - ) - :timer-name timer-name)) + (StormTimer. timer-name + (reify Thread$UncaughtExceptionHandler + (^void uncaughtException + [this ^Thread t ^Throwable e] + (log-error e "Error when processing event") + (Utils/exitProcess 20 "Error when processing an event"))))) -(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] +(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state] (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (DisruptorQueue. "worker-transfer-queue" @@@ -374,15 -380,16 +380,17 @@@ conf (:conf worker) storm-cluster-state (:storm-cluster-state worker) storm-id (:storm-id worker)] - (fn this + (fn refresh-connections ([] - (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this)))) + (refresh-connections (fn [& ignored] + (.schedule + (:refresh-connections-timer worker) 0 refresh-connections)))) ([callback] - (let [version (.assignment-version storm-cluster-state storm-id callback) + (let [version (.assignmentVersion storm-cluster-state storm-id callback) assignment (if (= version (:version (get @(:assignment-versions worker) storm-id))) (:data (get @(:assignment-versions worker) storm-id)) - (let [new-assignment (.assignment-info-with-version storm-cluster-state storm-id callback)] + (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback) + new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}] (swap! (:assignment-versions worker) assoc storm-id new-assignment) (:data new-assignment))) my-assignment (-> assignment @@@ -428,9 -435,12 +436,12 @@@ (defn refresh-storm-active ([worker] - (refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))))) + (refresh-storm-active + worker (fn [& ignored] + (.schedule + (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))))) ([worker callback] - (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)] + (let [base (clojurify-storm-base (.stormBase (:storm-cluster-state worker) (:storm-id worker) callback))] (reset! (:storm-active-atom worker) (and (= :active (-> base :status :type)) @(:worker-active-flag worker))) @@@ -757,22 -768,28 +769,28 @@@ (log-message "Started with log levels: " @original-log-levels) (defn establish-log-setting-callback [] - (.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed)))) + (.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] (check-log-config-changed)))) (establish-log-setting-callback) - (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed))) + (clojurify-crdentials (.credentials (:storm-cluster-state worker) storm-id (fn [] (check-credentials-changed)))) - (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS) - (fn [& args] - (check-credentials-changed) - (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) - (check-throttle-changed)))) + + (.scheduleRecurring + (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS) + (fn [] + (check-credentials-changed) + (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) + (check-throttle-changed)))) ;; The jitter allows the clients to get the data at different times, and avoids thundering herd (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING) - (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 refresh-load)) - (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections) - (schedule-recurring (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config))) - (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) - + (.scheduleRecurringWithJitter + (:refresh-load-timer worker) 0 1 500 refresh-load)) + (.scheduleRecurring + (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections) + (.scheduleRecurring + (:reset-log-levels-timer worker) 0 (conf WORKER-LOG-LEVEL-RESET-POLL-SECS) + (fn [] (reset-log-levels latest-log-config))) + (.scheduleRecurring + (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker)) (log-message "Worker has topology config " (Utils/redactValue (:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)) (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading") ret http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/internal/thrift.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/internal/thrift.clj index 0000000,4ccf8a7..07e0bdf mode 000000,100644..100644 --- a/storm-core/src/clj/org/apache/storm/internal/thrift.clj +++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj @@@ -1,0 -1,96 +1,96 @@@ + ;; Licensed to the Apache Software Foundation (ASF) under one + ;; or more contributor license agreements. See the NOTICE file + ;; distributed with this work for additional information + ;; regarding copyright ownership. The ASF licenses this file + ;; to you under the Apache License, Version 2.0 (the + ;; "License"); you may not use this file except in compliance + ;; with the License. You may obtain a copy of the License at + ;; + ;; http://www.apache.org/licenses/LICENSE-2.0 + ;; + ;; Unless required by applicable law or agreed to in writing, software + ;; distributed under the License is distributed on an "AS IS" BASIS, + ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ;; See the License for the specific language governing permissions and + ;; limitations under the License. + + (ns org.apache.storm.internal.thrift + (:import [java.util HashMap] + [java.io Serializable] + [org.apache.storm.generated NodeInfo Assignment]) + (:import [org.apache.storm.generated JavaObject Grouping Nimbus StormTopology + StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface + ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo + GlobalStreamId ComponentObject ComponentObject$_Fields + ShellComponent SupervisorInfo]) + (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils]) + (:import [org.apache.storm Constants]) + (:import [org.apache.storm.security.auth ReqContext]) + (:import [org.apache.storm.grouping CustomStreamGrouping]) + (:import [org.apache.storm.topology TopologyBuilder]) + (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) + (:import [org.apache.thrift.transport TTransport]) - (:use [org.apache.storm util config log zookeeper])) ++ (:use [org.apache.storm util config log])) + + ;; Leaving this definition as core.clj is using them as a nested keyword argument + ;; Must remove once core.clj is ported to java + (def grouping-constants + {Grouping$_Fields/FIELDS :fields + Grouping$_Fields/SHUFFLE :shuffle + Grouping$_Fields/ALL :all + Grouping$_Fields/NONE :none + Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized + Grouping$_Fields/CUSTOM_OBJECT :custom-object + Grouping$_Fields/DIRECT :direct + Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle}) + + ;; Leaving this method as core.clj is using them as a nested keyword argument + ;; Must remove once core.clj is ported to java + (defn grouping-type + [^Grouping grouping] + (grouping-constants (.getSetField grouping))) + + (defn nimbus-client-and-conn + ([host port] + (nimbus-client-and-conn host port nil)) + ([host port as-user] + (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user) + (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) + nimbusClient (NimbusClient. conf host port nil as-user) + client (.getClient nimbusClient) + transport (.transport nimbusClient)] + [client transport] ))) + + (defmacro with-nimbus-connection + [[client-sym host port] & body] + `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)] + (try + ~@body + (finally (.close conn#))))) + + (defmacro with-configured-nimbus-connection + [client-sym & body] + `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig)) + context# (ReqContext/context) + user# (if (.principal context#) (.getName (.principal context#))) + nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#) + ~client-sym (.getClient nimbusClient#) + conn# (.transport nimbusClient#) + ] + (try + ~@body + (finally (.close conn#))))) + + ;; Leaving this definition as core.clj is using them as a nested keyword argument + ;; Must remove once core.clj is ported to java + (defn mk-output-spec + [output-spec] + (let [output-spec (if (map? output-spec) + output-spec + {Utils/DEFAULT_STREAM_ID output-spec})] + (map-val + (fn [out] + (if (instance? StreamInfo out) + out + (StreamInfo. out false))) + output-spec))) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index 3dee54b,7817929..8f313cc --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -48,9 -49,10 +49,10 @@@ (:import [org.apache.storm.task TopologyContext] (org.apache.storm.messaging IContext) [org.json.simple JSONValue]) - (:require [org.apache.storm [zookeeper :as zk]]) + (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm util thrift config log local-state converter])) - (:use [org.apache.storm cluster util config log local-state-converter]) ++ (:use [org.apache.storm util config log local-state-converter converter]) + (:use [org.apache.storm.internal thrift])) (defn feeder-spout [fields] http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/integration/org/apache/storm/integration_test.clj index 1bb8279,6dce7d6..697bdae --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@@ -21,10 -21,11 +21,12 @@@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) (:import [org.apache.storm.tuple Fields]) - (:use [org.apache.storm testing config util]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) - (:use [org.apache.storm testing config clojure util converter]) + (:use [org.apache.storm.internal clojure]) ++ (:use [org.apache.storm testing config util]) (:use [org.apache.storm.daemon common]) - (:require [org.apache.storm [thrift :as thrift]])) + (:import [org.apache.storm Thrift]) + (:import [org.apache.storm.utils Utils])) (deftest test-basic-topology (doseq [zmq-on? [true false]] http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj index 22c1f80,18e3a80..13198fa --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@@ -30,7 -30,8 +30,8 @@@ (:require [conjure.core]) (:use [conjure core]) (:use [clojure test]) - (:use [org.apache.storm config util testing thrift log converter])) - (:use [org.apache.storm cluster config util testing log]) ++ (:use [org.apache.storm config util testing log converter]) + (:use [org.apache.storm.internal thrift])) (defn mk-config [zk-port] (merge (clojurify-structure (ConfigUtils/readStormConfig)) http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj index f75a8e3,7fffd34..6a3d3ca --- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj +++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj @@@ -1,3 -1,3 +1,4 @@@ ++ ;; Licensed to the Apache Software Foundation (ASF) under one ;; or more contributor license agreements. See the NOTICE file ;; distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj index 09c4371,ce58f42..3670fd1 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@@ -20,13 -20,12 +20,15 @@@ (:require [org.apache.storm [converter :as converter]]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] - [org.apache.storm.nimbus InMemoryTopologyActionNotifier]) + [org.apache.storm.nimbus InMemoryTopologyActionNotifier] + [org.apache.storm.generated GlobalStreamId] + [org.apache.storm Thrift]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) + (:import [org.mockito Mockito]) + (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) + (:import [org.apache.storm.testing.staticmocking MockedCluster]) (:import [org.apache.storm.generated Credentials NotAliveException SubmitOptions TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions InvalidTopologyException AuthorizationException @@@ -38,12 -37,10 +40,11 @@@ (:import [org.apache.storm.zookeeper Zookeeper]) (:import [org.apache.commons.io FileUtils] [org.json.simple JSONValue]) - (:use [org.apache.storm testing MockAutoCred util config log zookeeper]) + (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]) - (:use [org.apache.storm testing MockAutoCred util config log timer converter]) ++ (:use [org.apache.storm testing MockAutoCred util config log converter]) (:use [org.apache.storm.daemon common]) (:require [conjure.core]) - (:require [org.apache.storm - [thrift :as thrift]]) - (:require [org.apache.storm [cluster :as cluster]]) ++ (:use [conjure core])) (defn- from-json @@@ -1081,10 -1145,11 +1149,11 @@@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) + (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (submit-local-topology nimbus "t1" {} topology) (submit-local-topology nimbus "t2" {} topology) @@@ -1153,10 -1218,11 +1222,11 @@@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) + (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] @@@ -1412,17 -1477,17 +1482,16 @@@ (with-open [_ (ConfigUtilsInstaller. fake-cu) _ (UtilsInstaller. fake-utils) zk-le (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf] nil)))] + (zkLeaderElectorImpl [conf] nil))) + mocked-cluster (MockedCluster. cluster-utils)] (stubbing [mk-authorization-handler nil - cluster/mk-storm-cluster-state nil - nimbus/file-cache-map nil - nimbus/mk-blob-cache-map nil - nimbus/mk-bloblist-cache-map nil - nimbus/mk-scheduler nil] - (nimbus/nimbus-data auth-conf fake-inimbus) - (verify-call-times-for cluster/mk-storm-cluster-state 1) - (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2] - expected-acls)))))) + nimbus/file-cache-map nil + nimbus/mk-blob-cache-map nil + nimbus/mk-bloblist-cache-map nil - mk-timer nil + nimbus/mk-scheduler nil] + (nimbus/nimbus-data auth-conf fake-inimbus) + (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any)) + ))))) (deftest test-file-bogus-download (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] @@@ -1453,11 -1518,12 +1522,12 @@@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) + (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) (Time/sleepSecs 1) - (bind topology (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} + (bind topology (Thrift/buildTopology + {"1" (Thrift/prepareSpoutDetails + (TestPlannerSpout. true) (Integer. 3))} {})) (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology) ; make transition for topology t1 to be killed -> nimbus applies this event to cluster state http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/supervisor_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj index 3ebdbcd,ef40c4a..cdd66e4 --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@@ -22,22 -22,21 +22,21 @@@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout]) (:import [org.apache.storm.scheduler ISupervisor]) (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils]) - (:import [org.apache.storm.generated RebalanceOptions]) + (:import [org.apache.storm.generated RebalanceOptions WorkerResources]) - (:import [org.mockito Matchers Mockito]) + (:import [org.apache.storm.testing.staticmocking MockedCluster]) (:import [java.util UUID]) ++ (:import [org.apache.storm Thrift]) + (:import [org.mockito Mockito Matchers]) + (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [java.io File]) (:import [java.nio.file Files]) - (:import [org.apache.storm.utils Utils IPredicate] - [org.apache.storm.utils.staticmocking ConfigUtilsInstaller - UtilsInstaller]) + (:import [org.apache.storm.utils Utils IPredicate]) + (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils] - [org.apache.storm.utils.staticmocking ConfigUtilsInstaller - UtilsInstaller]) ++ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller]) (:import [java.nio.file.attribute FileAttribute]) - (:use [org.apache.storm config testing util timer log converter]) - (:import [org.apache.storm Thrift]) - (:import [org.apache.storm.utils Utils]) - (:use [org.apache.storm config testing util log]) ++ (:use [org.apache.storm config testing util log converter]) (:use [org.apache.storm.daemon common]) -- (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]] - [org.apache.storm [thrift :as thrift]]) - [org.apache.storm [cluster :as cluster]]) ++ (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]) (:use [conjure core]) (:require [clojure.java.io :as io])) @@@ -770,66 -772,68 +771,68 @@@ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)] (is (= expected-childopts childopts-with-ids))))) -(deftest test-retry-read-assignments - (with-simulated-time-local-cluster [cluster - :supervisors 0 - :ports-per-supervisor 2 - :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true - NIMBUS-MONITOR-FREQ-SECS 10 - TOPOLOGY-MESSAGE-TIMEOUT-SECS 30 - TOPOLOGY-ACKER-EXECUTORS 0}] - (letlocals - (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) - (bind topology1 (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails - (TestPlannerSpout. true) (Integer. 2))} - {})) - (bind topology2 (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails - (TestPlannerSpout. true) (Integer. 2))} - {})) - (bind state (:storm-cluster-state cluster)) - (bind changed (capture-changed-workers - (submit-mocked-assignment - (:nimbus cluster) - (:storm-cluster-state cluster) - "topology1" - {TOPOLOGY-WORKERS 2} - topology1 - {1 "1" - 2 "1"} - {[1 1] ["sup1" 1] - [2 2] ["sup1" 2]} - {["sup1" 1] [0.0 0.0 0.0] - ["sup1" 2] [0.0 0.0 0.0] - }) - (submit-mocked-assignment - (:nimbus cluster) - (:storm-cluster-state cluster) - "topology2" - {TOPOLOGY-WORKERS 2} - topology2 - {1 "1" - 2 "1"} - {[1 1] ["sup1" 1] - [2 2] ["sup1" 2]} - {["sup1" 1] [0.0 0.0 0.0] - ["sup1" 2] [0.0 0.0 0.0] - }) - ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. - (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0))) - )) - (is (empty? (:launched changed))) - (bind options (RebalanceOptions.)) - (.set_wait_secs options 0) - (bind changed (capture-changed-workers - (.rebalance (:nimbus cluster) "topology2" options) - (advance-cluster-time cluster 10) - (heartbeat-workers cluster "sup1" [1 2 3 4]) - (advance-cluster-time cluster 10) - )) - (validate-launched-once (:launched changed) - {"sup1" [1 2]} - (get-storm-id (:storm-cluster-state cluster) "topology1")) - (validate-launched-once (:launched changed) - {"sup1" [3 4]} - (get-storm-id (:storm-cluster-state cluster) "topology2")) - ))) + (deftest test-retry-read-assignments + (with-simulated-time-local-cluster [cluster + :supervisors 0 + :ports-per-supervisor 2 + :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true + NIMBUS-MONITOR-FREQ-SECS 10 + TOPOLOGY-MESSAGE-TIMEOUT-SECS 30 + TOPOLOGY-ACKER-EXECUTORS 0}] + (letlocals + (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4])) - (bind topology1 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} ++ (bind topology1 (Thrift/buildTopology ++ {"1" (Thrift/prepareSpoutDetails ++ (TestPlannerSpout. true) (Integer. 2))} + {})) - (bind topology2 (thrift/mk-topology - {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} ++ (bind topology2 (Thrift/buildTopology ++ {"1" (Thrift/prepareSpoutDetails ++ (TestPlannerSpout. true) (Integer. 2))} + {})) + (bind state (:storm-cluster-state cluster)) + (bind changed (capture-changed-workers + (submit-mocked-assignment + (:nimbus cluster) + (:storm-cluster-state cluster) + "topology1" + {TOPOLOGY-WORKERS 2} + topology1 + {1 "1" + 2 "1"} + {[1 1] ["sup1" 1] + [2 2] ["sup1" 2]} + {["sup1" 1] [0.0 0.0 0.0] + ["sup1" 2] [0.0 0.0 0.0] + }) + (submit-mocked-assignment + (:nimbus cluster) + (:storm-cluster-state cluster) + "topology2" + {TOPOLOGY-WORKERS 2} + topology2 + {1 "1" + 2 "1"} + {[1 1] ["sup1" 1] + [2 2] ["sup1" 2]} + {["sup1" 1] [0.0 0.0 0.0] + ["sup1" 2] [0.0 0.0 0.0] + }) + ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called. + (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0))) + )) + (is (empty? (:launched changed))) + (bind options (RebalanceOptions.)) + (.set_wait_secs options 0) + (bind changed (capture-changed-workers + (.rebalance (:nimbus cluster) "topology2" options) + (advance-cluster-time cluster 10) + (heartbeat-workers cluster "sup1" [1 2 3 4]) + (advance-cluster-time cluster 10) + )) + (validate-launched-once (:launched changed) + {"sup1" [1 2]} + (get-storm-id (:storm-cluster-state cluster) "topology1")) + (validate-launched-once (:launched changed) + {"sup1" [3 4]} + (get-storm-id (:storm-cluster-state cluster) "topology2")) + )))