merge conflicts from master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88013488 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88013488 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88013488 Branch: refs/heads/master Commit: 880134881566427e886b01d44890d22db483f6bd Parents: 67a5878 Author: 卫乐 <[email protected]> Authored: Thu Feb 25 13:11:50 2016 +0800 Committer: 卫乐 <[email protected]> Committed: Thu Feb 25 13:11:50 2016 +0800 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/converter.clj | 4 ++-- storm-core/src/clj/org/apache/storm/daemon/executor.clj | 1 - storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 7 +++---- storm-core/src/clj/org/apache/storm/daemon/supervisor.clj | 2 -- storm-core/src/clj/org/apache/storm/ui/core.clj | 8 +++++++- storm-core/test/clj/org/apache/storm/nimbus_test.clj | 8 +++++--- 6 files changed, 17 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj index 54d906d..495fe7f 100644 --- a/storm-core/src/clj/org/apache/storm/converter.clj +++ b/storm-core/src/clj/org/apache/storm/converter.clj @@ -192,9 +192,9 @@ (defn thriftify-storm-base [storm-base] (doto (StormBase.) 
(.set_name (:storm-name storm-base)) - (.set_launch_time_secs (int (:launch-time-secs storm-base))) + (.set_launch_time_secs (if (:launch-time-secs storm-base) (int (:launch-time-secs storm-base)) 0)) (.set_status (convert-to-status-from-symbol (:status storm-base))) - (.set_num_workers (int (:num-workers storm-base))) + (.set_num_workers (if (:num-workers storm-base) (int (:num-workers storm-base)) 0)) (.set_component_executors (map-val int (:component->executors storm-base))) (.set_owner (:owner storm-base)) (.set_topology_action_options (thriftify-topology-action-options storm-base)) http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index edd1368..3b4e330 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -42,7 +42,6 @@ [org.json.simple JSONValue] [com.lmax.disruptor.dsl ProducerType] [org.apache.storm StormTimer]) - (:require [org.apache.storm [cluster :as cluster]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 735200f..a0e652b 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -50,10 +50,9 @@ ProfileRequest ProfileAction NodeInfo LSTopoHistory]) (:import [org.apache.storm.daemon Shutdownable]) (:import 
[org.apache.storm.validation ConfigValidation]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:use [org.apache.storm util config log zookeeper]) - (:require [org.apache.storm [cluster :as cluster] - [converter :as converter]]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) + (:use [org.apache.storm util config log converter]) + (:require [org.apache.storm [converter :as converter]]) (:require [clojure.set :as set]) (:import [org.apache.storm.daemon.common StormBase Assignment]) (:import [org.apache.storm.zookeeper Zookeeper]) http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 1446ac9..781bd94 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -35,7 +35,6 @@ (:use [org.apache.storm.daemon common]) (:import [org.apache.storm.command HealthCheck]) (:require [org.apache.storm.daemon [worker :as worker]] - [clojure.set :as set]) (:import [org.apache.thrift.transport TTransportException]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) @@ -80,7 +79,6 @@ new-profiler-actions (->> (dofor [sid (distinct storm-ids)] - (if-let [topo-profile-actions (into [] (for [request (.getTopologyProfileRequests storm-cluster-state sid)] (clojurify-profile-request request)))] {sid topo-profile-actions})) (apply merge))] http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 1e531c4..25aa717 
100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -21,7 +21,7 @@ ring.middleware.multipart-params) (:use [ring.middleware.json :only [wrap-json-params]]) (:use [hiccup core page-helpers]) - (:use [org.apache.storm config util log zookeeper converter]) + (:use [org.apache.storm config util log converter]) (:use [org.apache.storm.ui helpers]) (:use [org.apache.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID ACKER-FAIL-STREAM-ID mk-authorization-handler @@ -272,6 +272,12 @@ :grouping (clojure.core/name (thrift/grouping-type group))})})])] (into {} (doall components)))) +(defn mk-include-sys-fn + [include-sys?] + (if include-sys? + (fn [_] true) + (fn [stream] (and (string? stream) (not (Utils/isSystemId stream)))))) + (defn stream-boxes [datmap] (let [filter-fn (mk-include-sys-fn true) streams http://git-wip-us.apache.org/repos/asf/storm/blob/88013488/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 3670fd1..8c383e5 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -15,14 +15,15 @@ ;; limitations under the License. 
(ns org.apache.storm.nimbus-test (:use [clojure test]) - (:require [org.apache.storm [util :as util] [stats :as stats]]) + (:require [org.apache.storm [util :as util]]) (:require [org.apache.storm.daemon [nimbus :as nimbus]]) (:require [org.apache.storm [converter :as converter]]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] [org.apache.storm.nimbus InMemoryTopologyActionNotifier] [org.apache.storm.generated GlobalStreamId] - [org.apache.storm Thrift]) + [org.apache.storm Thrift] + [org.apache.storm.stats BoltExecutorStats]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.mockito Mockito]) @@ -143,7 +144,8 @@ curr-beat (clojurify-zk-worker-hb (.getWorkerHeartbeat state storm-id node port)) stats (:executor-stats curr-beat)] (.workerHeartbeat state storm-id node port - (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}) + (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 + :executor-stats (merge stats {executor (clojurify-structure (.renderStats (BoltExecutorStats/mkBoltStats 20)))})}) ))) (defn slot-assignments [cluster storm-id]
