Repository: storm Updated Branches: refs/heads/master 223b615d1 -> 084492782
STORM-1271: Port backtype.storm.daemon.task to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d63cb33 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d63cb33 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d63cb33 Branch: refs/heads/master Commit: 7d63cb33a99940e11663257b515186e40d4f239e Parents: 4264bfc Author: Abhishek Agarwal <[email protected]> Authored: Fri Mar 25 12:19:12 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Wed Mar 30 21:43:54 2016 +0530 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/config.clj | 27 -- .../clj/org/apache/storm/daemon/executor.clj | 222 +++++++---------- .../src/clj/org/apache/storm/daemon/task.clj | 190 -------------- .../org/apache/storm/daemon/GrouperFactory.java | 243 ++++++++++++++++++ .../src/jvm/org/apache/storm/daemon/Task.java | 247 +++++++++++++++++++ .../daemon/metrics/BuiltinMetricsUtil.java | 8 +- .../apache/storm/hooks/info/BoltAckInfo.java | 8 + .../storm/hooks/info/BoltExecuteInfo.java | 8 + .../apache/storm/hooks/info/BoltFailInfo.java | 8 + .../org/apache/storm/hooks/info/EmitInfo.java | 9 + .../apache/storm/hooks/info/SpoutAckInfo.java | 9 + .../apache/storm/hooks/info/SpoutFailInfo.java | 9 + .../jvm/org/apache/storm/utils/ConfigUtils.java | 35 ++- .../test/clj/org/apache/storm/grouping_test.clj | 19 +- 14 files changed, 675 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/config.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj index e50f023..d0c4d87 100644 --- a/storm-core/src/clj/org/apache/storm/config.clj +++ b/storm-core/src/clj/org/apache/storm/config.clj @@ -42,30 +42,3 @@ (defn cluster-mode [conf & args] (keyword (conf STORM-CLUSTER-MODE))) - -(defn sampling-rate - [conf] - (->> (conf TOPOLOGY-STATS-SAMPLE-RATE) - (/ 1) - int)) - -(defn- even-sampler - [freq] - (let [freq (int freq) - start (int 0) - r (java.util.Random.) - curr (MutableInt. -1) - target (MutableInt. (.nextInt r freq))] - (with-meta - (fn [] - (let [i (.increment curr)] - (when (>= i freq) - (.set curr start) - (.set target (.nextInt r freq)))) - (= (.get curr) (.get target))) - {:rate freq}))) - -;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in -(defn mk-stats-sampler - [conf] - (even-sampler (sampling-rate conf))) http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e759b1d..8a77a61 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -15,6 +15,7 @@ ;; limitations under the License. (ns org.apache.storm.daemon.executor (:use [org.apache.storm.daemon common]) + (:use [clojure.walk]) (:import [org.apache.storm.generated Grouping Grouping$_Fields] [java.io Serializable] [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats] @@ -33,7 +34,7 @@ (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback]) (:import [com.lmax.disruptor InsufficientCapacityException]) (:import [org.apache.storm.serialization KryoTupleSerializer]) - (:import [org.apache.storm.daemon Shutdownable StormCommon Acker]) + (:import [org.apache.storm.daemon Shutdownable StormCommon Acker Task GrouperFactory]) (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) @@ -43,74 +44,8 @@ [org.json.simple JSONValue] [com.lmax.disruptor.dsl ProducerType] [org.apache.storm StormTimer]) - (:require [org.apache.storm.daemon [task :as task]]) (:require [clojure.set :as set])) -(defn- mk-fields-grouper - [^Fields out-fields ^Fields group-fields ^List target-tasks] - (let [num-tasks (count target-tasks) - task-getter (fn [i] (.get target-tasks i))] - (fn [task-id ^List values load] - (-> (.select out-fields group-fields values) - (TupleUtils/listHashCode) - (mod num-tasks) - task-getter)))) - -(defn- mk-custom-grouper - [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks] - (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks) - (if (instance? LoadAwareCustomStreamGrouping grouping) - (fn [task-id ^List values load] - (.chooseTasks ^LoadAwareCustomStreamGrouping grouping task-id values load)) - (fn [task-id ^List values load] - (.chooseTasks grouping task-id values)))) - -(defn mk-shuffle-grouper - [^List target-tasks topo-conf ^WorkerTopologyContext context ^String component-id ^String stream-id] - (if (.get topo-conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING) - (mk-custom-grouper (ShuffleGrouping.) context component-id stream-id target-tasks) - (mk-custom-grouper (LoadAwareShuffleGrouping.) context component-id stream-id target-tasks))) - -(defn- mk-grouper - "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index." - [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks topo-conf] - (let [num-tasks (count target-tasks) - random (Random.) - target-tasks (vec (sort target-tasks))] - (condp = (Thrift/groupingType thrift-grouping) - Grouping$_Fields/FIELDS - (if (Thrift/isGlobalGrouping thrift-grouping) - (fn [task-id tuple load] - ;; It's possible for target to have multiple tasks if it reads multiple sources - (first target-tasks)) - (let [group-fields (Fields. (Thrift/fieldGrouping thrift-grouping))] - (mk-fields-grouper out-fields group-fields target-tasks) - )) - Grouping$_Fields/ALL - (fn [task-id tuple load] target-tasks) - Grouping$_Fields/SHUFFLE - (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id) - Grouping$_Fields/LOCAL_OR_SHUFFLE - (let [same-tasks (set/intersection - (set target-tasks) - (set (.getThisWorkerTasks context)))] - (if-not (empty? same-tasks) - (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id) - (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id))) - Grouping$_Fields/NONE - (fn [task-id tuple load] - (let [i (mod (.nextInt random) num-tasks)] - (get target-tasks i) - )) - Grouping$_Fields/CUSTOM_OBJECT - (let [grouping (Thrift/instantiateJavaObject (.get_custom_object thrift-grouping))] - (mk-custom-grouper grouping context component-id stream-id target-tasks)) - Grouping$_Fields/CUSTOM_SERIALIZED - (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)] - (mk-custom-grouper grouping context component-id stream-id target-tasks)) - Grouping$_Fields/DIRECT - :direct - ))) ;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE (defn- outbound-groupings @@ -122,13 +57,13 @@ pos?)) (map (fn [[component tgrouping]] [component - (mk-grouper worker-context - this-component-id - stream-id - out-fields - tgrouping - (.getComponentTasks worker-context component) - topo-conf)])) + (GrouperFactory/mkGrouper worker-context + this-component-id + stream-id + out-fields + tgrouping + (.getComponentTasks worker-context component) + topo-conf)])) (into {}) (HashMap.))) @@ -153,11 +88,11 @@ (let [topology (.getRawTopology context) spouts (.get_spouts topology) bolts (.get_bolts topology)] - (cond (contains? spouts component-id) :spout - (contains? bolts component-id) :bolt + (cond (contains? spouts component-id) "spout" + (contains? bolts component-id) "bolt" :else (throw (RuntimeException. (str "Could not find " component-id " in topology " topology)))))) -(defn executor-selector [executor-data & _] (:type executor-data)) +(defn executor-selector [executor-data & _] (keyword (:type executor-data))) (defmulti mk-threads executor-selector) (defmulti mk-executor-stats executor-selector) @@ -275,9 +210,9 @@ (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error)) (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>))))) - :sampler (mk-stats-sampler storm-conf) + :sampler (ConfigUtils/mkStatsSampler storm-conf) :backpressure (atom false) - :spout-throttling-metrics (if (= executor-type :spout) + :spout-throttling-metrics (if (= (keyword executor-type) :spout) (SpoutThrottlingMetrics.) nil) ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function? @@ -319,6 +254,13 @@ (.getName batch-transfer-queue) (:uncaught-exception-handler (:report-error-and-die executor-data))))) +;; TODO: this is all expensive... should be precomputed +(defn send-unanchored + [^Task task-data stream values transfer-fn] + (let [out-tuple (.getTuple task-data stream values)] + (fast-list-iter [t (.getOutgoingTasks task-data stream values)] + (transfer-fn t out-tuple)))) + (defn setup-metrics! [executor-data] (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data distinct-time-bucket-intervals (keys interval->task->metric-registry)] @@ -335,7 +277,8 @@ [executor-data task-data ^TupleImpl tuple] (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data interval (.getInteger tuple 0) - task-id (:task-id task-data) + transfer-fn (:transfer-fn executor-data) + task-id (.getTaskId task-data) name->imetric (-> interval->task->metric-registry (get interval) (get task-id)) task-info (IMetricsConsumer$TaskInfo. (Utils/hostname (:storm-conf executor-data)) @@ -352,7 +295,7 @@ (filter identity) (into []))] (when (seq data-points) - (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))) + (send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] transfer-fn)))) (defn setup-ticks! [worker executor-data] (let [storm-conf (:storm-conf executor-data) @@ -362,7 +305,7 @@ (when tick-time-secs (if (or (Utils/isSystemId (:component-id executor-data)) (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS)) - (= :spout (:type executor-data)))) + (= :spout (keyword (:type executor-data))))) (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data)) (.scheduleRecurring (:user-timer worker) @@ -374,10 +317,18 @@ (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id) + transfer-fn (:transfer-fn executor-data) _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id)) task-datas (->> executor-data :task-ids - (map (fn [t] [t (task/mk-task executor-data t)])) + (map (fn [t] (let [task (Task. (stringify-keys executor-data) t) + stream StormCommon/SYSTEM_STREAM_ID + values ["startup"]] + ;; when this is called, the threads for the executor haven't been started yet, + ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue + (send-unanchored task stream values transfer-fn) + [t task] + ))) (into {}) (HashMap.)) _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id)) @@ -426,36 +377,36 @@ (.interrupt t) (.join t)) - (doseq [user-context (map :user-context (vals task-datas))] + (doseq [user-context (map #(.getUserContext %) (vals task-datas))] (doseq [hook (.getHooks user-context)] (.cleanup hook))) (.disconnect (:storm-cluster-state executor-data)) (when @(:open-or-prepare-was-called? executor-data) - (doseq [obj (map :object (vals task-datas))] + (doseq [obj (map #(.getTaskObject %) (vals task-datas))] (close-component executor-data obj))) (log-message "Shut down executor " component-id ":" (pr-str executor-id))) ))) (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id] - (let [^ISpout spout (:object task-data) + (let [^ISpout spout (.getTaskObject task-data) storm-conf (:storm-conf executor-data) - task-id (:task-id task-data)] + task-id (.getTaskId task-data)] ;;TODO: need to throttle these when there's lots of failures (when (= true (storm-conf TOPOLOGY-DEBUG)) (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) (.fail spout msg-id) - (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) + (.applyOn (SpoutFailInfo. msg-id task-id time-delta) (.getUserContext task-data)) (when time-delta (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id] (let [storm-conf (:storm-conf executor-data) - ^ISpout spout (:object task-data) - task-id (:task-id task-data)] + ^ISpout spout (.getTaskObject task-data) + task-id (.getTaskId task-data)] (when (= true (storm-conf TOPOLOGY-DEBUG)) (log-message "SPOUT Acking message " id " " msg-id)) (.ack spout msg-id) - (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) + (.applyOn (SpoutAckInfo. msg-id task-id time-delta) (.getUserContext task-data)) (when time-delta (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta)))) @@ -496,10 +447,11 @@ ;; the thread's initialized random number generator is used to generate ;; uniformily distributed random numbers. (when (and (> spct 0) (< (* 100 (.nextDouble random)) spct)) - (task/send-unanchored + (send-unanchored task-data StormCommon/EVENTLOGGER_STREAM_ID - [component-id message-id (System/currentTimeMillis) values])))) + [component-id message-id (System/currentTimeMillis) values] + (:transfer-fn executor-data))))) (defmethod mk-threads :spout [executor-data task-datas initial-credentials] (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data @@ -507,7 +459,7 @@ max-spout-pending (executor-max-spout-pending storm-conf (count task-datas)) ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending)) last-active (atom false) - spouts (ArrayList. (map :object (vals task-datas))) + spouts (ArrayList. (map #(.getTaskObject %) (vals task-datas))) rand (Random. (Utils/secureRandomLong)) ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue) debug? (= true (storm-conf TOPOLOGY-DEBUG)) @@ -526,7 +478,7 @@ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple) Constants/CREDENTIALS_CHANGED_STREAM_ID (let [task-data (get task-datas task-id) - spout-obj (:object task-data)] + spout-obj (.getTaskObject task-data)] (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj (.getValue tuple 0)))) Acker/ACKER_RESET_TIMEOUT_STREAM_ID @@ -559,15 +511,14 @@ (while (not @(:storm-active-atom executor-data)) (Thread/sleep 100)) (log-message "Opening spout " component-id ":" (keys task-datas)) - (.registerAll (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas)))) + (.registerAll (:spout-throttling-metrics executor-data) storm-conf (.getUserContext (first (vals task-datas)))) (doseq [[task-id task-data] task-datas - :let [^ISpout spout-obj (:object task-data) - tasks-fn (:tasks-fn task-data) + :let [^ISpout spout-obj (.getTaskObject task-data) send-spout-msg (fn [out-stream-id values message-id out-task-id] (.increment emitted-count) (let [out-tasks (if out-task-id - (tasks-fn out-task-id out-stream-id values) - (tasks-fn out-stream-id values)) + (.getOutgoingTasks task-data out-task-id out-stream-id values) + (.getOutgoingTasks task-data out-stream-id values)) rooted? (and message-id has-ackers?) root-id (if rooted? (MessageId/generateId rand)) ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))] @@ -590,25 +541,27 @@ message-id {:stream out-stream-id :values (if debug? values nil)} - (if (sampler) (System/currentTimeMillis))]) - (task/send-unanchored task-data + (if (.call ^Callable sampler) (System/currentTimeMillis))]) + (send-unanchored task-data Acker/ACKER_INIT_STREAM_ID - [root-id (Utils/bitXorVals out-ids) task-id])) + [root-id (Utils/bitXorVals out-ids) task-id] + (:transfer-fn executor-data))) (when message-id (ack-spout-msg executor-data task-data message-id {:stream out-stream-id :values values} - (if (sampler) 0) "0:"))) + (if (.call ^Callable sampler) 0) "0:"))) (or out-tasks [])))]] - (.registerAll (:builtin-metrics task-data) storm-conf (:user-context task-data)) + (.registerAll (.getBuiltInMetrics task-data) storm-conf (.getUserContext task-data)) (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data) "receive" receive-queue} - storm-conf (:user-context task-data)) + storm-conf (.getUserContext task-data)) + (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials)) (.open spout-obj storm-conf - (:user-context task-data) + (.getUserContext task-data) (SpoutOutputCollector. (reify ISpoutOutputCollector (^long getPendingCount[this] @@ -694,7 +647,7 @@ (defmethod mk-threads :bolt [executor-data task-datas initial-credentials] (let [storm-conf (:storm-conf executor-data) - execute-sampler (mk-stats-sampler storm-conf) + execute-sampler (ConfigUtils/mkStatsSampler storm-conf) executor-stats (:stats executor-data) {:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data @@ -720,16 +673,16 @@ (let [stream-id (.getSourceStreamId tuple)] (condp = stream-id Constants/CREDENTIALS_CHANGED_STREAM_ID - (let [task-data (get task-datas task-id) - bolt-obj (:object task-data)] + (let [^Task task-data (get task-datas task-id) + bolt-obj (.getTaskObject task-data)] (when (instance? ICredentialsListener bolt-obj) - (.setCredentials bolt-obj (.getValue tuple 0)))) + (.setCredentials ^ICredentialsListener bolt-obj (.getValue tuple 0)))) Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple) - (let [task-data (get task-datas task-id) - ^IBolt bolt-obj (:object task-data) - user-context (:user-context task-data) - sampler? (sampler) - execute-sampler? (execute-sampler) + (let [^Task task-data (get task-datas task-id) + ^IBolt bolt-obj (.getTaskObject task-data) + user-context (.getUserContext task-data) + sampler? (.call ^Callable sampler) + execute-sampler? (.call ^Callable execute-sampler) now (if (or sampler? execute-sampler?) (System/currentTimeMillis)) receive-queue (:receive-queue executor-data)] (when sampler? @@ -741,7 +694,7 @@ (when (= true (storm-conf TOPOLOGY-DEBUG)) (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta)) - (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) + (.applyOn (BoltExecuteInfo. tuple task-id delta) user-context) (when delta (.boltExecuteTuple executor-stats (.getSourceComponent tuple) @@ -755,13 +708,13 @@ (log-message "Preparing bolt " component-id ":" (keys task-datas)) (doseq [[task-id task-data] task-datas - :let [^IBolt bolt-obj (:object task-data) - tasks-fn (:tasks-fn task-data) - user-context (:user-context task-data) + :let [^IBolt bolt-obj (.getTaskObject task-data) + user-context (.getUserContext task-data) + transfer-fn (:transfer-fn executor-data) bolt-emit (fn [stream anchors values task] (let [out-tasks (if task - (tasks-fn task stream values) - (tasks-fn stream values))] + (.getOutgoingTasks task-data task stream values) + (.getOutgoingTasks task-data stream values))] (fast-list-iter [t out-tasks] (let [anchors-to-ids (HashMap.)] (fast-list-iter [^TupleImpl a anchors] @@ -780,8 +733,8 @@ (if has-eventloggers? (send-to-eventlogger executor-data task-data values component-id nil rand)) (or out-tasks [])))]] - (.registerAll (:builtin-metrics task-data) storm-conf user-context) - (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) + (.registerAll (.getBuiltInMetrics task-data) storm-conf user-context) + (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) (if (= component-id Constants/SYSTEM_COMPONENT_ID) (do (BuiltinMetricsUtil/registerQueueMetrics {"sendqueue" (:batch-transfer-queue executor-data) @@ -808,14 +761,15 @@ (let [^TupleImpl tuple tuple ack-val (.getAckVal tuple)] (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] - (task/send-unanchored task-data - Acker/ACKER_ACK_STREAM_ID - [root (bit-xor id ack-val)]))) + (send-unanchored task-data + Acker/ACKER_ACK_STREAM_ID + [root (bit-xor id ack-val)] + transfer-fn))) (let [delta (tuple-time-delta! tuple) debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) + (.applyOn (BoltAckInfo. tuple task-id delta) user-context) (when delta (.boltAckedTuple executor-stats (.getSourceComponent tuple) @@ -823,14 +777,15 @@ delta)))) (^void fail [this ^Tuple tuple] (fast-list-iter [root (.. tuple getMessageId getAnchors)] - (task/send-unanchored task-data + (send-unanchored task-data Acker/ACKER_FAIL_STREAM_ID - [root])) + [root] + transfer-fn)) (let [delta (tuple-time-delta! tuple) debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) + (.applyOn (BoltFailInfo. tuple task-id delta) user-context) (when delta (.boltFailedTuple executor-stats (.getSourceComponent tuple) @@ -838,9 +793,10 @@ delta)))) (^void resetTimeout [this ^Tuple tuple] (fast-list-iter [root (.. tuple getMessageId getAnchors)] - (task/send-unanchored task-data + (send-unanchored task-data Acker/ACKER_RESET_TIMEOUT_STREAM_ID - [root]))) + [root] + transfer-fn))) (reportError [this error] (report-error error)))))) (reset! open-or-prepare-was-called? true) http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj deleted file mode 100644 index 2cfad7c..0000000 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ /dev/null @@ -1,190 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.daemon.task - (:use [org.apache.storm.daemon common]) - (:use [org.apache.storm config util log]) - (:import [org.apache.storm.hooks ITaskHook] - [org.apache.storm.daemon.metrics BuiltinMetrics BuiltinMetricsUtil]) - (:import [org.apache.storm.tuple Tuple TupleImpl]) - (:import [org.apache.storm.grouping LoadMapping]) - (:import [org.apache.storm.generated SpoutSpec Bolt StateSpoutSpec StormTopology]) - (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo - EmitInfo BoltFailInfo BoltAckInfo]) - (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext]) - (:import [org.apache.storm.utils Utils ConfigUtils]) - (:import [org.apache.storm.generated ShellComponent JavaObject]) - (:import [org.apache.storm.spout ShellSpout]) - (:import [java.util Collection List ArrayList]) - (:import [org.apache.storm Thrift] - (org.apache.storm.daemon StormCommon))) - -(defn mk-topology-context-builder [worker executor-data topology] - (let [conf (:conf worker)] - #(TopologyContext. - topology - (:storm-conf worker) - (:task->component worker) - (:component->sorted-tasks worker) - (:component->stream->fields worker) - (:storm-id worker) - (ConfigUtils/supervisorStormResourcesPath - (ConfigUtils/supervisorStormDistRoot conf (:storm-id worker))) - (ConfigUtils/workerPidsRoot conf (:worker-id worker)) - (int %) - (:port worker) - (:task-ids worker) - (:default-shared-resources worker) - (:user-shared-resources worker) - (:shared-executor-data executor-data) - (:interval->task->metric-registry executor-data) - (:open-or-prepare-was-called? executor-data)))) - -(defn system-topology-context [worker executor-data tid] - ((mk-topology-context-builder - worker - executor-data - (:system-topology worker)) - tid)) - -(defn user-topology-context [worker executor-data tid] - ((mk-topology-context-builder - worker - executor-data - (:topology worker)) - tid)) - -(defn- get-task-object [^StormTopology topology component-id] - (let [spouts (.get_spouts topology) - bolts (.get_bolts topology) - state-spouts (.get_state_spouts topology) - obj (Utils/getSetComponentObject - (cond - (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id)) - (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id)) - (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id)) - true (throw (RuntimeException. (str "Could not find " component-id " in " topology))))) - obj (if (instance? ShellComponent obj) - (if (contains? spouts component-id) - (ShellSpout. obj) - (ShellBolt. obj)) - obj ) - obj (if (instance? JavaObject obj) - (Thrift/instantiateJavaObject obj) - obj )] - obj - )) - -(defn get-context-hooks [^TopologyContext context] - (.getHooks context)) - -(defn hooks-empty? [^Collection hooks] - (.isEmpty hooks)) - -(defmacro apply-hooks [topology-context method-sym info-form] - (let [hook-sym (with-meta (gensym "hook") {:tag 'org.apache.storm.hooks.ITaskHook})] - `(let [hooks# (get-context-hooks ~topology-context)] - (when-not (hooks-empty? hooks#) - (let [info# ~info-form] - (fast-list-iter [~hook-sym hooks#] - (~method-sym ~hook-sym info#) - )))))) - - -;; TODO: this is all expensive... should be precomputed -(defn send-unanchored - [task-data stream values] - (let [^TopologyContext topology-context (:system-context task-data) - tasks-fn (:tasks-fn task-data) - transfer-fn (-> task-data :executor-data :transfer-fn) - out-tuple (TupleImpl. topology-context - values - (.getThisTaskId topology-context) - stream)] - (fast-list-iter [t (tasks-fn stream values)] - (transfer-fn t out-tuple)))) - -(defn mk-tasks-fn [task-data] - (let [task-id (:task-id task-data) - executor-data (:executor-data task-data) - ^LoadMapping load-mapping (:load-mapping (:worker executor-data)) - component-id (:component-id executor-data) - ^WorkerTopologyContext worker-context (:worker-context executor-data) - storm-conf (:storm-conf executor-data) - emit-sampler (mk-stats-sampler storm-conf) - stream->component->grouper (:stream->component->grouper executor-data) - user-context (:user-context task-data) - executor-stats (:stats executor-data) - debug? (= true (storm-conf TOPOLOGY-DEBUG))] - - (fn ([^Integer out-task-id ^String stream ^List values] - (when debug? - (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values)) - (let [target-component (.getComponentId worker-context out-task-id) - component->grouping (get stream->component->grouper stream) - grouping (get component->grouping target-component) - out-task-id (if grouping out-task-id)] - (when (and (not-nil? grouping) (not= :direct grouping)) - (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) - (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) - (when (emit-sampler) - (.emittedTuple executor-stats stream) - (if out-task-id - (.transferredTuples executor-stats stream, 1))) - (if out-task-id [out-task-id]) - )) - ([^String stream ^List values] - (when debug? - (log-message "Emitting: " component-id " " stream " " values)) - (let [out-tasks (ArrayList.)] - (if (not (.containsKey stream->component->grouper stream)) - (throw (IllegalArgumentException. (str "Unknown stream ID: " stream)))) - (fast-map-iter [[out-component grouper] (get stream->component->grouper stream)] - (when (= :direct grouper) - ;; TODO: this is wrong, need to check how the stream was declared - (throw (IllegalArgumentException. "Cannot do regular emit to direct stream"))) - (let [comp-tasks (grouper task-id values load-mapping)] - (if (or (sequential? comp-tasks) (instance? Collection comp-tasks)) - (.addAll out-tasks comp-tasks) - (.add out-tasks comp-tasks) - ))) - (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) - (when (emit-sampler) - (.emittedTuple executor-stats stream) - (.transferredTuples executor-stats stream (count out-tasks))) - out-tasks))) - )) - -(defn mk-task-data [executor-data task-id] - (recursive-map - :executor-data executor-data - :task-id task-id - :system-context (system-topology-context (:worker executor-data) executor-data task-id) - :user-context (user-topology-context (:worker executor-data) executor-data task-id) - :builtin-metrics (BuiltinMetricsUtil/mkData (.getName (:type executor-data)) (:stats executor-data)) - :tasks-fn (mk-tasks-fn <>) - :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)))) - - -(defn mk-task [executor-data task-id] - (let [task-data (mk-task-data executor-data task-id) - storm-conf (:storm-conf executor-data)] - (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)] - (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance))) - ;; when this is called, the threads for the executor haven't been started yet, - ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue - (send-unanchored task-data StormCommon/SYSTEM_STREAM_ID ["startup"]) - task-data - )) http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java new file mode 100644 index 0000000..d06682f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon; + +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; + +import org.apache.storm.Config; +import org.apache.storm.Thrift; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.grouping.LoadAwareShuffleGrouping; +import org.apache.storm.grouping.LoadMapping; +import org.apache.storm.grouping.ShuffleGrouping; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +public class GrouperFactory { + + public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields, + Grouping thriftGrouping, + List<Integer> unsortedTargetTasks, + Map topoConf) { + List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks); + final boolean isNotLoadAware = (null != topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf + .get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)); + CustomStreamGrouping result = null; + switch (Thrift.groupingType(thriftGrouping)) { + case FIELDS: + if (Thrift.isGlobalGrouping(thriftGrouping)) { + result = new GlobalGrouper(); + } else { + result = new FieldsGrouper(outFields, thriftGrouping); + } + break; + case SHUFFLE: + if (isNotLoadAware) { + result = new ShuffleGrouping(); + } else { + result = new LoadAwareShuffleGrouping(); + } + break; + case ALL: + result = new AllGrouper(); + break; + case LOCAL_OR_SHUFFLE: + // Prefer local tasks as target tasks if possible + Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks())); + targetTasks = (sameTasks.isEmpty()) ? targetTasks : new ArrayList<>(sameTasks); + if (isNotLoadAware) { + result = new ShuffleGrouping(); + } else { + result = new LoadAwareShuffleGrouping(); + } + break; + case NONE: + result = new NoneGrouper(); + break; + case CUSTOM_OBJECT: + result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object()); + break; + case CUSTOM_SERIALIZED: + result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class); + break; + case DIRECT: + result = DIRECT; + break; + default: + result = null; + break; + } + + if (null != result) { + result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks); + } + + if (result instanceof LoadAwareCustomStreamGrouping) { + return (LoadAwareCustomStreamGrouping) result; + } else { + return new BasicLoadAwareCustomStreamGrouping (result); + } + } + + + /** + * A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping + */ + public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping { + + private final CustomStreamGrouping customStreamGrouping; + + public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) { + this.customStreamGrouping = customStreamGrouping; + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) { + return customStreamGrouping.chooseTasks(taskId, values); + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + customStreamGrouping.prepare(context, stream, targetTasks); + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + return customStreamGrouping.chooseTasks(taskId, values); + } + } + + public static class FieldsGrouper implements CustomStreamGrouping { + + private Fields outFields; + private List<Integer> targetTasks; + private Fields groupFields; + private int numTasks; + + public FieldsGrouper(Fields outFields, Grouping thriftGrouping) { + this.outFields = outFields; + this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping)); + + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + this.targetTasks = targetTasks; + this.numTasks = targetTasks.size(); + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks; + return Collections.singletonList(targetTasks.get(targetTaskIndex)); + } + } + + public static class GlobalGrouper implements CustomStreamGrouping { + + private List<Integer> targetTasks; + + public GlobalGrouper() { + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + this.targetTasks = targetTasks; + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + if (targetTasks.isEmpty()) { + return null; + } + // It's possible for target to have multiple tasks if it reads multiple sources + return Collections.singletonList(targetTasks.get(0)); + } + } + + public static class NoneGrouper implements CustomStreamGrouping { + + private List<Integer> targetTasks; + private int numTasks; + private final Random random; + + public NoneGrouper() { + random = new Random(); + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + this.targetTasks = targetTasks; + this.numTasks = targetTasks.size(); + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + int index = random.nextInt(numTasks); + return Collections.singletonList(targetTasks.get(index)); + } + } + + public static class AllGrouper implements CustomStreamGrouping { + + private List<Integer> targetTasks; + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + this.targetTasks = targetTasks; + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + return targetTasks; + } + } + + // A no-op grouper + public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() { + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) { + return null; + } + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { + + } + + @Override + public List<Integer> chooseTasks(int taskId, List<Object> values) { + return null; + } + + }; + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/Task.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Task.java b/storm-core/src/jvm/org/apache/storm/daemon/Task.java new file mode 100644 index 0000000..60b570a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon; + +import org.apache.storm.Config; +import org.apache.storm.Thrift; +import org.apache.storm.daemon.metrics.BuiltinMetrics; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.ComponentObject; +import org.apache.storm.generated.JavaObject; +import org.apache.storm.generated.ShellComponent; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StateSpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.grouping.LoadMapping; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.spout.ShellSpout; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.task.ShellBolt; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Map executorData; + private Map workerData; + private TopologyContext systemTopologyContext; + private TopologyContext userTopologyContext; + private WorkerTopologyContext workerTopologyContext; + private LoadMapping loadMapping; + private Integer taskId; + private String componentId; + private Object taskObject; // Spout/Bolt object + private Map stormConf; + private Callable<Boolean> emitSampler; + private CommonStats executorStats; + private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper; + private BuiltinMetrics builtInMetrics; + private boolean debug; + + public Task(Map executorData, Integer taskId) throws IOException { + this.taskId = taskId; + this.executorData = executorData; + this.workerData = (Map) executorData.get("worker"); + this.stormConf = (Map) executorData.get("storm-conf"); + this.componentId = (String) executorData.get("component-id"); + this.streamComponentToGrouper = (Map<String, Map<String, LoadAwareCustomStreamGrouping>>) executorData.get("stream->component->grouper"); + this.executorStats = (CommonStats) executorData.get("stats"); + this.builtInMetrics = BuiltinMetricsUtil.mkData((String) executorData.get("type"), this.executorStats); + this.workerTopologyContext = (WorkerTopologyContext) executorData.get("worker-context"); + this.emitSampler = ConfigUtils.mkStatsSampler(stormConf); + this.loadMapping = (LoadMapping) workerData.get("load-mapping"); + this.systemTopologyContext = mkTopologyContext((StormTopology) workerData.get("system-topology")); + this.userTopologyContext = mkTopologyContext((StormTopology) workerData.get("topology")); + this.taskObject = mkTaskObject(); + this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) stormConf.get(Config.TOPOLOGY_DEBUG); + this.addTaskHooks(); + } + + public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) { + if (debug) { + LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values); + } + String targetComponent = workerTopologyContext.getComponentId(outTaskId); + Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream); + LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent); + if (null == grouping) { + outTaskId = null; + } + if (grouping != null && grouping != GrouperFactory.DIRECT) { + throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping"); + } + new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext); + try { + if (emitSampler.call()) { + executorStats.emittedTuple(stream); + if (null != outTaskId) { + executorStats.transferredTuples(stream, 1); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + if (null != outTaskId) { + return Collections.singletonList(outTaskId); + } + return null; + } + + public List<Integer> getOutgoingTasks(String stream, List<Object> values) { + if (debug) { + LOG.info("Emitting: {} {} {}", componentId, stream, values); + } + List<Integer> outTasks = new ArrayList<>(); + if (!streamComponentToGrouper.containsKey(stream)) { + throw new IllegalArgumentException("Unknown stream ID: " + stream); + } + if (null != streamComponentToGrouper.get(stream)) { + // null value for __system + for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) { + if (grouper == GrouperFactory.DIRECT) { + throw new IllegalArgumentException("Cannot do regular emit to direct stream"); + } + List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping); + outTasks.addAll(compTasks); + } + } + new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext); + try { + if (emitSampler.call()) { + executorStats.emittedTuple(stream); + executorStats.transferredTuples(stream, outTasks.size()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return outTasks; + } + + public Tuple getTuple(String stream, List values) { + return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream); + } + + public Integer getTaskId() { + return taskId; + } + + public String getComponentId() { + return componentId; + } + + public TopologyContext getUserContext() throws IOException { + return userTopologyContext; + } + + public Object getTaskObject() { + return taskObject; + } + + public BuiltinMetrics getBuiltInMetrics() { + return builtInMetrics; + } + + private TopologyContext mkTopologyContext(StormTopology topology) throws IOException { + Map conf = (Map) workerData.get("conf"); + return new TopologyContext( + topology, + (Map) workerData.get("storm-conf"), + (Map<Integer, String>) workerData.get("task->component"), + (Map<String, List<Integer>>) workerData.get("component->sorted-tasks"), + (Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"), + (String) workerData.get("storm-id"), + ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))), + ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")), + taskId, + (Integer) workerData.get("port"), + (List<Integer>) workerData.get("task-ids"), + (Map<String, Object>) workerData.get("default-shared-resources"), + (Map<String, Object>) workerData.get("user-shared-resources"), + (Map<String, Object>) executorData.get("shared-executor-data"), + (Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"), + (clojure.lang.Atom) executorData.get("open-or-prepare-was-called?") + ); + } + + private Object mkTaskObject() { + StormTopology topology = systemTopologyContext.getRawTopology(); + Map<String, SpoutSpec> spouts = topology.get_spouts(); + Map<String, Bolt> bolts = topology.get_bolts(); + Map<String, StateSpoutSpec> stateSpouts = topology.get_state_spouts(); + Object result = null; + ComponentObject componentObject = null; + if (spouts.containsKey(componentId)) { + componentObject = spouts.get(componentId).get_spout_object(); + } else if (bolts.containsKey(componentId)) { + componentObject = bolts.get(componentId).get_bolt_object(); + } else if (stateSpouts.containsKey(componentId)) { + componentObject = stateSpouts.get(componentId).get_state_spout_object(); + } else { + throw new RuntimeException("Could not find " + componentId + " in " + topology); + } + result = Utils.getSetComponentObject(componentObject); + + if (result instanceof ShellComponent) { + if (spouts.containsKey(componentId)) { + result = new ShellSpout((ShellComponent) result); + } else { + result = new ShellBolt((ShellComponent) result); + } + } + + if (result instanceof JavaObject) { + result = Thrift.instantiateJavaObject((JavaObject) result); + } + + return result; + } + + private void addTaskHooks() { + List<String> hooksClassList = (List<String>) stormConf.get(Config.TOPOLOGY_AUTO_TASK_HOOKS); + if (null != hooksClassList) { + for (String hookClass : hooksClassList) { + try { + userTopologyContext.addTaskHook(((ITaskHook) Class.forName(hookClass).newInstance())); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Failed to add hook: " + hookClass, e); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java index 84c75d7..2827420 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java @@ -17,19 +17,21 @@ */ package org.apache.storm.daemon.metrics; -import java.util.HashMap; -import java.util.Map; import org.apache.storm.Config; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.api.StateMetric; import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; import org.apache.storm.stats.SpoutExecutorStats; import org.apache.storm.stats.StatsUtil; import org.apache.storm.task.TopologyContext; +import java.util.HashMap; +import java.util.Map; + public class BuiltinMetricsUtil { - public static BuiltinMetrics mkData(String type, Object stats) { + public static BuiltinMetrics mkData(String type, CommonStats stats) { if (StatsUtil.SPOUT.equals(type)) { return new BuiltinSpoutMetrics((SpoutExecutorStats) stats); } else if (StatsUtil.BOLT.equals(type)) { http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java index 905f747..e6f4b11 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java @@ -17,6 +17,8 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; public class BoltAckInfo { @@ -29,4 +31,10 @@ public class BoltAckInfo { this.ackingTaskId = ackingTaskId; this.processLatencyMs = processLatencyMs; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.boltAck(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java index d92a1f8..73a7f33 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java @@ -17,6 +17,8 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; public class BoltExecuteInfo { @@ -29,4 +31,10 @@ public class BoltExecuteInfo { this.executingTaskId = executingTaskId; this.executeLatencyMs = executeLatencyMs; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.boltExecute(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java index 38e31b7..4e1e32d 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java @@ -17,6 +17,8 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; public class BoltFailInfo { @@ -29,4 +31,10 @@ public class BoltFailInfo { this.failingTaskId = failingTaskId; this.failLatencyMs = failLatencyMs; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.boltFail(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java index 3e3ed8c..52965a1 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/EmitInfo.java @@ -17,6 +17,9 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; + import java.util.Collection; import java.util.List; @@ -32,4 +35,10 @@ public class EmitInfo { this.taskId = taskId; this.outTasks = outTasks; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.emit(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java index 1b1bc76..4949f0f 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java @@ -17,6 +17,9 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; + public class SpoutAckInfo { public Object messageId; public int spoutTaskId; @@ -27,4 +30,10 @@ public class SpoutAckInfo { this.spoutTaskId = spoutTaskId; this.completeLatencyMs = completeLatencyMs; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.spoutAck(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java index 34b38b7..5b40005 100644 --- a/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java +++ b/storm-core/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java @@ -17,6 +17,9 @@ */ package org.apache.storm.hooks.info; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.task.TopologyContext; + public class SpoutFailInfo { public Object messageId; public int spoutTaskId; @@ -27,4 +30,10 @@ public class SpoutFailInfo { this.spoutTaskId = spoutTaskId; this.failLatencyMs = failLatencyMs; } + + public void applyOn(TopologyContext topologyContext) { + for (ITaskHook hook : topologyContext.getHooks()) { + hook.spoutFail(this); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index d7b7dbf..30d314f 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -18,10 +18,10 @@ package org.apache.storm.utils; +import org.apache.commons.io.FileUtils; import org.apache.storm.Config; -import org.apache.storm.validation.ConfigValidation; import org.apache.storm.generated.StormTopology; -import org.apache.commons.io.FileUtils; +import org.apache.storm.validation.ConfigValidation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,14 +35,16 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.lang.reflect.Field; +import java.net.URLEncoder; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; -import java.util.HashSet; -import java.util.Collections; -import java.net.URLEncoder; +import java.util.concurrent.Callable; public class ConfigUtils { private final static Logger LOG = LoggerFactory.getLogger(ConfigUtils.class); @@ -135,7 +137,28 @@ public class ConfigUtils { throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate); } - // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util + public static Callable<Boolean> evenSampler(final int samplingFreq) { + final Random random = new Random(); + + return new Callable<Boolean>() { + private int curr = -1; + private int target = random.nextInt(samplingFreq); + + @Override + public Boolean call() throws Exception { + curr++; + if (curr >= samplingFreq) { + curr = 0; + target = random.nextInt(samplingFreq); + } + return (curr == target); + } + }; + } + + public static Callable<Boolean> mkStatsSampler(Map conf) { + return evenSampler(samplingRate(conf)); + } // we use this "wired" wrapper pattern temporarily for mocking in clojure test public static Map readStormConfig() { http://git-wip-us.apache.org/repos/asf/storm/blob/7d63cb33/storm-core/test/clj/org/apache/storm/grouping_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj index 61caf68..487d80f 100644 --- a/storm-core/test/clj/org/apache/storm/grouping_test.clj +++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj @@ -16,21 +16,24 @@ (ns org.apache.storm.grouping-test (:use [clojure test]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestWordBytesCounter NGrouping] - [org.apache.storm.generated JavaObject JavaObjectArg]) + [org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct]) (:import [org.apache.storm.grouping LoadMapping]) (:use [org.apache.storm testing log config]) (:use [org.apache.storm.internal clojure]) (:use [org.apache.storm.daemon common executor]) (:import [org.apache.storm Thrift]) - (:import [org.apache.storm.utils Utils])) + (:import [org.apache.storm.utils Utils] + (org.apache.storm.daemon GrouperFactory))) + +(def shuffle-grouping (Grouping/shuffle (NullStruct. ))) (deftest test-shuffle - (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true} nil "comp" "stream") + (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {TOPOLOGY-DISABLE-LOADAWARE-MESSAGING true}) num-messages 100000 min-prcnt (int (* num-messages 0.49)) max-prcnt (int (* num-messages 0.51)) data [1 2] - freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data nil))) + freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil))) load1 (.get freq [(int 1)]) load2 (.get freq [(int 2)])] (log-message "FREQ:" freq) @@ -40,14 +43,14 @@ (is (<= load2 max-prcnt)))) (deftest test-shuffle-load-even - (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream") + (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {}) num-messages 100000 min-prcnt (int (* num-messages 0.49)) max-prcnt (int (* num-messages 0.51)) load (LoadMapping.) _ (.setLocal load {(int 1) 0.0 (int 2) 0.0}) data [1 2] - freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load))) + freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load))) load1 (.get freq [(int 1)]) load2 (.get freq [(int 2)])] (log-message "FREQ:" freq) @@ -57,7 +60,7 @@ (is (<= load2 max-prcnt)))) (deftest test-shuffle-load-uneven - (let [shuffle-fn (mk-shuffle-grouper [(int 1) (int 2)] {} nil "comp" "stream") + (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {}) num-messages 100000 min1-prcnt (int (* num-messages 0.32)) max1-prcnt (int (* num-messages 0.34)) @@ -66,7 +69,7 @@ load (LoadMapping.) _ (.setLocal load {(int 1) 0.5 (int 2) 0.0}) data [1 2] - freq (frequencies (for [x (range 0 num-messages)] (shuffle-fn (int 1) data load))) + freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load))) load1 (.get freq [(int 1)]) load2 (.get freq [(int 2)])] (log-message "FREQ:" freq)
