STORM-1276: line-for-line translation of Nimbus to Java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/725003cc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/725003cc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/725003cc Branch: refs/heads/master Commit: 725003cc5f488191dda011dda738e41d57eca474 Parents: 57998b4 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Mon Oct 24 15:59:36 2016 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Thu Dec 1 13:58:59 2016 -0600 ---------------------------------------------------------------------- bin/storm.py | 2 +- .../src/clj/org/apache/storm/converter.clj | 37 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 2533 ------------ storm-core/src/clj/org/apache/storm/testing.clj | 137 +- .../src/clj/org/apache/storm/testing4j.clj | 1 - storm-core/src/clj/org/apache/storm/ui/core.clj | 36 +- storm-core/src/clj/org/apache/storm/util.clj | 12 +- storm-core/src/jvm/org/apache/storm/Config.java | 5 +- .../org/apache/storm/blobstore/BlobStore.java | 38 + .../storm/cluster/IStormClusterState.java | 78 +- .../storm/cluster/StormClusterStateImpl.java | 4 +- .../org/apache/storm/daemon/StormCommon.java | 33 +- .../org/apache/storm/daemon/nimbus/Nimbus.java | 3729 ++++++++++++++++++ .../storm/daemon/nimbus/TopologyActions.java | 31 + .../storm/daemon/nimbus/TopologyResources.java | 63 + .../daemon/nimbus/TopologyStateTransition.java | 27 + .../storm/daemon/supervisor/BasicContainer.java | 14 +- .../org/apache/storm/daemon/worker/Worker.java | 10 +- .../jvm/org/apache/storm/nimbus/NimbusInfo.java | 8 +- .../scheduler/SchedulerAssignmentImpl.java | 16 + .../storm/scheduler/SupervisorDetails.java | 20 +- .../apache/storm/security/auth/AuthUtils.java | 34 +- .../security/auth/ShellBasedGroupsMapping.java | 13 +- .../jvm/org/apache/storm/stats/StatsUtil.java | 22 +- .../apache/storm/utils/BufferInputStream.java | 3 +- .../jvm/org/apache/storm/utils/ConfigUtils.java | 4 +- 
.../src/jvm/org/apache/storm/utils/Utils.java | 26 +- .../test/clj/org/apache/storm/cluster_test.clj | 8 +- .../test/clj/org/apache/storm/nimbus_test.clj | 731 ++-- .../scheduler/multitenant_scheduler_test.clj | 38 +- .../clj/org/apache/storm/scheduler_test.clj | 4 +- .../apache/storm/security/auth/auth_test.clj | 44 +- .../storm/security/auth/nimbus_auth_test.clj | 114 +- 33 files changed, 4669 insertions(+), 3206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/bin/storm.py ---------------------------------------------------------------------- diff --git a/bin/storm.py b/bin/storm.py index 3630bb0..c48a2b9 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -632,7 +632,7 @@ def get_log4j2_conf_dir(): storm_log4j2_conf_dir = os.path.join(STORM_DIR, storm_log4j2_conf_dir) return storm_log4j2_conf_dir -def nimbus(klass="org.apache.storm.daemon.nimbus"): +def nimbus(klass="org.apache.storm.daemon.nimbus.Nimbus"): """Syntax: [storm nimbus] Launches the nimbus daemon. 
This command should be run under http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj index 0ba3901..6f041bb 100644 --- a/storm-core/src/clj/org/apache/storm/converter.clj +++ b/storm-core/src/clj/org/apache/storm/converter.clj @@ -17,6 +17,7 @@ (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions TopologyActionOptions DebugOptions ProfileRequest] + [org.apache.storm.daemon.nimbus TopologyActions] [org.apache.storm.utils Utils] [org.apache.storm.stats StatsUtil]) (:import [org.apache.storm.cluster ExecutorBeat]) @@ -121,28 +122,18 @@ (clojurify-worker->resources (into {} (.get_worker_resources assignment)))))) (defn convert-to-symbol-from-status [status] - (condp = status - TopologyStatus/ACTIVE {:type :active} - TopologyStatus/INACTIVE {:type :inactive} - TopologyStatus/REBALANCING {:type :rebalancing} - TopologyStatus/KILLED {:type :killed} - nil)) + (if status {:type status} nil)) (defn- convert-to-status-from-symbol [status] (if status - (condp = (:type status) - :active TopologyStatus/ACTIVE - :inactive TopologyStatus/INACTIVE - :rebalancing TopologyStatus/REBALANCING - :killed TopologyStatus/KILLED - nil))) + (:type status))) (defn assoc-non-nil [m k v] (if v (assoc m k v) m)) (defn clojurify-rebalance-options [^RebalanceOptions rebalance-options] - (-> {:action :rebalance} + (-> {:action TopologyActions/REBALANCE} (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options))) (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options))) (assoc-non-nil :component->executors (if 
(.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options)))))) @@ -160,7 +151,7 @@ thrift-rebalance-options))) (defn clojurify-kill-options [^KillOptions kill-options] - (-> {:action :kill} + (-> {:action TopologyActions/KILL} (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options))))) (defn thriftify-kill-options [kill-options] @@ -175,9 +166,9 @@ (let [ topology-action-options (:topology-action-options storm-base) action (:action topology-action-options) thrift-topology-action-options (TopologyActionOptions.)] - (if (= action :kill) + (if (= action TopologyActions/KILL) (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options))) - (if (= action :rebalance) + (if (= action TopologyActions/REBALANCE) (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options))) thrift-topology-action-options))) @@ -230,11 +221,6 @@ (convert-to-symbol-from-status (.get_prev_status storm-base)) (map-val clojurify-debugoptions (.get_component_debug storm-base))))) -(defn thriftify-error [error] - (doto (ErrorInfo. (:error error) (:time-secs error)) - (.set_host (:host error)) - (.set_port (:port error)))) - (defn clojurify-profile-request [^ProfileRequest request] (when request @@ -243,15 +229,6 @@ :action (.get_action request) :timestamp (.get_time_stamp request)})) -(defn thriftify-profile-request - [profile-request] - (let [nodeinfo (doto (NodeInfo.) - (.set_node (:host profile-request)) - (.set_port (set [(:port profile-request)]))) - request (ProfileRequest. nodeinfo (:action profile-request))] - (.set_time_stamp request (:timestamp profile-request)) - request)) - (defn thriftify-credentials [credentials] (doto (Credentials.) (.set_creds (if credentials credentials {}))))