http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
new file mode 100644
index 0000000..c65f5d8
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -0,0 +1,855 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.executor
+  (:use [org.apache.storm.daemon common])
+  (:import [org.apache.storm.generated Grouping]
+           [java.io Serializable])
+  (:use [org.apache.storm util config log timer stats])
+  (:import [java.util List Random HashMap ArrayList LinkedList Map])
+  (:import [org.apache.storm ICredentialsListener])
+  (:import [org.apache.storm.hooks ITaskHook])
+  (:import [org.apache.storm.tuple AddressedTuple Tuple Fields TupleImpl 
MessageId])
+  (:import [org.apache.storm.spout ISpoutWaitStrategy ISpout 
SpoutOutputCollector ISpoutOutputCollector])
+  (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo
+            EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+  (:import [org.apache.storm.grouping CustomStreamGrouping])
+  (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector 
IOutputCollector])
+  (:import [org.apache.storm.generated GlobalStreamId])
+  (:import [org.apache.storm.utils Utils TupleUtils MutableObject RotatingMap 
RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue 
WorkerBackpressureThread])
+  (:import [com.lmax.disruptor InsufficientCapacityException])
+  (:import [org.apache.storm.serialization KryoTupleSerializer])
+  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
+  (:import [org.apache.storm Config Constants])
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
+  (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
+  (:import [java.util.concurrent ConcurrentLinkedQueue])
+  (:require [org.apache.storm [thrift :as thrift]
+             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
+  (:require [org.apache.storm.daemon [task :as task]])
+  (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
+  (:require [clojure.set :as set]))
+
+(defn- mk-fields-grouper
+  "Builds a fields grouper: a fn of [task-id values load] that selects
+  group-fields out of values (laid out as out-fields), hashes the selection
+  with TupleUtils/listHashCode and returns the element of target-tasks at
+  index (hash mod number-of-targets). task-id and load are not used."
+  [^Fields out-fields ^Fields group-fields ^List target-tasks]
+  (let [task-count (count target-tasks)]
+    (fn [task-id ^List values load]
+      (let [selected (.select out-fields group-fields values)
+            slot (mod (TupleUtils/listHashCode selected) task-count)]
+        (.get target-tasks slot)))))
+
+(defn- mk-custom-grouper
+  "Prepares grouping for the stream (component-id, stream-id) against
+  target-tasks and wraps it in a grouper fn of [task-id values load].
+  A LoadAwareCustomStreamGrouping is handed the load argument; any other
+  grouping is called without it."
+  [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
+  (let [source-stream (GlobalStreamId. component-id stream-id)]
+    (.prepare grouping context source-stream target-tasks)
+    (if-not (instance? LoadAwareCustomStreamGrouping grouping)
+      (fn [task-id ^List values load]
+        (.chooseTasks grouping task-id values))
+      (fn [task-id ^List values load]
+        (.chooseTasks grouping task-id values load)))))
+
+(defn mk-shuffle-grouper
+  "Creates a shuffle grouper over target-tasks for the given stream. A plain
+  ShuffleGrouping is used when TOPOLOGY-DISABLE-LOADAWARE-MESSAGING is set in
+  topo-conf, a LoadAwareShuffleGrouping otherwise; either way the grouping
+  is wrapped via mk-custom-grouper."
+  [^List target-tasks topo-conf ^WorkerTopologyContext context ^String component-id ^String stream-id]
+  (let [load-aware-disabled? (.get topo-conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
+        grouping (if load-aware-disabled?
+                   (ShuffleGrouping.)
+                   (LoadAwareShuffleGrouping.))]
+    (mk-custom-grouper grouping context component-id stream-id target-tasks)))
+
+(defn- mk-grouper
+  "Builds the grouper for one outbound stream subscription, dispatching on
+  (thrift/grouping-type thrift-grouping).
+
+  Every grouping type except :direct yields a fn of [task-id tuple load]
+  that returns either a single element of target-tasks or a collection of
+  them; :direct yields the keyword :direct instead of a fn. target-tasks is
+  sorted before use. There is no default clause, so an unrecognised
+  grouping type makes condp throw."
+  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks topo-conf]
+  (let [num-tasks (count target-tasks)
+        random (Random.)
+        target-tasks (vec (sort target-tasks))]
+    (condp = (thrift/grouping-type thrift-grouping)
+      :fields
+        (if (thrift/global-grouping? thrift-grouping)
+          (fn [task-id tuple load]
+            ;; It's possible for target to have multiple tasks if it reads
+            ;; multiple sources
+            (first target-tasks))
+          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
+            (mk-fields-grouper out-fields group-fields target-tasks)))
+      :all
+        (fn [task-id tuple load] target-tasks)
+      :shuffle
+        (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)
+      :local-or-shuffle
+        ;; shuffle among the targets hosted by this worker when there are
+        ;; any, otherwise among all targets
+        (let [same-tasks (set/intersection
+                           (set target-tasks)
+                           (set (.getThisWorkerTasks context)))]
+          (if-not (empty? same-tasks)
+            (mk-shuffle-grouper (vec same-tasks) topo-conf context component-id stream-id)
+            (mk-shuffle-grouper target-tasks topo-conf context component-id stream-id)))
+      :none
+        (fn [task-id tuple load]
+          (let [i (mod (.nextInt random) num-tasks)]
+            (get target-tasks i)))
+      :custom-object
+        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
+          (mk-custom-grouper grouping context component-id stream-id target-tasks))
+      :custom-serialized
+        (let [grouping (Utils/javaDeserialize (.get_custom_serialized thrift-grouping) Serializable)]
+          (mk-custom-grouper grouping context component-id stream-id target-tasks))
+      :direct
+        :direct)))
+
+(defn- outbound-groupings
+  "For one output stream of this-component-id, returns a java.util.HashMap of
+  subscribing component id -> grouper (built by mk-grouper). Subscribers for
+  which the worker context reports no tasks are left out."
+  [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
+  (let [tasks-of (fn [component]
+                   (.getComponentTasks worker-context component))
+        populated? (fn [component]
+                     (pos? (count (tasks-of component))))
+        grouper-entry (fn [[component tgrouping]]
+                        [component
+                         (mk-grouper worker-context
+                                     this-component-id
+                                     stream-id
+                                     out-fields
+                                     tgrouping
+                                     (tasks-of component)
+                                     topo-conf)])]
+    (HashMap.
+      (into {}
+            (map grouper-entry
+                 (filter-key populated? component->grouping))))))
+
+(defn outbound-components
+  "Returns map of stream id to component id to grouper"
+  [^WorkerTopologyContext worker-context component-id topo-conf]
+  (let [targets (clojurify-structure (.getTargets worker-context component-id))
+        stream-entry (fn [[stream-id component->grouping]]
+                       (let [out-fields (.getComponentOutputFields worker-context component-id stream-id)]
+                         [stream-id
+                          (outbound-groupings worker-context
+                                              component-id
+                                              stream-id
+                                              out-fields
+                                              component->grouping
+                                              topo-conf)]))]
+    ;; the outer container is a java.util.HashMap, like the per-stream maps
+    (HashMap. (into {} (map stream-entry targets)))))
+
+(defn executor-type
+  "Classifies component-id as :spout or :bolt by looking it up among the raw
+  topology's spouts and bolts; throws (via throw-runtime) when it is found
+  in neither."
+  [^WorkerTopologyContext context component-id]
+  (let [topology (.getRawTopology context)
+        bolts (.get_bolts topology)
+        spouts (.get_spouts topology)]
+    ;; condp evaluates (contains? <collection> component-id) clause by clause
+    (condp contains? component-id
+      spouts :spout
+      bolts :bolt
+      (throw-runtime "Could not find " component-id " in topology " topology))))
+
+;; Dispatch fn shared by the executor multimethods: picks the :type entry out
+;; of executor-data and ignores any further arguments.
+(defn executor-selector [executor-data & _]
+  (get executor-data :type))
+
+;; Executor multimethods. Each dispatches through executor-selector, i.e. on
+;; the :type entry of its first argument (executor-data).
+;; mk-threads is invoked by mk-executor with
+;; [executor-data task-datas initial-credentials].
+(defmulti mk-threads executor-selector)
+;; mk-executor-stats is invoked while building executor-data, with the map
+;; under construction and (sampling-rate storm-conf).
+(defmulti mk-executor-stats executor-selector)
+;; close-component is invoked on shutdown with [executor-data obj], once per
+;; task object, and only if open/prepare was called.
+(defmulti close-component executor-selector)
+
+(defn- normalized-component-conf
+  "Returns storm-conf with the component's own JSON conf merged over it.
+  Before merging, every key listed in ALL-CONFIGS is removed from the
+  component conf, except the per-component overridable keys listed below."
+  [storm-conf general-context component-id]
+  (let [overridable [TOPOLOGY-DEBUG
+                     TOPOLOGY-MAX-SPOUT-PENDING
+                     TOPOLOGY-MAX-TASK-PARALLELISM
+                     TOPOLOGY-TRANSACTIONAL-ID
+                     TOPOLOGY-TICK-TUPLE-FREQ-SECS
+                     TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
+                     TOPOLOGY-SPOUT-WAIT-STRATEGY
+                     TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
+                     TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
+                     TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
+                     TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
+                     TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
+                     TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS]
+        locked-keys (reduce disj (set ALL-CONFIGS) overridable)
+        component-common (.getComponentCommon general-context component-id)
+        component-conf (from-json (.get_json_conf component-common))]
+    (merge storm-conf (apply dissoc component-conf locked-keys))))
+
+;; Operations exposed by the object mk-executor returns for a started
+;; executor (that object also implements Shutdownable).
+(defprotocol RunningExecutor
+  ;; renders the executor's :stats via stats/render-stats!
+  (render-stats [this])
+  ;; returns the executor-id the executor was created with
+  (get-executor-id [this])
+  ;; publishes creds to the executor's receive queue as a broadcast tuple on
+  ;; the credentials-changed stream
+  (credentials-changed [this creds])
+  ;; current value of the executor's :backpressure atom
+  (get-backpressure-flag [this]))
+
+(defn throttled-report-error-fn
+  "Returns a fn of [error] that always logs the error and forwards it to
+  cluster/report-error at most TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL times
+  per window of TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS seconds. The window
+  and its counter are reset once the window has elapsed."
+  [executor]
+  (let [storm-conf (:storm-conf executor)
+        window-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
+        report-cap (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
+        window-start (atom (current-time-secs))
+        seen-in-window (atom 0)]
+    (fn [error]
+      (log-error error)
+      ;; start a fresh window once the current one has run out
+      (when (> (time-delta @window-start) window-secs)
+        (reset! seen-in-window 0)
+        (reset! window-start (current-time-secs)))
+      (swap! seen-in-window inc)
+      (when-not (> @seen-in-window report-cap)
+        (cluster/report-error (:storm-cluster-state executor)
+                              (:storm-id executor)
+                              (:component-id executor)
+                              (hostname storm-conf)
+                              (.getThisWorkerPort (:worker-context executor))
+                              error)))))
+
+;; in its own function so that it can be mocked out by tracked topologies
+(defn mk-executor-transfer-fn
+  "Returns a fn of [task tuple] that wraps the pair in an AddressedTuple and
+  publishes it to the batch-transfer->worker queue, logging it first when
+  TOPOLOGY-DEBUG is true in storm-conf."
+  [batch-transfer->worker storm-conf]
+  (fn [task tuple]
+    (let [addressed (AddressedTuple. task tuple)
+          debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+      (when debug?
+        (log-message "TRANSFERING tuple " addressed))
+      (disruptor/publish batch-transfer->worker addressed))))
+
+(defn mk-executor-data
+  "Assembles the state map shared by everything belonging to one executor of
+  worker: identity (executor id, task ids, component id), the
+  component-normalized storm conf, the receive and batch-transfer queues,
+  the transfer fn, a cluster state handle, stats, groupers and the
+  error-reporting fns.
+
+  The map is built with recursive-map; entries refer to the map under
+  construction as <> (see recursive-map in the util namespace for the exact
+  semantics)."
+  [worker executor-id]
+  (let [worker-context (worker-context worker)
+        task-ids (executor-id->tasks executor-id)
+        ;; the component is looked up through the executor's first task
+        component-id (.getComponentId worker-context (first task-ids))
+        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
+        executor-type (executor-type worker-context component-id)
+        batch-transfer->worker (disruptor/disruptor-queue
+                                  (str "executor"  executor-id "-send-queue")
+                                  (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
+                                  (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                  :producer-type :single-threaded
+                                  :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                  :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
+        ]
+    (recursive-map
+     :worker worker
+     :worker-context worker-context
+     :executor-id executor-id
+     :task-ids task-ids
+     :component-id component-id
+     :open-or-prepare-was-called? (atom false)
+     :storm-conf storm-conf
+     :receive-queue ((:executor-receive-queue-map worker) executor-id)
+     :storm-id (:storm-id worker)
+     :conf (:conf worker)
+     :shared-executor-data (HashMap.)
+     :storm-active-atom (:storm-active-atom worker)
+     :storm-component->debug-atom (:storm-component->debug-atom worker)
+     :batch-transfer-queue batch-transfer->worker
+     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
+     :suicide-fn (:suicide-fn worker)
+     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker)
+                                                          :acls (Utils/getWorkerACL storm-conf)
+                                                          :context (ClusterStateContext. DaemonType/WORKER))
+     :type executor-type
+     ;; TODO: should refactor this to be part of the executor specific map
+     ;; (spout or bolt with :common field)
+     :stats (mk-executor-stats <> (sampling-rate storm-conf))
+     :interval->task->metric-registry (HashMap.)
+     :task->component (:task->component worker)
+     :stream->component->grouper (outbound-components worker-context component-id storm-conf)
+     :report-error (throttled-report-error-fn <>)
+     ;; reports the error, then kills the worker via :suicide-fn unless the
+     ;; error was caused by an interrupt
+     :report-error-and-die (fn [error]
+                             ((:report-error <>) error)
+                             (if (or
+                                    (exception-cause? InterruptedException error)
+                                    (exception-cause? java.io.InterruptedIOException error))
+                               (log-message "Got interrupted excpetion shutting thread down...")
+                               ((:suicide-fn <>))))
+     :sampler (mk-stats-sampler storm-conf)
+     :backpressure (atom false)
+     :spout-throttling-metrics (if (= executor-type :spout)
+                                (builtin-metrics/make-spout-throttling-data)
+                                nil)
+     ;; TODO: add in the executor-specific stuff in a :specific... or make a
+     ;; spout-data, bolt-data function?
+     )))
+
+(defn- mk-disruptor-backpressure-handler
+  "make a handler for the executor's receive disruptor queue to
+  check highWaterMark and lowWaterMark for backpressure.
+
+  Each callback flips the executor's :backpressure atom only when its value
+  actually changes, and in that case notifies the worker's backpressure
+  checker through the worker's :backpressure-trigger."
+  [executor-data]
+  (disruptor/disruptor-backpressure-handler
+    (fn []
+      ;; When receive queue is above highWaterMark
+      (when-not @(:backpressure executor-data)
+        (reset! (:backpressure executor-data) true)
+        (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
+        (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))
+    (fn []
+      ;; When receive queue is below lowWaterMark
+      (when @(:backpressure executor-data)
+        (reset! (:backpressure executor-data) false)
+        (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
+        (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))))
+
+(defn start-batch-transfer->worker-handler!
+  "Starts the consume loop on the executor's :batch-transfer-queue. Consumed
+  objects are collected in an ArrayList; at the end of each batch the list
+  is handed, together with a KryoTupleSerializer, to the worker's
+  :transfer-fn and a fresh list is started. Failures go to the executor's
+  :report-error-and-die. Returns whatever disruptor/consume-loop* returns."
+  [worker executor-data]
+  (let [storm-conf (:storm-conf executor-data)
+        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
+        worker-transfer-fn (:transfer-fn worker)
+        pending-batch (MutableObject. (ArrayList.))]
+    (disruptor/consume-loop*
+      (:batch-transfer-queue executor-data)
+      (disruptor/handler [o seq-id batch-end?]
+        (let [^ArrayList batch (.getObject pending-batch)]
+          (.add batch o)
+          (when batch-end?
+            (worker-transfer-fn serializer batch)
+            (.setObject pending-batch (ArrayList.)))))
+      :kill-fn (:report-error-and-die executor-data))))
+
+(defn setup-metrics!
+  "For every interval registered in the executor's
+  :interval->task->metric-registry, schedules a recurring job on the
+  worker's :user-timer that publishes a broadcast metrics-tick tuple
+  (carrying the interval) to the executor's receive queue."
+  [executor-data]
+  (let [{:keys [receive-queue worker-context interval->task->metric-registry]} executor-data
+        timer (:user-timer (:worker executor-data))]
+    (doseq [interval (keys interval->task->metric-registry)]
+      (schedule-recurring
+        timer
+        interval
+        interval
+        (fn []
+          ;; a fresh tuple is built for every tick
+          (let [tick (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)
+                batch [(AddressedTuple. AddressedTuple/BROADCAST_DEST tick)]]
+            (disruptor/publish receive-queue batch)))))))
+
+(defn metrics-tick
+  "Handles a metrics-tick tuple for one task: reads the interval from the
+  tuple, calls getValueAndReset on every metric registered for that interval
+  and task, and, if any metric produced a value, sends
+  [task-info data-points] unanchored on the metrics stream."
+  [executor-data task-data ^TupleImpl tuple]
+  (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
+        interval (.getInteger tuple 0)
+        task-id (:task-id task-data)
+        name->imetric (get-in interval->task->metric-registry [interval task-id])
+        task-info (IMetricsConsumer$TaskInfo.
+                    (hostname (:storm-conf executor-data))
+                    (.getThisWorkerPort worker-context)
+                    (:component-id executor-data)
+                    task-id
+                    (long (/ (System/currentTimeMillis) 1000))
+                    interval)
+        ;; metrics that report no value contribute no data point
+        data-points (into []
+                          (keep (fn [[metric-name imetric]]
+                                  (when-let [value (.getValueAndReset ^IMetric imetric)]
+                                    (IMetricsConsumer$DataPoint. metric-name value)))
+                                name->imetric))]
+    (when (seq data-points)
+      (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+
+(defn setup-ticks!
+  "When TOPOLOGY-TICK-TUPLE-FREQ-SECS is set for the executor, schedules a
+  recurring job on the worker's :user-timer that publishes a broadcast
+  system-tick tuple to the executor's receive queue. System components, and
+  spouts whose TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS is false, get a log line
+  instead of a schedule."
+  [worker executor-data]
+  (let [{:keys [storm-conf receive-queue component-id]} executor-data
+        context (:worker-context executor-data)
+        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)]
+    (when tick-time-secs
+      (if-not (or (Utils/isSystemId component-id)
+                  (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
+                       (= :spout (:type executor-data))))
+        (schedule-recurring
+          (:user-timer worker)
+          tick-time-secs
+          tick-time-secs
+          (fn []
+            (let [tick (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)
+                  batch [(AddressedTuple. AddressedTuple/BROADCAST_DEST tick)]]
+              (disruptor/publish receive-queue batch))))
+        (log-message "Timeouts disabled for executor " component-id ":" (:executor-id executor-data))))))
+
+(defn mk-executor
+  "Creates and starts one executor of worker. Builds the executor-data map
+  and the per-task data, registers the backpressure callback and watermarks
+  on the receive queue, starts the batch-transfer consumer and the
+  type-specific threads (mk-threads), and sets up tick tuples.
+
+  Returns an object implementing RunningExecutor and Shutdownable. Shutdown
+  halts both queues, interrupts and joins all threads, cleans up the task
+  hooks, disconnects the cluster state and, if open/prepare was called,
+  closes every task object."
+  [worker executor-id initial-credentials]
+  (let [executor-data (mk-executor-data worker executor-id)
+        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
+        task-datas (->> executor-data
+                        :task-ids
+                        (map (fn [t] [t (task/mk-task executor-data t)]))
+                        (into {})
+                        (HashMap.))
+        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
+        report-error-and-die (:report-error-and-die executor-data)
+        component-id (:component-id executor-data)
+
+
+        disruptor-handler (mk-disruptor-backpressure-handler executor-data)
+        _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
+        ;; the setters are chained, so each must return the queue
+        _ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
+              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
+              (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
+
+        ;; starting the batch-transfer->worker ensures that anything
+        ;; publishing to that queue doesn't block (because it's a single
+        ;; threaded queue and the caching/consumer started trick isn't
+        ;; thread-safe)
+        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
+        handlers (with-error-reaction report-error-and-die
+                   (mk-threads executor-data task-datas initial-credentials))
+        threads (concat handlers system-threads)]
+    (setup-ticks! worker executor-data)
+
+    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
+    ;; TODO: add method here to get rendered stats... have worker call that
+    ;; when heartbeating
+    (reify
+      RunningExecutor
+      (render-stats [this]
+        (stats/render-stats! (:stats executor-data)))
+      (get-executor-id [this]
+        executor-id)
+      (credentials-changed [this creds]
+        (let [receive-queue (:receive-queue executor-data)
+              context (:worker-context executor-data)
+              val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
+          (disruptor/publish receive-queue val)))
+      (get-backpressure-flag [this]
+        @(:backpressure executor-data))
+      Shutdownable
+      (shutdown
+        [this]
+        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
+        (disruptor/halt-with-interrupt! (:receive-queue executor-data))
+        (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
+        (doseq [t threads]
+          (.interrupt t)
+          (.join t))
+
+        (doseq [user-context (map :user-context (vals task-datas))]
+          (doseq [hook (.getHooks user-context)]
+            (.cleanup hook)))
+        (.disconnect (:storm-cluster-state executor-data))
+        ;; components are only closed if they were opened/prepared
+        (when @(:open-or-prepare-was-called? executor-data)
+          (doseq [obj (map :object (vals task-datas))]
+            (close-component executor-data obj)))
+        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
+        )))
+
+(defn- fail-spout-msg
+  "Fails msg-id on the task's spout: logs the failure when TOPOLOGY-DEBUG is
+  true, calls .fail on the spout, runs the .spoutFail task hooks and, when a
+  time-delta is given, records the failure in the executor stats under the
+  tuple's :stream."
+  [executor-data task-data msg-id tuple-info time-delta reason id]
+  (let [^ISpout spout (:object task-data)
+        storm-conf (:storm-conf executor-data)
+        task-id (:task-id task-data)]
+    ;;TODO: need to throttle these when there's lots of failures
+    (when (= true (storm-conf TOPOLOGY-DEBUG))
+      (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
+    (.fail spout msg-id)
+    (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
+    (when time-delta
+      (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+
+(defn- ack-spout-msg
+  "Acks msg-id on the task's spout: logs the ack when TOPOLOGY-DEBUG is
+  true, calls .ack on the spout, runs the .spoutAck task hooks and, when a
+  time-delta is given, records the ack in the executor stats under the
+  tuple's :stream."
+  [executor-data task-data msg-id tuple-info time-delta id]
+  (let [debug? (= true ((:storm-conf executor-data) TOPOLOGY-DEBUG))
+        acking-task (:task-id task-data)
+        user-context (:user-context task-data)
+        ^ISpout spout (:object task-data)]
+    (when debug?
+      (log-message "SPOUT Acking message " id " " msg-id))
+    (.ack spout msg-id)
+    (task/apply-hooks user-context .spoutAck (SpoutAckInfo. msg-id acking-task time-delta))
+    (when time-delta
+      (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+
+(defn mk-task-receiver
+  "Builds the disruptor handler for an executor's receive queue. For every
+  AddressedTuple in a batch it calls (tuple-action-fn task-id tuple) for the
+  tuple's destination task, or once per task of this executor when the
+  destination is AddressedTuple/BROADCAST_DEST."
+  [executor-data tuple-action-fn]
+  (let [debug? (= true (get (:storm-conf executor-data) TOPOLOGY-DEBUG))
+        all-task-ids (:task-ids executor-data)]
+    (disruptor/clojure-handler
+      (fn [tuple-batch sequence-id end-of-batch?]
+        (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
+          (let [dest (.getDest addressed-tuple)
+                ^TupleImpl tuple (.getTuple addressed-tuple)]
+            (when debug?
+              (log-message "Processing received message FOR " dest " TUPLE: " tuple))
+            (if (= dest AddressedTuple/BROADCAST_DEST)
+              ;; broadcast: every task of this executor sees the tuple
+              (fast-list-iter [each-task all-task-ids]
+                (tuple-action-fn each-task tuple))
+              (tuple-action-fn dest tuple))))))))
+
+(defn executor-max-spout-pending
+  "Executor-wide pending limit: TOPOLOGY-MAX-SPOUT-PENDING from storm-conf
+  multiplied by num-tasks, or nil when that setting is absent."
+  [storm-conf num-tasks]
+  (when-let [per-task-limit (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
+    (* per-task-limit num-tasks)))
+
+(defn init-spout-wait-strategy
+  "Instantiates the class configured under TOPOLOGY-SPOUT-WAIT-STRATEGY,
+  calls .prepare on it with storm-conf and returns the instance."
+  [storm-conf]
+  (doto (new-instance (get storm-conf TOPOLOGY-SPOUT-WAIT-STRATEGY))
+    (.prepare storm-conf)))
+
+;; Send sampled data to the eventlogger if the global or component level
+;; debug flag is set (via nimbus api).
+(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
+  (let [debug-options @(:storm-component->debug-atom executor-data)
+        ;; the topology-wide entry (keyed by storm id) is the fallback when
+        ;; the component has no entry of its own
+        topology-level (get debug-options (:storm-id executor-data))
+        options (get debug-options component-id topology-level)
+        sampling-pct (if (and (not-nil? options) (:enable options))
+                       (:samplingpct options)
+                       0)]
+    ;; the thread's initialized random number generator is used to generate
+    ;; uniformily distributed random numbers.
+    (when (and (pos? sampling-pct)
+               (> sampling-pct (* 100 (.nextDouble random))))
+      (task/send-unanchored
+        task-data
+        EVENTLOGGER-STREAM-ID
+        [component-id message-id (System/currentTimeMillis) values]))))
+
+(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
+  (let [{:keys [storm-conf component-id worker-context transfer-fn 
report-error sampler open-or-prepare-was-called?]} executor-data
+        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy 
storm-conf)
+        max-spout-pending (executor-max-spout-pending storm-conf (count 
task-datas))
+        ^Integer max-spout-pending (if max-spout-pending (int 
max-spout-pending))        
+        last-active (atom false)        
+        spouts (ArrayList. (map :object (vals task-datas)))
+        rand (Random. (Utils/secureRandomLong))
+        ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
+        debug? (= true (storm-conf TOPOLOGY-DEBUG))
+
+        pending (RotatingMap.
+                 2 ;; microoptimize for performance of .size method
+                 (reify RotatingMap$ExpiredCallback
+                   (expire [this id [task-id spout-id tuple-info 
start-time-ms]]
+                     (let [time-delta (if start-time-ms (time-delta-ms 
start-time-ms))]
+                       (fail-spout-msg executor-data (get task-datas task-id) 
spout-id tuple-info time-delta "TIMEOUT" id)
+                       ))))
+        tuple-action-fn (fn [task-id ^TupleImpl tuple]
+                          (let [stream-id (.getSourceStreamId tuple)]
+                            (condp = stream-id
+                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick 
executor-data (get task-datas task-id) tuple)
+                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
+                                (let [task-data (get task-datas task-id)
+                                      spout-obj (:object task-data)]
+                                  (when (instance? ICredentialsListener 
spout-obj)
+                                    (.setCredentials spout-obj (.getValue 
tuple 0))))
+                              (let [id (.getValue tuple 0)
+                                    [stored-task-id spout-id 
tuple-finished-info start-time-ms] (.remove pending id)]
+                                (when spout-id
+                                  (when-not (= stored-task-id task-id)
+                                    (throw-runtime "Fatal error, mismatched 
task ids: " task-id " " stored-task-id))
+                                  (let [time-delta (if start-time-ms 
(time-delta-ms start-time-ms))]
+                                    (condp = stream-id
+                                      ACKER-ACK-STREAM-ID (ack-spout-msg 
executor-data (get task-datas task-id)
+                                                                         
spout-id tuple-finished-info time-delta id)
+                                      ACKER-FAIL-STREAM-ID (fail-spout-msg 
executor-data (get task-datas task-id)
+                                                                           
spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
+                                      )))
+                                ;; TODO: on failure, emit tuple to failure 
stream
+                                ))))
+        receive-queue (:receive-queue executor-data)
+        event-handler (mk-task-receiver executor-data tuple-action-fn)
+        has-ackers? (has-ackers? storm-conf)
+        has-eventloggers? (has-eventloggers? storm-conf)
+        emitted-count (MutableLong. 0)
+        empty-emit-streak (MutableLong. 0)]
+   
+    [(async-loop
+      (fn []
+        ;; If topology was started in inactive state, don't call (.open spout) 
until it's activated first.
+        (while (not @(:storm-active-atom executor-data))
+          (Thread/sleep 100))
+        
+        (log-message "Opening spout " component-id ":" (keys task-datas))
+        (builtin-metrics/register-spout-throttling-metrics 
(:spout-throttling-metrics executor-data) storm-conf (:user-context (first 
(vals task-datas))))
+        (doseq [[task-id task-data] task-datas
+                :let [^ISpout spout-obj (:object task-data)
+                     tasks-fn (:tasks-fn task-data)
+                     send-spout-msg (fn [out-stream-id values message-id 
out-task-id]
+                                       (.increment emitted-count)
+                                       (let [out-tasks (if out-task-id
+                                                         (tasks-fn out-task-id 
out-stream-id values)
+                                                         (tasks-fn 
out-stream-id values))
+                                             rooted? (and message-id 
has-ackers?)
+                                             root-id (if rooted? 
(MessageId/generateId rand))
+                                             ^List out-ids (fast-list-for [t 
out-tasks] (if rooted? (MessageId/generateId rand)))]
+                                         (fast-list-iter [out-task out-tasks 
id out-ids]
+                                                         (let [tuple-id (if 
rooted?
+                                                                          
(MessageId/makeRootId root-id id)
+                                                                          
(MessageId/makeUnanchored))
+                                                               out-tuple 
(TupleImpl. worker-context
+                                                                               
      values
+                                                                               
      task-id
+                                                                               
      out-stream-id
+                                                                               
      tuple-id)]
+                                                           (transfer-fn 
out-task out-tuple)))
+                                         (if has-eventloggers?
+                                           (send-to-eventlogger executor-data 
task-data values component-id message-id rand))
+                                         (if (and rooted?
+                                                  (not (.isEmpty out-ids)))
+                                           (do
+                                             (.put pending root-id [task-id
+                                                                    message-id
+                                                                    {:stream 
out-stream-id 
+                                                                     :values 
(if debug? values nil)}
+                                                                    (if 
(sampler) (System/currentTimeMillis))])
+                                             (task/send-unanchored task-data
+                                                                   
ACKER-INIT-STREAM-ID
+                                                                   [root-id 
(bit-xor-vals out-ids) task-id]))
+                                           (when message-id
+                                             (ack-spout-msg executor-data 
task-data message-id
+                                                            {:stream 
out-stream-id :values values}
+                                                            (if (sampler) 0) 
"0:")))
+                                         (or out-tasks [])
+                                         ))]]
+          (builtin-metrics/register-all (:builtin-metrics task-data) 
storm-conf (:user-context task-data))
+          (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
+                                                   :receive receive-queue}
+                                                  storm-conf (:user-context 
task-data))
+          (when (instance? ICredentialsListener spout-obj) (.setCredentials 
spout-obj initial-credentials))
+
+          (.open spout-obj
+                 storm-conf
+                 (:user-context task-data)
+                 (SpoutOutputCollector.
+                  (reify ISpoutOutputCollector
+                    (^long getPendingCount[this]
+                      (.size pending)
+                      )
+                    (^List emit [this ^String stream-id ^List tuple ^Object 
message-id]
+                      (send-spout-msg stream-id tuple message-id nil)
+                      )
+                    (^void emitDirect [this ^int out-task-id ^String stream-id
+                                       ^List tuple ^Object message-id]
+                      (send-spout-msg stream-id tuple message-id out-task-id)
+                      )
+                    (reportError [this error]
+                      (report-error error)
+                      )))))
+        (reset! open-or-prepare-was-called? true) 
+        (log-message "Opened spout " component-id ":" (keys task-datas))
+        (setup-metrics! executor-data)
+        
+        (fn []
+          ;; This design requires that spouts be non-blocking
+          (disruptor/consume-batch receive-queue event-handler)
+          
+          (let [active? @(:storm-active-atom executor-data)
+                curr-count (.get emitted-count)
+                backpressure-enabled ((:storm-conf executor-data) 
TOPOLOGY-BACKPRESSURE-ENABLE)
+                throttle-on (and backpressure-enabled
+                              @(:throttle-on (:worker executor-data)))
+                reached-max-spout-pending (and max-spout-pending
+                                               (>= (.size pending) 
max-spout-pending))
+                ]
+            (if active?
+              ; activated
+              (do
+                (when-not @last-active
+                  (reset! last-active true)
+                  (log-message "Activating spout " component-id ":" (keys 
task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.activate spout)))
+
+                (if (and (not (.isFull transfer-queue))
+                      (not throttle-on)
+                      (not reached-max-spout-pending))
+                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
+              ; deactivated
+              (do
+                (when @last-active
+                  (reset! last-active false)
+                  (log-message "Deactivating spout " component-id ":" (keys 
task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
+                ;; TODO: log that it's getting throttled
+                (Time/sleep 100)
+                (builtin-metrics/skipped-inactive! (:spout-throttling-metrics 
executor-data) (:stats executor-data))))
+
+            (if (and (= curr-count (.get emitted-count)) active?)
+              (do (.increment empty-emit-streak)
+                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
+                  ;; update the spout throttling metrics
+                  (if throttle-on
+                    (builtin-metrics/skipped-throttle! 
(:spout-throttling-metrics executor-data) (:stats executor-data))
+                    (if reached-max-spout-pending
+                      (builtin-metrics/skipped-max-spout! 
(:spout-throttling-metrics executor-data) (:stats executor-data)))))
+              (.set empty-emit-streak 0)
+              ))
+          0))
+      :kill-fn (:report-error-and-die executor-data)
+      :factory? true
+      :thread-name (str component-id "-executor" (:executor-id 
executor-data)))]))
+
+(defn- tuple-time-delta!
+  "Reads the tuple's process-sample start time and, when one is present,
+  returns the result of passing it to time-delta-ms. Returns nil when the
+  tuple has no process-sample start time."
+  [^TupleImpl tuple]
+  (when-let [started-at (.getProcessSampleStartTime tuple)]
+    (time-delta-ms started-at)))
+      
+(defn- tuple-execute-time-delta!
+  "Reads the tuple's execute-sample start time and, when one is present,
+  returns the result of passing it to time-delta-ms. Returns nil when the
+  tuple has no execute-sample start time."
+  [^TupleImpl tuple]
+  (when-let [started-at (.getExecuteSampleStartTime tuple)]
+    (time-delta-ms started-at)))
+
+(defn put-xor!
+  "XORs `id` into the value stored under `key` in the mutable map `pending`,
+  treating a missing entry as 0. Returns whatever `.put` returns, i.e. the
+  value previously associated with `key` (nil when there was none)."
+  [^Map pending key id]
+  (let [previous (.get pending key)
+        base (if previous previous (long 0))]
+    (.put pending key (bit-xor base id))))
+
+(defmethod mk-threads :bolt [executor-data task-datas initial-credentials] ; Bolt variant of mk-threads: returns a one-element vector holding the async-loop that prepares every bolt task and then drains the receive queue.
+  (let [storm-conf (:storm-conf executor-data) ; NOTE: storm-conf is bound again by the :keys destructuring below
+        execute-sampler (mk-stats-sampler storm-conf) ; sampler used only for execute-latency timing (see execute-sampler? below)
+        executor-stats (:stats executor-data)
+        {:keys [storm-conf component-id worker-context transfer-fn 
report-error sampler
+                open-or-prepare-was-called?]} executor-data
+        rand (Random. (Utils/secureRandomLong)) ; used for edge ids in bolt-emit and passed to send-to-eventlogger
+
+        tuple-action-fn (fn [task-id ^TupleImpl tuple] ; per-tuple handler handed to mk-task-receiver further down
+                          ;; synchronization needs to be done with a key 
provided by this bolt, otherwise:
+                          ;; spout 1 sends synchronization (s1), dies, same 
spout restarts somewhere else, sends synchronization (s2) and incremental 
update. s2 and update finish before s1 -> lose the incremental update
+                          ;; TODO: for state sync, need to first send sync 
messages in a loop and receive tuples until synchronization
+                          ;; buffer other tuples until fully synchronized, 
then process all of those tuples
+                          ;; then go into normal loop
+                          ;; spill to disk?
+                          ;; could be receiving incremental updates while 
waiting for sync or even a partial sync because of another failed task
+                          ;; should remember sync requests and include a 
random sync id in the request. drop anything not related to active sync requests
+                          ;; or just timeout the sync messages that are coming 
in until full sync is hit from that task
+                          ;; need to drop incremental updates from tasks where 
waiting for sync. otherwise, buffer the incremental updates
+                          ;; TODO: for state sync, need to check if tuple 
comes from state spout. if so, update state
+                          ;; TODO: how to handle incremental updates as well 
as synchronizations at same time
+                          ;; TODO: need to version tuples somehow
+                          
+                          ;;(log-debug "Received tuple " tuple " at task " 
task-id)
+                          ;; need to do it this way to avoid reflection
+                          (let [stream-id (.getSourceStreamId tuple)]
+                            (condp = stream-id ; dispatch on source stream: credentials update, metrics tick, else a regular tuple
+                              Constants/CREDENTIALS_CHANGED_STREAM_ID 
+                                (let [task-data (get task-datas task-id)
+                                      bolt-obj (:object task-data)]
+                                  (when (instance? ICredentialsListener 
bolt-obj)
+                                    (.setCredentials bolt-obj (.getValue tuple 
0))))
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick 
executor-data (get task-datas task-id) tuple)
+                              (let [task-data (get task-datas task-id)
+                                    ^IBolt bolt-obj (:object task-data)
+                                    user-context (:user-context task-data)
+                                    sampler? (sampler)
+                                    execute-sampler? (execute-sampler)
+                                    now (if (or sampler? execute-sampler?) 
(System/currentTimeMillis))
+                                    receive-queue (:receive-queue 
executor-data)] ; NOTE(review): receive-queue is never referenced in this let body
+                                (when sampler?
+                                  (.setProcessSampleStartTime tuple now))
+                                (when execute-sampler?
+                                  (.setExecuteSampleStartTime tuple now))
+                                (.execute bolt-obj tuple) ; run the user bolt
+                                (let [delta (tuple-execute-time-delta! tuple)] ; nil when the tuple carries no execute-sample start time
+                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
+                                    (log-message "Execute done TUPLE " tuple " 
TASK: " task-id " DELTA: " delta))
+ 
+                                  (task/apply-hooks user-context .boltExecute 
(BoltExecuteInfo. tuple task-id delta))
+                                  (when delta
+                                    (stats/bolt-execute-tuple! executor-stats
+                                                               
(.getSourceComponent tuple)
+                                                               
(.getSourceStreamId tuple)
+                                                               delta)))))))
+        has-eventloggers? (has-eventloggers? storm-conf)]
+    
+    ;; TODO: can get any SubscribedState objects out of the context now
+
+    [(async-loop
+      (fn []
+        ;; If topology was started in inactive state, don't call prepare bolt 
until it's activated first.
+        (while (not @(:storm-active-atom executor-data))          
+          (Thread/sleep 100))
+        
+        (log-message "Preparing bolt " component-id ":" (keys task-datas))
+        (doseq [[task-id task-data] task-datas
+                :let [^IBolt bolt-obj (:object task-data)
+                      tasks-fn (:tasks-fn task-data)
+                      user-context (:user-context task-data)
+                      bolt-emit (fn [stream anchors values task] ; shared by emit/emitDirect: sends values to the tasks picked by tasks-fn and returns that task list (or [])
+                                  (let [out-tasks (if task
+                                                    (tasks-fn task stream 
values)
+                                                    (tasks-fn stream values))]
+                                    (fast-list-iter [t out-tasks]
+                                                    (let [anchors-to-ids 
(HashMap.)]
+                                                      (fast-list-iter 
[^TupleImpl a anchors]
+                                                                      (let 
[root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
+                                                                        (when 
(pos? (count root-ids))
+                                                                          (let 
[edge-id (MessageId/generateId rand)]
+                                                                            
(.updateAckVal a edge-id) ; edge id is recorded on the anchor; ack reads it back via .getAckVal
+                                                                            
(fast-list-iter [root-id root-ids]
+                                                                               
             (put-xor! anchors-to-ids root-id edge-id)) ; accumulate this edge under each root the anchor belongs to
+                                                                            
))))
+                                                        (let [tuple 
(TupleImpl. worker-context
+                                                                               
values
+                                                                               
task-id
+                                                                               
stream
+                                                                               
(MessageId/makeId anchors-to-ids))]
+                                                          (transfer-fn t 
tuple))))
+                                    (if has-eventloggers?
+                                      (send-to-eventlogger executor-data 
task-data values component-id nil rand))
+                                    (or out-tasks [])))]]
+          (builtin-metrics/register-all (:builtin-metrics task-data) 
storm-conf user-context)
+          (when (instance? ICredentialsListener bolt-obj) (.setCredentials 
bolt-obj initial-credentials)) 
+          (if (= component-id Constants/SYSTEM_COMPONENT_ID) ; the system component also registers the worker transfer queue and connection metrics
+            (do
+              (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
+                                                       :receive 
(:receive-queue executor-data)
+                                                       :transfer 
(:transfer-queue (:worker executor-data))}
+                                                      storm-conf user-context)
+              (builtin-metrics/register-iconnection-client-metrics 
(:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
+              (builtin-metrics/register-iconnection-server-metric (:receiver 
(:worker executor-data)) storm-conf user-context))
+            (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
+                                                     :receive (:receive-queue 
executor-data)}
+                                                    storm-conf user-context)
+            )
+
+          (.prepare bolt-obj
+                    storm-conf
+                    user-context
+                    (OutputCollector.
+                     (reify IOutputCollector
+                       (emit [this stream anchors values]
+                         (bolt-emit stream anchors values nil))
+                       (emitDirect [this task stream anchors values]
+                         (bolt-emit stream anchors values task))
+                       (^void ack [this ^Tuple tuple] ; sends [root (bit-xor id ack-val)] on ACKER-ACK-STREAM-ID for each anchors-to-ids entry, then runs hooks/stats
+                         (let [^TupleImpl tuple tuple
+                               ack-val (.getAckVal tuple)]
+                           (fast-map-iter [[root id] (.. tuple getMessageId 
getAnchorsToIds)]
+                                          (task/send-unanchored task-data
+                                                                
ACKER-ACK-STREAM-ID
+                                                                [root (bit-xor 
id ack-val)])))
+                         (let [delta (tuple-time-delta! tuple)
+                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+                           (when debug? 
+                             (log-message "BOLT ack TASK: " task-id " TIME: " 
delta " TUPLE: " tuple))
+                           (task/apply-hooks user-context .boltAck 
(BoltAckInfo. tuple task-id delta))
+                           (when delta
+                             (stats/bolt-acked-tuple! executor-stats
+                                                      (.getSourceComponent 
tuple)
+                                                      (.getSourceStreamId 
tuple)
+                                                      delta))))
+                       (^void fail [this ^Tuple tuple] ; sends [root] on ACKER-FAIL-STREAM-ID for each anchor, then runs hooks/stats
+                         (fast-list-iter [root (.. tuple getMessageId 
getAnchors)]
+                                         (task/send-unanchored task-data
+                                                               
ACKER-FAIL-STREAM-ID
+                                                               [root]))
+                         (let [delta (tuple-time-delta! tuple)
+                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
+                           (when debug? 
+                             (log-message "BOLT fail TASK: " task-id " TIME: " 
delta " TUPLE: " tuple))
+                           (task/apply-hooks user-context .boltFail 
(BoltFailInfo. tuple task-id delta))
+                           (when delta
+                             (stats/bolt-failed-tuple! executor-stats
+                                                       (.getSourceComponent 
tuple)
+                                                       (.getSourceStreamId 
tuple)
+                                                       delta))))
+                       (reportError [this error]
+                         (report-error error)
+                         )))))
+        (reset! open-or-prepare-was-called? true)        
+        (log-message "Prepared bolt " component-id ":" (keys task-datas))
+        (setup-metrics! executor-data)
+
+        (let [receive-queue (:receive-queue executor-data)
+              event-handler (mk-task-receiver executor-data tuple-action-fn)]
+          (fn []            ; the function returned here does the per-iteration work (presumably what :factory? true tells async-loop -- confirm)
+            (disruptor/consume-batch-when-available receive-queue 
event-handler)
+            0))) ; NOTE(review): 0 is handed back to async-loop; its meaning is not visible here
+      :kill-fn (:report-error-and-die executor-data)
+      :factory? true
+      :thread-name (str component-id "-executor" (:executor-id 
executor-data)))]))
+
+;; Spout shutdown: the first argument (executor data) is ignored and
+;; the spout object's .close method is invoked.
+(defmethod close-component :spout
+  [_ spout-obj]
+  (.close spout-obj))
+
+;; Bolt shutdown: the first argument (executor data) is ignored and
+;; the bolt object's .cleanup method is invoked.
+(defmethod close-component :bolt
+  [_ bolt-obj]
+  (.cleanup bolt-obj))
+
+;; TODO: refactor this to be part of an executor-specific map
+;; Spout executors: delegate to stats/mk-spout-stats with the given rate;
+;; the first argument is ignored.
+(defmethod mk-executor-stats :spout
+  [_ rate]
+  (-> rate stats/mk-spout-stats))
+
+;; Bolt executors: delegate to stats/mk-bolt-stats with the given rate;
+;; the first argument is ignored.
+(defmethod mk-executor-stats :bolt
+  [_ rate]
+  (-> rate stats/mk-bolt-stats))

Reply via email to