Merge branch 'master' into ClusterUtils

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cc1f6d77
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cc1f6d77
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cc1f6d77

Branch: refs/heads/master
Commit: cc1f6d77962ec96c69cf5af85e4016a9e08ccdcc
Parents: afcd0c6 d0cd52b
Author: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Authored: Fri Feb 19 16:22:50 2016 +0800
Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Committed: Fri Feb 19 16:47:42 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   7 +
 README.markdown                                 |   1 +
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   4 +-
 conf/cgconfig.conf.example                      |  41 +++
 conf/defaults.yaml                              |  16 +-
 examples/storm-starter/pom.xml                  |  10 +
 .../org/apache/storm/starter/clj/word_count.clj |   3 +-
 .../starter/ResourceAwareExampleTopology.java   |   2 +-
 pom.xml                                         |  10 +
 storm-clojure/pom.xml                           |  74 ++++
 .../src/clj/org/apache/storm/clojure.clj        | 207 +++++++++++
 .../src/clj/org/apache/storm/thrift.clj         | 286 +++++++++++++++
 storm-clojure/src/test/clj/clojure_test.clj     | 158 +++++++++
 storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
 .../org/apache/storm/command/dev_zookeeper.clj  |  28 --
 .../clj/org/apache/storm/command/get_errors.clj |   3 +-
 .../org/apache/storm/command/healthcheck.clj    |  90 -----
 .../clj/org/apache/storm/command/monitor.clj    |   2 +-
 .../clj/org/apache/storm/command/rebalance.clj  |   3 +-
 .../org/apache/storm/command/set_log_level.clj  |   3 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  | 119 ++++---
 .../clj/org/apache/storm/daemon/executor.clj    |  51 +--
 .../clj/org/apache/storm/daemon/logviewer.clj   |  19 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 122 +++----
 .../clj/org/apache/storm/daemon/supervisor.clj  | 217 ++++++++----
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  | 116 +++---
 .../clj/org/apache/storm/internal/clojure.clj   | 201 +++++++++++
 .../clj/org/apache/storm/internal/thrift.clj    |  96 +++++
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------
 .../org/apache/storm/local_state_converter.clj  |  24 ++
 storm-core/src/clj/org/apache/storm/testing.clj |  37 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  | 286 ---------------
 storm-core/src/clj/org/apache/storm/timer.clj   | 128 -------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  88 +++++
 .../src/jvm/org/apache/storm/StormTimer.java    | 241 +++++++++++++
 storm-core/src/jvm/org/apache/storm/Thrift.java | 351 +++++++++++++++++++
 .../org/apache/storm/command/DevZookeeper.java  |  35 ++
 .../org/apache/storm/command/HealthCheck.java   | 125 +++++++
 .../container/ResourceIsolationInterface.java   |  51 +++
 .../storm/container/cgroup/CgroupCenter.java    | 216 ++++++++++++
 .../storm/container/cgroup/CgroupCommon.java    | 270 ++++++++++++++
 .../container/cgroup/CgroupCommonOperation.java |  81 +++++
 .../container/cgroup/CgroupCoreFactory.java     |  74 ++++
 .../storm/container/cgroup/CgroupManager.java   | 210 +++++++++++
 .../storm/container/cgroup/CgroupOperation.java |  79 +++++
 .../storm/container/cgroup/CgroupUtils.java     | 118 +++++++
 .../apache/storm/container/cgroup/Device.java   |  75 ++++
 .../storm/container/cgroup/Hierarchy.java       | 130 +++++++
 .../storm/container/cgroup/SubSystem.java       |  81 +++++
 .../storm/container/cgroup/SubSystemType.java   |  36 ++
 .../storm/container/cgroup/SystemOperation.java |  75 ++++
 .../storm/container/cgroup/core/BlkioCore.java  | 213 +++++++++++
 .../storm/container/cgroup/core/CgroupCore.java |  26 ++
 .../storm/container/cgroup/core/CpuCore.java    | 135 +++++++
 .../container/cgroup/core/CpuacctCore.java      |  71 ++++
 .../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
 .../container/cgroup/core/DevicesCore.java      | 189 ++++++++++
 .../container/cgroup/core/FreezerCore.java      |  66 ++++
 .../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
 .../storm/container/cgroup/core/NetClsCore.java |  69 ++++
 .../container/cgroup/core/NetPrioCore.java      |  65 ++++
 .../jvm/org/apache/storm/testing/NGrouping.java |   4 +-
 .../storm/testing/PythonShellMetricsBolt.java   |  14 +-
 .../storm/testing/PythonShellMetricsSpout.java  |   8 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 +++++-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  17 +-
 .../org/apache/storm/integration_test.clj       | 259 +++++++-------
 .../org/apache/storm/testing4j_test.clj         |  72 ++--
 .../test/clj/org/apache/storm/clojure_test.clj  |  64 ++--
 .../test/clj/org/apache/storm/cluster_test.clj  |   3 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  56 +--
 .../storm/messaging/netty_integration_test.clj  |  19 +-
 .../clj/org/apache/storm/messaging_test.clj     |  14 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |  85 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   | 259 +++++++++-----
 .../scheduler/resource_aware_scheduler_test.clj |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  55 +--
 .../clj/org/apache/storm/tick_tuple_test.clj    |  15 +-
 .../clj/org/apache/storm/transactional_test.clj |   3 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  | 130 +++++++
 .../resource/TestResourceAwareScheduler.java    |   3 +
 86 files changed, 5939 insertions(+), 1561 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-clojure/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-clojure/src/clj/org/apache/storm/thrift.clj
index 0000000,bf13d23..45aa740
mode 000000,100644..100644
--- a/storm-clojure/src/clj/org/apache/storm/thrift.clj
+++ b/storm-clojure/src/clj/org/apache/storm/thrift.clj
@@@ -1,0 -1,286 +1,286 @@@
+ ;; Licensed to the Apache Software Foundation (ASF) under one
+ ;; or more contributor license agreements.  See the NOTICE file
+ ;; distributed with this work for additional information
+ ;; regarding copyright ownership.  The ASF licenses this file
+ ;; to you under the Apache License, Version 2.0 (the
+ ;; "License"); you may not use this file except in compliance
+ ;; with the License.  You may obtain a copy of the License at
+ ;;
+ ;; http://www.apache.org/licenses/LICENSE-2.0
+ ;;
+ ;; Unless required by applicable law or agreed to in writing, software
+ ;; distributed under the License is distributed on an "AS IS" BASIS,
+ ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ;; See the License for the specific language governing permissions and
+ ;; limitations under the License.
+ 
+ (ns org.apache.storm.thrift
+   (:import [java.util HashMap]
+            [java.io Serializable]
+            [org.apache.storm.generated NodeInfo Assignment])
+   (:import [org.apache.storm.generated JavaObject Grouping Nimbus 
StormTopology
+             StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+             ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+             GlobalStreamId ComponentObject ComponentObject$_Fields
+             ShellComponent SupervisorInfo])
+   (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
+   (:import [org.apache.storm Constants])
+   (:import [org.apache.storm.security.auth ReqContext])
+   (:import [org.apache.storm.grouping CustomStreamGrouping])
+   (:import [org.apache.storm.topology TopologyBuilder])
+   (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+   (:import [org.apache.storm.thrift.transport TTransport]
+            (org.json.simple JSONValue))
 -  (:use [org.apache.storm util config log zookeeper]))
++  (:use [org.apache.storm util config log]))
+ 
+ (defn instantiate-java-object
+   [^JavaObject obj]
+   (let [name (symbol (.get_full_class_name obj))
+         args (map (memfn getFieldValue) (.get_args_list obj))]
+     (eval `(new ~name ~@args))))
+ 
+ (def grouping-constants
+   {Grouping$_Fields/FIELDS :fields
+    Grouping$_Fields/SHUFFLE :shuffle
+    Grouping$_Fields/ALL :all
+    Grouping$_Fields/NONE :none
+    Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+    Grouping$_Fields/CUSTOM_OBJECT :custom-object
+    Grouping$_Fields/DIRECT :direct
+    Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+ 
+ (defn grouping-type
+   [^Grouping grouping]
+   (grouping-constants (.getSetField grouping)))
+ 
+ (defn field-grouping
+   [^Grouping grouping]
+   (when-not (= (grouping-type grouping) :fields)
+     (throw (IllegalArgumentException. "Tried to get grouping fields from non 
fields grouping")))
+   (.get_fields grouping))
+ 
+ (defn global-grouping?
+   [^Grouping grouping]
+   (and (= :fields (grouping-type grouping))
+        (empty? (field-grouping grouping))))
+ 
+ (defn parallelism-hint
+   [^ComponentCommon component-common]
+   (let [phint (.get_parallelism_hint component-common)]
+     (if-not (.is_set_parallelism_hint component-common) 1 phint)))
+ 
+ (defn nimbus-client-and-conn
+   ([host port]
+     (nimbus-client-and-conn host port nil))
+   ([host port as-user]
+   (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
+         nimbusClient (NimbusClient. conf host port nil as-user)
+         client (.getClient nimbusClient)
+         transport (.transport nimbusClient)]
+         [client transport] )))
+ 
+ (defmacro with-nimbus-connection
+   [[client-sym host port] & body]
+   `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] 
(nimbus-client-and-conn ~host ~port)]
+     (try
+       ~@body
+     (finally (.close conn#)))))
+ 
+ (defmacro with-configured-nimbus-connection
+   [client-sym & body]
+   `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
+          context# (ReqContext/context)
+          user# (if (.principal context#) (.getName (.principal context#)))
+          nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
+          ~client-sym (.getClient nimbusClient#)
+          conn# (.transport nimbusClient#)
+          ]
+      (try
+        ~@body
+      (finally (.close conn#)))))
+ 
+ (defn direct-output-fields
+   [fields]
+   (StreamInfo. fields true))
+ 
+ (defn output-fields
+   [fields]
+   (StreamInfo. fields false))
+ 
+ ;TODO: when translating this function, you should replace the map-val with a 
proper for loop HERE
+ (defn mk-output-spec
+   [output-spec]
+   (let [output-spec (if (map? output-spec)
+                       output-spec
+                       {Utils/DEFAULT_STREAM_ID output-spec})]
+     (map-val
+       (fn [out]
+         (if (instance? StreamInfo out)
+           out
+           (StreamInfo. out false)))
+       output-spec)))
+ 
+ (defnk mk-plain-component-common
+   [inputs output-spec parallelism-hint :conf nil]
+   (let [ret (ComponentCommon. (HashMap. inputs) (HashMap. (mk-output-spec 
output-spec)))]
+     (when parallelism-hint
+       (.set_parallelism_hint ret parallelism-hint))
+     (when conf
+       (.set_json_conf ret (JSONValue/toJSONString conf)))
+     ret))
+ 
+ (defnk mk-spout-spec*
+   [spout outputs :p nil :conf nil]
+   (SpoutSpec. (ComponentObject/serialized_java (Utils/javaSerialize spout))
+               (mk-plain-component-common {} outputs p :conf conf)))
+ 
+ (defn mk-shuffle-grouping
+   []
+   (Grouping/shuffle (NullStruct.)))
+ 
+ (defn mk-local-or-shuffle-grouping
+   []
+   (Grouping/local_or_shuffle (NullStruct.)))
+ 
+ (defn mk-fields-grouping
+   [fields]
+   (Grouping/fields fields))
+ 
+ (defn mk-global-grouping
+   []
+   (mk-fields-grouping []))
+ 
+ (defn mk-direct-grouping
+   []
+   (Grouping/direct (NullStruct.)))
+ 
+ (defn mk-all-grouping
+   []
+   (Grouping/all (NullStruct.)))
+ 
+ (defn mk-none-grouping
+   []
+   (Grouping/none (NullStruct.)))
+ 
+ (defn deserialized-component-object
+   [^ComponentObject obj]
+   (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
+     (throw (RuntimeException. "Cannot deserialize non-java-serialized 
object")))
+   (Utils/javaDeserialize (.get_serialized_java obj) Serializable))
+ 
+ (defn serialize-component-object
+   [obj]
+   (ComponentObject/serialized_java (Utils/javaSerialize obj)))
+ 
+ (defn- mk-grouping
+   [grouping-spec]
+   (cond (nil? grouping-spec)
+         (mk-none-grouping)
+ 
+         (instance? Grouping grouping-spec)
+         grouping-spec
+ 
+         (instance? CustomStreamGrouping grouping-spec)
+         (Grouping/custom_serialized (Utils/javaSerialize grouping-spec))
+ 
+         (instance? JavaObject grouping-spec)
+         (Grouping/custom_object grouping-spec)
+ 
+         (sequential? grouping-spec)
+         (mk-fields-grouping grouping-spec)
+ 
+         (= grouping-spec :shuffle)
+         (mk-shuffle-grouping)
+ 
+         (= grouping-spec :local-or-shuffle)
+         (mk-local-or-shuffle-grouping)
+         (= grouping-spec :none)
+         (mk-none-grouping)
+ 
+         (= grouping-spec :all)
+         (mk-all-grouping)
+ 
+         (= grouping-spec :global)
+         (mk-global-grouping)
+ 
+         (= grouping-spec :direct)
+         (mk-direct-grouping)
+ 
+         true
+         (throw (IllegalArgumentException.
+                  (str grouping-spec " is not a valid grouping")))))
+ 
+ (defn- mk-inputs
+   [inputs]
+   (into {} (for [[stream-id grouping-spec] inputs]
+              [(if (sequential? stream-id)
+                 (GlobalStreamId. (first stream-id) (second stream-id))
+                 (GlobalStreamId. stream-id Utils/DEFAULT_STREAM_ID))
+               (mk-grouping grouping-spec)])))
+ 
+ (defnk mk-bolt-spec*
+   [inputs bolt outputs :p nil :conf nil]
+   (let [common (mk-plain-component-common (mk-inputs inputs) outputs p :conf 
conf)]
+     (Bolt. (ComponentObject/serialized_java (Utils/javaSerialize bolt))
+            common)))
+ 
+ (defnk mk-spout-spec
+   [spout :parallelism-hint nil :p nil :conf nil]
+   (let [parallelism-hint (if p p parallelism-hint)]
+     {:obj spout :p parallelism-hint :conf conf}))
+ 
+ (defn- shell-component-params
+   [command script-or-output-spec kwargs]
+   (if (string? script-or-output-spec)
+     [(into-array String [command script-or-output-spec])
+      (first kwargs)
+      (rest kwargs)]
+     [(into-array String command)
+      script-or-output-spec
+      kwargs]))
+ 
+ (defnk mk-bolt-spec
+   [inputs bolt :parallelism-hint nil :p nil :conf nil]
+   (let [parallelism-hint (if p p parallelism-hint)]
+     {:obj bolt :inputs inputs :p parallelism-hint :conf conf}))
+ 
+ (defn mk-shell-bolt-spec
+   [inputs command script-or-output-spec & kwargs]
+   (let [[command output-spec kwargs]
+         (shell-component-params command script-or-output-spec kwargs)]
+     (apply mk-bolt-spec inputs
+            (RichShellBolt. command (mk-output-spec output-spec)) kwargs)))
+ 
+ (defn mk-shell-spout-spec
+   [command script-or-output-spec & kwargs]
+   (let [[command output-spec kwargs]
+         (shell-component-params command script-or-output-spec kwargs)]
+     (apply mk-spout-spec
+            (RichShellSpout. command (mk-output-spec output-spec)) kwargs)))
+ 
+ (defn- add-inputs
+   [declarer inputs]
+   (doseq [[id grouping] (mk-inputs inputs)]
+     (.grouping declarer id grouping)))
+ 
+ (defn mk-topology
+   ([spout-map bolt-map]
+    (let [builder (TopologyBuilder.)]
+      (doseq [[name {spout :obj p :p conf :conf}] spout-map]
+        (-> builder (.setSpout name spout (if-not (nil? p) (int p) p)) 
(.addConfigurations conf)))
+      (doseq [[name {bolt :obj p :p conf :conf inputs :inputs}] bolt-map]
+        (-> builder (.setBolt name bolt (if-not (nil? p) (int p) p)) 
(.addConfigurations conf) (add-inputs inputs)))
+      (.createTopology builder)))
+   ([spout-map bolt-map state-spout-map]
+    (mk-topology spout-map bolt-map)))
+ 
+ ;; clojurify-structure is needed or else every element becomes the same after 
successive calls
+ ;; don't know why this happens
+ (def STORM-TOPOLOGY-FIELDS
+   (-> StormTopology/metaDataMap clojurify-structure keys))
+ 
+ (def SPOUT-FIELDS
+   [StormTopology$_Fields/SPOUTS
+    StormTopology$_Fields/STATE_SPOUTS])
+ 

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 870e7f6,0d29376..8d193e6
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@@ -17,7 -17,7 +17,7 @@@
    (:import [org.apache.storm StormSubmitter]
             [org.apache.storm.utils Utils]
             [org.apache.storm.zookeeper Zookeeper])
-   (:use [org.apache.storm thrift util config log])
 -  (:use [org.apache.storm util config log zookeeper])
++  (:use [org.apache.storm util config log])
    (:require [clojure.string :as str])
    (:import [org.apache.storm.utils ConfigUtils])
    (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj
index db342d2,db7fd40..99b34dc
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@@ -15,8 -15,8 +15,8 @@@
  ;; limitations under the License.
  (ns org.apache.storm.daemon.common
    (:use [org.apache.storm log config util])
 -  (:import [org.apache.storm.generated StormTopology
 +  (:import [org.apache.storm.generated StormTopology NodeInfo
-             InvalidTopologyException GlobalStreamId]
+             InvalidTopologyException GlobalStreamId Grouping Grouping$_Fields]
             [org.apache.storm.utils Utils ConfigUtils IPredicate 
ThriftTopologyUtils]
             [org.apache.storm.daemon.metrics.reporters PreparableReporter]
             [com.codahale.metrics MetricRegistry])

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 902650c,92cc003..9ff93f8
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -39,8 -39,9 +39,9 @@@
    (:import [java.lang Thread Thread$UncaughtExceptionHandler]
             [java.util.concurrent ConcurrentLinkedQueue]
             [org.json.simple JSONValue]
-            [com.lmax.disruptor.dsl ProducerType])
-   (:require [org.apache.storm [thrift :as thrift] [stats :as stats]])
+            [com.lmax.disruptor.dsl ProducerType]
+            [org.apache.storm StormTimer])
 -  (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
++  (:require [org.apache.storm [stats :as stats]])
    (:require [org.apache.storm.daemon [task :as task]])
    (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
    (:require [clojure.set :as set]))

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index beb6639,28a6fb8..7d28075
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -46,13 -46,14 +46,13 @@@
              KillOptions RebalanceOptions ClusterSummary SupervisorSummary 
TopologySummary TopologyInfo TopologyHistoryInfo
              ExecutorSummary AuthorizationException GetInfoOptions 
NumErrorsChoice SettableBlobMeta ReadableBlobMeta
              BeginDownloadResult ListBlobsResult ComponentPageInfo 
TopologyPageInfo LogConfig LogLevel LogLevelAction
-             ProfileRequest ProfileAction NodeInfo])
+             ProfileRequest ProfileAction NodeInfo LSTopoHistory])
    (:import [org.apache.storm.daemon Shutdownable])
    (:import [org.apache.storm.validation ConfigValidation])
 -  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
 -  (:use [org.apache.storm util config log zookeeper])
 -  (:require [org.apache.storm [cluster :as cluster]
 -                            [converter :as converter]
 -                            [stats :as stats]])
 +  (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils])
-   (:use [org.apache.storm util config log timer local-state converter])
++  (:use [org.apache.storm util config log converter])
 +  (:require [org.apache.storm [converter :as converter]
-                             [stats :as stats]])
++                              [stats :as stats]])
    (:require [clojure.set :as set])
    (:import [org.apache.storm.daemon.common StormBase Assignment])
    (:import [org.apache.storm.zookeeper Zookeeper])
@@@ -270,17 -279,17 +274,17 @@@
        {:status {:type :rebalancing}
         :prev-status status
         :topology-action-options (-> {:delay-secs delay :action :rebalance}
--                                  (assoc-non-nil :num-workers num-workers)
--                                  (assoc-non-nil :component->executors 
executor-overrides))
++                                  (converter/assoc-non-nil :num-workers 
num-workers)
++                                  (converter/assoc-non-nil 
:component->executors executor-overrides))
         })))
  
  (defn do-rebalance [nimbus storm-id status storm-base]
    (let [rebalance-options (:topology-action-options storm-base)]
 -    (.update-storm! (:storm-cluster-state nimbus)
 +    (.updateStorm (:storm-cluster-state nimbus)
        storm-id
-       (thriftify-storm-base (-> {:topology-action-options nil}
 -        (-> {:topology-action-options nil}
--          (assoc-non-nil :component->executors (:component->executors 
rebalance-options))
-           (assoc-non-nil :num-workers (:num-workers rebalance-options))))))
 -          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
++      (converter/thriftify-storm-base (-> {:topology-action-options nil}
++          (converter/assoc-non-nil :component->executors 
(:component->executors rebalance-options))
++          (converter/assoc-non-nil :num-workers (:num-workers 
rebalance-options))))))
    (mk-assignments nimbus :scratch-topology-id storm-id))
  
  (defn state-transitions [nimbus storm-id status storm-base]
@@@ -363,7 -372,7 +367,7 @@@
                                        storm-base-updates)]
  
               (when storm-base-updates
-                (.updateStorm (:storm-cluster-state nimbus) storm-id 
(thriftify-storm-base storm-base-updates))))))
 -               (.update-storm! (:storm-cluster-state nimbus) storm-id 
storm-base-updates)))))
++               (.updateStorm (:storm-cluster-state nimbus) storm-id 
(converter/thriftify-storm-base storm-base-updates))))))
         )))
  
  (defn transition-name! [nimbus storm-name event & args]
@@@ -1002,9 -1005,9 +1005,9 @@@
          topology (system-topology! storm-conf (read-storm-topology storm-id 
blob-store))
          num-executors (->> (all-components topology) (map-val 
num-start-executors))]
      (log-message "Activating " storm-name ": " storm-id)
 -    (.activate-storm! storm-cluster-state
 +    (.activateStorm storm-cluster-state
                        storm-id
-       (thriftify-storm-base (StormBase. storm-name
 -                      (StormBase. storm-name
++      (converter/thriftify-storm-base (StormBase. storm-name
                                    (Time/currentTimeSecs)
                                    {:type topology-initial-status}
                                    (storm-conf TOPOLOGY-WORKERS)
@@@ -1448,41 -1446,39 +1454,39 @@@
        (setup-blobstore nimbus))
  
      (when (is-leader nimbus :throw-exception false)
 -      (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
 +      (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
          (transition! nimbus storm-id :startup)))
-     (schedule-recurring (:timer nimbus)
-                         0
-                         (conf NIMBUS-MONITOR-FREQ-SECS)
-                         (fn []
-                           (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
-                             (locking (:submit-lock nimbus)
-                               (mk-assignments nimbus)))
-                           (do-cleanup nimbus)))
+ 
+     (.scheduleRecurring (:timer nimbus)
+       0
+       (conf NIMBUS-MONITOR-FREQ-SECS)
+       (fn []
+         (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
+           (locking (:submit-lock nimbus)
+             (mk-assignments nimbus)))
+         (do-cleanup nimbus)))
      ;; Schedule Nimbus inbox cleaner
-     (schedule-recurring (:timer nimbus)
-                         0
-                         (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
-                         (fn []
-                           (clean-inbox (inbox nimbus) (conf 
NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
+     (.scheduleRecurring (:timer nimbus)
+       0
+       (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
+       (fn [] (clean-inbox (inbox nimbus) (conf 
NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
      ;; Schedule nimbus code sync thread to sync code from other nimbuses.
      (if (instance? LocalFsBlobStore blob-store)
-       (schedule-recurring (:timer nimbus)
-                           0
-                           (conf NIMBUS-CODE-SYNC-FREQ-SECS)
-                           (fn []
-                             (blob-sync conf nimbus))))
+       (.scheduleRecurring (:timer nimbus)
+         0
+         (conf NIMBUS-CODE-SYNC-FREQ-SECS)
+         (fn [] (blob-sync conf nimbus))))
      ;; Schedule topology history cleaner
      (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
-       (schedule-recurring (:timer nimbus)
+       (.scheduleRecurring (:timer nimbus)
          0
          (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
-         (fn []
-           (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
-     (schedule-recurring (:timer nimbus)
-                         0
-                         (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
-                         (fn []
-                           (renew-credentials nimbus)))
+         (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) 
nimbus))))
+     (.scheduleRecurring (:timer nimbus)
+       0
+       (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
+       (fn []
+         (renew-credentials nimbus)))
  
      (defgauge nimbus:num-supervisors
        (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil))))
@@@ -1650,7 -1646,7 +1654,7 @@@
            (log-message "Nimbus setting debug to " enable? " for storm-name '" 
storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
              (if (not (clojure.string/blank? component-id)) (str " 
component-id '" component-id "'")))
            (locking (:submit-lock nimbus)
-             (.updateStorm storm-cluster-state storm-id  (thriftify-storm-base 
storm-base-updates)))))
 -            (.update-storm! storm-cluster-state storm-id 
storm-base-updates))))
++            (.updateStorm storm-cluster-state storm-id  
(converter/thriftify-storm-base storm-base-updates)))))
  
        (^void setWorkerProfiler
          [this ^String id ^ProfileRequest profileRequest]

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 58f6291,fad5b1a..f429d09
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -24,17 -24,17 +24,17 @@@
             [java.net JarURLConnection]
             [java.net URI URLDecoder]
             [org.apache.commons.io FileUtils])
-   (:use [org.apache.storm config util log timer local-state converter])
 -  (:use [org.apache.storm config util log local-state-converter])
++  (:use [org.apache.storm config util log converter local-state-converter])
    (:import [org.apache.storm.generated AuthorizationException 
KeyNotFoundException WorkerResources])
    (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
    (:import [java.nio.file Files StandardCopyOption])
    (:import [org.apache.storm Config])
-   (:import [org.apache.storm.generated WorkerResources ProfileAction])
+   (:import [org.apache.storm.generated WorkerResources ProfileAction 
LocalAssignment])
    (:import [org.apache.storm.localizer LocalResource])
    (:use [org.apache.storm.daemon common])
-   (:require [org.apache.storm.command [healthcheck :as healthcheck]])
+   (:import [org.apache.storm.command HealthCheck])
    (:require [org.apache.storm.daemon [worker :as worker]]
 -            [org.apache.storm [process-simulator :as psim] [cluster :as 
cluster] [event :as event]]
 +            [org.apache.storm [process-simulator :as psim] [event :as event]]
              [clojure.set :as set])
    (:import [org.apache.thrift.transport TTransportException])
    (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 395be23,8f9becd..a75dc35
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -15,7 -15,7 +15,7 @@@
  ;; limitations under the License.
  (ns org.apache.storm.daemon.worker
    (:use [org.apache.storm.daemon common])
-   (:use [org.apache.storm config log util timer local-state converter])
 -  (:use [org.apache.storm config log util local-state-converter])
++  (:use [org.apache.storm config log util converter local-state-converter])
    (:require [clj-time.core :as time])
    (:require [clj-time.coerce :as coerce])
    (:require [org.apache.storm.daemon [executor :as executor]])
@@@ -238,13 -243,14 +243,14 @@@
    {})
  
  (defn mk-halting-timer [timer-name]
-   (mk-timer :kill-fn (fn [t]
-                        (log-error t "Error when processing event")
-                        (Utils/exitProcess 20 "Error when processing an event")
-                        )
-             :timer-name timer-name))
+   (StormTimer. timer-name
+     (reify Thread$UncaughtExceptionHandler
+       (^void uncaughtException
+         [this ^Thread t ^Throwable e]
+         (log-error e "Error when processing event")
+         (Utils/exitProcess 20 "Error when processing an event")))))
  
 -(defn worker-data [conf mq-context storm-id assignment-id port worker-id 
storm-conf cluster-state storm-cluster-state]
 +(defn worker-data [conf mq-context storm-id assignment-id port worker-id 
storm-conf state-store storm-cluster-state]
    (let [assignment-versions (atom {})
          executors (set (read-worker-executors storm-conf storm-cluster-state 
storm-id assignment-id port assignment-versions))
          transfer-queue (DisruptorQueue. "worker-transfer-queue"
@@@ -374,15 -380,16 +380,17 @@@
          conf (:conf worker)
          storm-cluster-state (:storm-cluster-state worker)
          storm-id (:storm-id worker)]
-     (fn this
+     (fn refresh-connections
        ([]
-         (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 
this))))
+         (refresh-connections (fn [& ignored]
+                 (.schedule
+                   (:refresh-connections-timer worker) 0 
refresh-connections))))
        ([callback]
 -         (let [version (.assignment-version storm-cluster-state storm-id 
callback)
 +         (let [version (.assignmentVersion storm-cluster-state storm-id 
callback)
                 assignment (if (= version (:version (get 
@(:assignment-versions worker) storm-id)))
                              (:data (get @(:assignment-versions worker) 
storm-id))
 -                            (let [new-assignment 
(.assignment-info-with-version storm-cluster-state storm-id callback)]
 +                            (let [thriftify-assignment-version 
(.assignmentInfoWithVersion storm-cluster-state storm-id callback)
 +                              new-assignment {:data (clojurify-assignment 
(.get thriftify-assignment-version (IStateStorage/DATA))) :version version}]
                                (swap! (:assignment-versions worker) assoc 
storm-id new-assignment)
                                (:data new-assignment)))
                my-assignment (-> assignment
@@@ -428,9 -435,12 +436,12 @@@
  
  (defn refresh-storm-active
    ([worker]
-     (refresh-storm-active worker (fn [& ignored] (schedule 
(:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
+     (refresh-storm-active
+       worker (fn [& ignored]
+                (.schedule
+                  (:refresh-active-timer worker) 0 (partial 
refresh-storm-active worker)))))
    ([worker callback]
 -    (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) 
callback)]
 +    (let [base (clojurify-storm-base (.stormBase (:storm-cluster-state 
worker) (:storm-id worker) callback))]
        (reset!
          (:storm-active-atom worker)
          (and (= :active (-> base :status :type)) @(:worker-active-flag 
worker)))
@@@ -757,22 -768,28 +769,28 @@@
      (log-message "Started with log levels: " @original-log-levels)
    
      (defn establish-log-setting-callback []
 -      (.topology-log-config (:storm-cluster-state worker) storm-id (fn [args] 
(check-log-config-changed))))
 +      (.topologyLogConfig (:storm-cluster-state worker) storm-id (fn [args] 
(check-log-config-changed))))
  
      (establish-log-setting-callback)
 -    (.credentials (:storm-cluster-state worker) storm-id (fn [args] 
(check-credentials-changed)))
 +    (clojurify-crdentials (.credentials (:storm-cluster-state worker) 
storm-id (fn [] (check-credentials-changed))))
-     (schedule-recurring (:refresh-credentials-timer worker) 0 (conf 
TASK-CREDENTIALS-POLL-SECS)
-                         (fn [& args]
-                           (check-credentials-changed)
-                           (if ((:storm-conf worker) 
TOPOLOGY-BACKPRESSURE-ENABLE)
-                             (check-throttle-changed))))
+ 
+     (.scheduleRecurring
+       (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS)
+         (fn []
+           (check-credentials-changed)
+           (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+             (check-throttle-changed))))
      ;; The jitter allows the clients to get the data at different times, and 
avoids thundering herd
      (when-not (.get conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-       (schedule-recurring-with-jitter (:refresh-load-timer worker) 0 1 500 
refresh-load))
-     (schedule-recurring (:refresh-connections-timer worker) 0 (conf 
TASK-REFRESH-POLL-SECS) refresh-connections)
-     (schedule-recurring (:reset-log-levels-timer worker) 0 (conf 
WORKER-LOG-LEVEL-RESET-POLL-SECS) (fn [] (reset-log-levels latest-log-config)))
-     (schedule-recurring (:refresh-active-timer worker) 0 (conf 
TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker))
- 
+       (.scheduleRecurringWithJitter
+         (:refresh-load-timer worker) 0 1 500 refresh-load))
+     (.scheduleRecurring
+       (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) 
refresh-connections)
+     (.scheduleRecurring
+       (:reset-log-levels-timer worker) 0 (conf 
WORKER-LOG-LEVEL-RESET-POLL-SECS)
+         (fn [] (reset-log-levels latest-log-config)))
+     (.scheduleRecurring
+       (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial 
refresh-storm-active worker))
      (log-message "Worker has topology config " (Utils/redactValue 
(:storm-conf worker) STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD))
      (log-message "Worker " worker-id " for storm " storm-id " on " 
assignment-id ":" port " has finished loading")
      ret

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/internal/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/internal/thrift.clj
index 0000000,4ccf8a7..07e0bdf
mode 000000,100644..100644
--- a/storm-core/src/clj/org/apache/storm/internal/thrift.clj
+++ b/storm-core/src/clj/org/apache/storm/internal/thrift.clj
@@@ -1,0 -1,96 +1,96 @@@
+ ;; Licensed to the Apache Software Foundation (ASF) under one
+ ;; or more contributor license agreements.  See the NOTICE file
+ ;; distributed with this work for additional information
+ ;; regarding copyright ownership.  The ASF licenses this file
+ ;; to you under the Apache License, Version 2.0 (the
+ ;; "License"); you may not use this file except in compliance
+ ;; with the License.  You may obtain a copy of the License at
+ ;;
+ ;; http://www.apache.org/licenses/LICENSE-2.0
+ ;;
+ ;; Unless required by applicable law or agreed to in writing, software
+ ;; distributed under the License is distributed on an "AS IS" BASIS,
+ ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ;; See the License for the specific language governing permissions and
+ ;; limitations under the License.
+ 
+ (ns org.apache.storm.internal.thrift
+   (:import [java.util HashMap]
+            [java.io Serializable]
+            [org.apache.storm.generated NodeInfo Assignment])
+   (:import [org.apache.storm.generated JavaObject Grouping Nimbus 
StormTopology
+             StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
+             ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
+             GlobalStreamId ComponentObject ComponentObject$_Fields
+             ShellComponent SupervisorInfo])
+   (:import [org.apache.storm.utils Utils NimbusClient ConfigUtils])
+   (:import [org.apache.storm Constants])
+   (:import [org.apache.storm.security.auth ReqContext])
+   (:import [org.apache.storm.grouping CustomStreamGrouping])
+   (:import [org.apache.storm.topology TopologyBuilder])
+   (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
+   (:import [org.apache.thrift.transport TTransport])
 -  (:use [org.apache.storm util config log zookeeper]))
++  (:use [org.apache.storm util config log]))
+ 
+ ;; Leaving this definition as core.clj is using them as a nested keyword 
argument
+ ;; Must remove once core.clj is ported to java
+ (def grouping-constants
+   {Grouping$_Fields/FIELDS :fields
+    Grouping$_Fields/SHUFFLE :shuffle
+    Grouping$_Fields/ALL :all
+    Grouping$_Fields/NONE :none
+    Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
+    Grouping$_Fields/CUSTOM_OBJECT :custom-object
+    Grouping$_Fields/DIRECT :direct
+    Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})
+ 
+ ;; Leaving this method as core.clj is using them as a nested keyword argument
+ ;; Must remove once core.clj is ported to java
+ (defn grouping-type
+   [^Grouping grouping]
+   (grouping-constants (.getSetField grouping)))
+ 
+ (defn nimbus-client-and-conn
+   ([host port]
+     (nimbus-client-and-conn host port nil))
+   ([host port as-user]
+   (log-message "Connecting to Nimbus at " host ":" port " as user: " as-user)
+   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
+         nimbusClient (NimbusClient. conf host port nil as-user)
+         client (.getClient nimbusClient)
+         transport (.transport nimbusClient)]
+         [client transport] )))
+ 
+ (defmacro with-nimbus-connection
+   [[client-sym host port] & body]
+   `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] 
(nimbus-client-and-conn ~host ~port)]
+     (try
+       ~@body
+     (finally (.close conn#)))))
+ 
+ (defmacro with-configured-nimbus-connection
+   [client-sym & body]
+   `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
+          context# (ReqContext/context)
+          user# (if (.principal context#) (.getName (.principal context#)))
+          nimbusClient# (NimbusClient/getConfiguredClientAs conf# user#)
+          ~client-sym (.getClient nimbusClient#)
+          conn# (.transport nimbusClient#)
+          ]
+      (try
+        ~@body
+      (finally (.close conn#)))))
+ 
+ ;; Leaving this definition as core.clj is using them as a nested keyword 
argument
+ ;; Must remove once core.clj is ported to java
+ (defn mk-output-spec
+   [output-spec]
+   (let [output-spec (if (map? output-spec)
+                       output-spec
+                       {Utils/DEFAULT_STREAM_ID output-spec})]
+     (map-val
+       (fn [out]
+         (if (instance? StreamInfo out)
+           out
+           (StreamInfo. out false)))
+       output-spec)))

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 3dee54b,7817929..8f313cc
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -48,9 -49,10 +49,10 @@@
    (:import [org.apache.storm.task TopologyContext]
             (org.apache.storm.messaging IContext)
             [org.json.simple JSONValue])
 -  (:require [org.apache.storm [zookeeper :as zk]])
 +  (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
StormClusterStateImpl ClusterUtils])
    (:require [org.apache.storm.daemon.acker :as acker])
-   (:use [org.apache.storm util thrift config log local-state converter]))
 -  (:use [org.apache.storm cluster util config log local-state-converter])
++  (:use [org.apache.storm util config log local-state-converter converter])
+   (:use [org.apache.storm.internal thrift]))
  
  (defn feeder-spout
    [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 1bb8279,6dce7d6..697bdae
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@@ -21,10 -21,11 +21,12 @@@
    (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount
              TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker 
TestPlannerSpout])
    (:import [org.apache.storm.tuple Fields])
 -  (:use [org.apache.storm testing config util])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl])
-   (:use [org.apache.storm testing config clojure util converter])
+   (:use [org.apache.storm.internal clojure])
++  (:use [org.apache.storm testing config util])
    (:use [org.apache.storm.daemon common])
-   (:require [org.apache.storm [thrift :as thrift]]))
+   (:import [org.apache.storm Thrift])
+   (:import [org.apache.storm.utils Utils]))
  
  (deftest test-basic-topology
    (doseq [zmq-on? [true false]]

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj
index 22c1f80,18e3a80..13198fa
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@@ -30,7 -30,8 +30,8 @@@
    (:require [conjure.core])
    (:use [conjure core])
    (:use [clojure test])
-   (:use [org.apache.storm config util testing thrift log converter]))
 -  (:use [org.apache.storm cluster config util testing log])
++  (:use [org.apache.storm config util testing log converter])
+   (:use [org.apache.storm.internal thrift]))
  
  (defn mk-config [zk-port]
    (merge (clojurify-structure (ConfigUtils/readStormConfig))

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --cc 
storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index f75a8e3,7fffd34..6a3d3ca
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@@ -1,3 -1,3 +1,4 @@@
++
  ;; Licensed to the Apache Software Foundation (ASF) under one
  ;; or more contributor license agreements.  See the NOTICE file
  ;; distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 09c4371,ce58f42..3670fd1
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -20,13 -20,12 +20,15 @@@
    (:require [org.apache.storm [converter :as converter]])
    (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount
              TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
-            [org.apache.storm.nimbus InMemoryTopologyActionNotifier])
+            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
+            [org.apache.storm.generated GlobalStreamId]
+            [org.apache.storm Thrift])
    (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
    (:import [org.apache.storm.scheduler INimbus])
 +  (:import [org.mockito Mockito])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
 +  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [org.apache.storm.generated Credentials NotAliveException 
SubmitOptions
              TopologyInitialStatus TopologyStatus AlreadyAliveException 
KillOptions RebalanceOptions
              InvalidTopologyException AuthorizationException
@@@ -38,12 -37,10 +40,11 @@@
    (:import [org.apache.storm.zookeeper Zookeeper])
    (:import [org.apache.commons.io FileUtils]
             [org.json.simple JSONValue])
 -  (:use [org.apache.storm testing MockAutoCred util config log zookeeper])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl 
ClusterStateContext ClusterUtils])
-   (:use [org.apache.storm testing MockAutoCred util config log timer 
converter])
++  (:use [org.apache.storm testing MockAutoCred util config log converter])
    (:use [org.apache.storm.daemon common])
    (:require [conjure.core])
-   (:require [org.apache.storm
-              [thrift :as thrift]])
 -  (:require [org.apache.storm [cluster :as cluster]])
++
    (:use [conjure core]))
  
  (defn- from-json
@@@ -1081,10 -1145,11 +1149,11 @@@
                              STORM-CLUSTER-MODE "local"
                              STORM-ZOOKEEPER-PORT zk-port
                              STORM-LOCAL-DIR nimbus-dir}))
 -         (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +         (bind cluster-state (ClusterUtils/mkStormClusterState conf nil 
(ClusterStateContext.)))
           (bind nimbus (nimbus/service-handler conf 
(nimbus/standalone-nimbus)))
-          (bind topology (thrift/mk-topology
-                          {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+          (bind topology (Thrift/buildTopology
+                          {"1" (Thrift/prepareSpoutDetails
+                                 (TestPlannerSpout. true) (Integer. 3))}
                           {}))
           (submit-local-topology nimbus "t1" {} topology)
           (submit-local-topology nimbus "t2" {} topology)
@@@ -1153,10 -1218,11 +1222,11 @@@
                          STORM-CLUSTER-MODE "local"
                          STORM-ZOOKEEPER-PORT zk-port
                          STORM-LOCAL-DIR nimbus-dir}))
 -          (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil 
(ClusterStateContext.)))
            (bind nimbus (nimbus/service-handler conf 
(nimbus/standalone-nimbus)))
-           (bind topology (thrift/mk-topology
-                            {"1" (thrift/mk-spout-spec (TestPlannerSpout. 
true) :parallelism-hint 3)}
+           (bind topology (Thrift/buildTopology
+                            {"1" (Thrift/prepareSpoutDetails
+                                   (TestPlannerSpout. true) (Integer. 3))}
                             {}))
  
            (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
@@@ -1412,17 -1477,17 +1482,16 @@@
        (with-open [_ (ConfigUtilsInstaller. fake-cu)
                    _ (UtilsInstaller. fake-utils)
                    zk-le (MockedZookeeper. (proxy [Zookeeper] []
 -                          (zkLeaderElectorImpl [conf] nil)))]
 +                          (zkLeaderElectorImpl [conf] nil)))
 +                  mocked-cluster (MockedCluster. cluster-utils)]
          (stubbing [mk-authorization-handler nil
 -                   cluster/mk-storm-cluster-state nil
 -                   nimbus/file-cache-map nil
 -                   nimbus/mk-blob-cache-map nil
 -                   nimbus/mk-bloblist-cache-map nil
 -                   nimbus/mk-scheduler nil]
 -                  (nimbus/nimbus-data auth-conf fake-inimbus)
 -                  (verify-call-times-for cluster/mk-storm-cluster-state 1)
 -                  (verify-first-call-args-for-indices 
cluster/mk-storm-cluster-state [2]
 -                                                      expected-acls))))))
 +                 nimbus/file-cache-map nil
 +                 nimbus/mk-blob-cache-map nil
 +                 nimbus/mk-bloblist-cache-map nil
-                  mk-timer nil
 +                 nimbus/mk-scheduler nil]
 +          (nimbus/nimbus-data auth-conf fake-inimbus)
 +          (.mkStormClusterStateImpl (Mockito/verify cluster-utils 
(Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
 +          )))))
  
  (deftest test-file-bogus-download
    (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false 
TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
@@@ -1453,11 -1518,12 +1522,12 @@@
                        STORM-CLUSTER-MODE "local"
                        STORM-ZOOKEEPER-PORT zk-port
                        STORM-LOCAL-DIR nimbus-dir}))
 -        (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +        (bind cluster-state (ClusterUtils/mkStormClusterState conf nil 
(ClusterStateContext.)))
          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
          (Time/sleepSecs 1)
-         (bind topology (thrift/mk-topology
-                          {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
+         (bind topology (Thrift/buildTopology
+                          {"1" (Thrift/prepareSpoutDetails
+                                 (TestPlannerSpout. true) (Integer. 3))}
                           {}))
          (submit-local-topology nimbus "t1" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} 
topology)
          ; make transition for topology t1 to be killed -> nimbus applies this 
event to cluster state

http://git-wip-us.apache.org/repos/asf/storm/blob/cc1f6d77/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 3ebdbcd,ef40c4a..cdd66e4
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -22,22 -22,21 +22,21 @@@
    (:import [org.apache.storm.testing TestWordCounter TestWordSpout 
TestGlobalCount TestAggregatesCounter TestPlannerSpout])
    (:import [org.apache.storm.scheduler ISupervisor])
    (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
-   (:import [org.apache.storm.generated RebalanceOptions])
+   (:import [org.apache.storm.generated RebalanceOptions WorkerResources])
 -  (:import [org.mockito Matchers Mockito])
 +  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [java.util UUID])
++  (:import [org.apache.storm Thrift])
 +  (:import [org.mockito Mockito Matchers])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [java.io File])
    (:import [java.nio.file Files])
 -  (:import [org.apache.storm.utils Utils IPredicate]
 -           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
 -                                                 UtilsInstaller])
 +  (:import [org.apache.storm.utils Utils IPredicate])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl 
ClusterStateContext ClusterUtils]
-            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
-                                                  UtilsInstaller])
++           [org.apache.storm.utils.staticmocking ConfigUtilsInstaller 
UtilsInstaller])
    (:import [java.nio.file.attribute FileAttribute])
-   (:use [org.apache.storm config testing util timer log converter])
 -  (:import [org.apache.storm Thrift])
 -  (:import [org.apache.storm.utils Utils])
 -  (:use [org.apache.storm config testing util log])
++  (:use [org.apache.storm config testing util log converter])
    (:use [org.apache.storm.daemon common])
--  (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as 
supervisor]]
-             [org.apache.storm [thrift :as thrift]])
 -            [org.apache.storm [cluster :as cluster]])
++  (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as 
supervisor]])
    (:use [conjure core])
    (:require [clojure.java.io :as io]))
  
@@@ -770,66 -772,68 +771,68 @@@
              childopts-with-ids (supervisor/substitute-childopts childopts 
worker-id topology-id port mem-onheap)]
          (is (= expected-childopts childopts-with-ids)))))
  
 -(deftest test-retry-read-assignments
 -  (with-simulated-time-local-cluster [cluster
 -                                      :supervisors 0
 -                                      :ports-per-supervisor 2
 -                                      :daemon-conf 
{ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
 -                                                    NIMBUS-MONITOR-FREQ-SECS 
10
 -                                                    
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
 -                                                    TOPOLOGY-ACKER-EXECUTORS 
0}]
 -    (letlocals
 -     (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
 -     (bind topology1 (Thrift/buildTopology
 -                      {"1" (Thrift/prepareSpoutDetails
 -                             (TestPlannerSpout. true) (Integer. 2))}
 -                      {}))
 -     (bind topology2 (Thrift/buildTopology
 -                      {"1" (Thrift/prepareSpoutDetails
 -                             (TestPlannerSpout. true) (Integer. 2))}
 -                      {}))
 -     (bind state (:storm-cluster-state cluster))
 -     (bind changed (capture-changed-workers
 -                    (submit-mocked-assignment
 -                     (:nimbus cluster)
 -                     (:storm-cluster-state cluster)
 -                     "topology1"
 -                     {TOPOLOGY-WORKERS 2}
 -                     topology1
 -                     {1 "1"
 -                      2 "1"}
 -                     {[1 1] ["sup1" 1]
 -                      [2 2] ["sup1" 2]}
 -                     {["sup1" 1] [0.0 0.0 0.0]
 -                      ["sup1" 2] [0.0 0.0 0.0]
 -                      })
 -                    (submit-mocked-assignment
 -                     (:nimbus cluster)
 -                     (:storm-cluster-state cluster)
 -                     "topology2"
 -                     {TOPOLOGY-WORKERS 2}
 -                     topology2
 -                     {1 "1"
 -                      2 "1"}
 -                     {[1 1] ["sup1" 1]
 -                      [2 2] ["sup1" 2]}
 -                     {["sup1" 1] [0.0 0.0 0.0]
 -                      ["sup1" 2] [0.0 0.0 0.0]
 -                      })
 -                    ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
 -                    (.rebalance (:nimbus cluster) "topology1" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
 -                    ))
 -     (is (empty? (:launched changed)))
 -     (bind options (RebalanceOptions.))
 -     (.set_wait_secs options 0)
 -     (bind changed (capture-changed-workers
 -                    (.rebalance (:nimbus cluster) "topology2" options)
 -                    (advance-cluster-time cluster 10)
 -                    (heartbeat-workers cluster "sup1" [1 2 3 4])
 -                    (advance-cluster-time cluster 10)
 -                    ))
 -     (validate-launched-once (:launched changed)
 -                             {"sup1" [1 2]}
 -                             (get-storm-id (:storm-cluster-state cluster) 
"topology1"))
 -     (validate-launched-once (:launched changed)
 -                             {"sup1" [3 4]}
 -                             (get-storm-id (:storm-cluster-state cluster) 
"topology2"))
 -     )))
 +  (deftest test-retry-read-assignments
 +    (with-simulated-time-local-cluster [cluster
 +                                        :supervisors 0
 +                                        :ports-per-supervisor 2
 +                                        :daemon-conf 
{ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
 +                                                      
NIMBUS-MONITOR-FREQ-SECS 10
 +                                                      
TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
 +                                                      
TOPOLOGY-ACKER-EXECUTORS 0}]
 +      (letlocals
 +        (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-         (bind topology1 (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
++        (bind topology1 (Thrift/buildTopology
++                          {"1" (Thrift/prepareSpoutDetails
++                                 (TestPlannerSpout. true) (Integer. 2))}
 +                          {}))
-         (bind topology2 (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 2)}
++        (bind topology2 (Thrift/buildTopology
++                          {"1" (Thrift/prepareSpoutDetails
++                                 (TestPlannerSpout. true) (Integer. 2))}
 +                          {}))
 +        (bind state (:storm-cluster-state cluster))
 +        (bind changed (capture-changed-workers
 +                        (submit-mocked-assignment
 +                          (:nimbus cluster)
 +                          (:storm-cluster-state cluster)
 +                          "topology1"
 +                          {TOPOLOGY-WORKERS 2}
 +                          topology1
 +                          {1 "1"
 +                           2 "1"}
 +                          {[1 1] ["sup1" 1]
 +                           [2 2] ["sup1" 2]}
 +                          {["sup1" 1] [0.0 0.0 0.0]
 +                           ["sup1" 2] [0.0 0.0 0.0]
 +                           })
 +                        (submit-mocked-assignment
 +                          (:nimbus cluster)
 +                          (:storm-cluster-state cluster)
 +                          "topology2"
 +                          {TOPOLOGY-WORKERS 2}
 +                          topology2
 +                          {1 "1"
 +                           2 "1"}
 +                          {[1 1] ["sup1" 1]
 +                           [2 2] ["sup1" 2]}
 +                          {["sup1" 1] [0.0 0.0 0.0]
 +                           ["sup1" 2] [0.0 0.0 0.0]
 +                           })
 +                        ;; Instead of sleeping until topology is scheduled, 
rebalance topology so mk-assignments is called.
 +                        (.rebalance (:nimbus cluster) "topology1" (doto 
(RebalanceOptions.) (.set_wait_secs 0)))
 +                        ))
 +        (is (empty? (:launched changed)))
 +        (bind options (RebalanceOptions.))
 +        (.set_wait_secs options 0)
 +        (bind changed (capture-changed-workers
 +                        (.rebalance (:nimbus cluster) "topology2" options)
 +                        (advance-cluster-time cluster 10)
 +                        (heartbeat-workers cluster "sup1" [1 2 3 4])
 +                        (advance-cluster-time cluster 10)
 +                        ))
 +        (validate-launched-once (:launched changed)
 +          {"sup1" [1 2]}
 +          (get-storm-id (:storm-cluster-state cluster) "topology1"))
 +        (validate-launched-once (:launched changed)
 +          {"sup1" [3 4]}
 +          (get-storm-id (:storm-cluster-state cluster) "topology2"))
 +        )))

Reply via email to