http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/drpc.clj 
b/storm-core/src/clj/backtype/storm/daemon/drpc.clj
deleted file mode 100644
index 7ffb7d8..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/drpc.clj
+++ /dev/null
@@ -1,274 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns backtype.storm.daemon.drpc
-  (:import [backtype.storm.security.auth AuthUtils ThriftServer 
ThriftConnectionType ReqContext])
-  (:import [backtype.storm.security.auth.authorizer DRPCAuthorizerBase])
-  (:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface 
DistributedRPC$Processor
-            DRPCRequest DRPCExecutionException DistributedRPCInvocations 
DistributedRPCInvocations$Iface
-            DistributedRPCInvocations$Processor])
-  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
-            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
-  (:import [backtype.storm.daemon Shutdownable])
-  (:import [java.net InetAddress])
-  (:import [backtype.storm.generated AuthorizationException]
-           [backtype.storm.utils VersionInfo])
-  (:use [backtype.storm config log util])
-  (:use [backtype.storm.daemon common])
-  (:use [backtype.storm.ui helpers])
-  (:use compojure.core)
-  (:use ring.middleware.reload)
-  (:require [compojure.handler :as handler])
-  (:require [metrics.meters :refer [defmeter mark!]])
-  (:gen-class))
-
-;; Meters for the DRPC daemon. Each one is passed to `mark!` by the
-;; correspondingly named operation further down in this file.
-(defmeter drpc:num-execute-http-requests)
-(defmeter drpc:num-execute-calls)
-(defmeter drpc:num-result-calls)
-(defmeter drpc:num-failRequest-calls)
-(defmeter drpc:num-fetchRequest-calls)
-(defmeter drpc:num-shutdown-calls)
-
-;; Version string obtained from VersionInfo; launch-server! logs it at startup.
-(def STORM-VERSION (VersionInfo/getVersion))
-
-(defn timeout-check-secs
-  "Returns the constant 5. The timeout sweep function built in service-handler
-  returns this value at the end of each pass."
-  []
-  5)
-
-(defn acquire-queue
-  "Returns the ConcurrentLinkedQueue stored under `function` in the map held by
-  `queues-atom`. When the map has no entry for `function`, a new empty queue is
-  installed first."
-  [queues-atom function]
-  (let [ensure-queue (fn [function->queue]
-                       (if (function->queue function)
-                         function->queue
-                         (assoc function->queue
-                                function
-                                (ConcurrentLinkedQueue.))))]
-    (swap! queues-atom ensure-queue)
-    ((deref queues-atom) function)))
-
-(defn check-authorization
-  "Authorizes a DRPC `operation` against `aclHandler`.
-
-  With three arguments the current ReqContext is used as the context. When the
-  context is non-nil the access is logged through log-thrift-access. When
-  `aclHandler` is non-nil and its `.permit` call returns a falsey value, an
-  AuthorizationException naming the operation and the principal (or \"unknown\")
-  is thrown. Returns nil otherwise."
-  ([aclHandler mapping operation]
-    (check-authorization aclHandler mapping operation (ReqContext/context)))
-  ([aclHandler mapping operation context]
-    (when (not-nil? context)
-      (log-thrift-access (.requestID context)
-                         (.remoteAddress context)
-                         (.principal context)
-                         operation))
-    (when aclHandler
-      (let [ctx (or context (ReqContext/context))]
-        (when-not (.permit aclHandler ctx operation mapping)
-          (let [principal (.principal ctx)
-                user-name (if principal (.getName principal) "unknown")
-                message (str "DRPC request '" operation "' for '"
-                             user-name "' user is not authorized")]
-            (throw (AuthorizationException. message))))))))
-
-;; TODO: change this to use TimeCacheMap
-;; Builds the DRPC service object for `conf`. The returned reify implements
-;; DistributedRPC$Iface (execute), DistributedRPCInvocations$Iface (result,
-;; failRequest, fetchRequest) and Shutdownable (shutdown). Per-request state is
-;; kept in atoms keyed by request id, and a sweep function handed to async-loop
-;; expires requests older than (conf DRPC-REQUEST-TIMEOUT-SECS).
-(defn service-handler [conf]
-  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
-        ;; request id counter; execute wraps it modulo 1000000000
-        ctr (atom 0)
-        id->sem (atom {})
-        id->result (atom {})
-        id->start (atom {})
-        id->function (atom {})
-        id->request (atom {})
-        ;; function name -> ConcurrentLinkedQueue of pending DRPCRequests
-        request-queues (atom {})
-        ;; forgets every piece of per-request state recorded for `id`
-        cleanup (fn [id] (swap! id->sem dissoc id)
-                  (swap! id->result dissoc id)
-                  (swap! id->function dissoc id)
-                  (swap! id->request dissoc id)
-                  (swap! id->start dissoc id))
-        my-ip (.getHostAddress (InetAddress/getLocalHost))
-        ;; Sweep: for each request past the timeout, remove it from its function
-        ;; queue and release its semaphore so the blocked execute call wakes up
-        ;; (it then finds no result and reports a timeout), then clean up.
-        clear-thread (async-loop
-                       (fn []
-                         (doseq [[id start] @id->start]
-                           (when (> (time-delta start) (conf 
DRPC-REQUEST-TIMEOUT-SECS))
-                             (when-let [sem (@id->sem id)]
-                               (.remove (acquire-queue request-queues 
(@id->function id)) (@id->request id))
-                               (log-warn "Timeout DRPC request id: " id " 
start at " start)
-                               (.release sem))
-                             (cleanup id)))
-                         (timeout-check-secs)))]
-    (reify DistributedRPC$Iface
-      ;; Queues a request for `function` and blocks on a fresh semaphore until
-      ;; result, failRequest or the sweep releases it. Returns the stored
-      ;; result, rethrows a stored DRPCExecutionException, or throws
-      ;; "Request timed out" when no result was stored.
-      (^String execute
-        [this ^String function ^String args]
-        (mark! drpc:num-execute-calls)
-        (log-debug "Received DRPC request for " function " (" args ") at " 
(System/currentTimeMillis))
-        (check-authorization drpc-acl-handler
-                             {DRPCAuthorizerBase/FUNCTION_NAME function}
-                             "execute")
-        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
-              ^Semaphore sem (Semaphore. 0)
-              req (DRPCRequest. args id)
-              ^ConcurrentLinkedQueue queue (acquire-queue request-queues 
function)]
-          (swap! id->start assoc id (current-time-secs))
-          (swap! id->sem assoc id sem)
-          (swap! id->function assoc id function)
-          (swap! id->request assoc id req)
-          (.add queue req)
-          (log-debug "Waiting for DRPC result for " function " " args " at " 
(System/currentTimeMillis))
-          (.acquire sem)
-          (log-debug "Acquired DRPC result for " function " " args " at " 
(System/currentTimeMillis))
-          (let [result (@id->result id)]
-            (cleanup id)
-            (log-debug "Returning DRPC result for " function " " args " at " 
(System/currentTimeMillis))
-            (if (instance? DRPCExecutionException result)
-              (throw result)
-              (if (nil? result)
-                (throw (DRPCExecutionException. "Request timed out"))
-                result)))))
-
-      DistributedRPCInvocations$Iface
-
-      ;; Stores `result` for request `id` and releases its semaphore. Does
-      ;; nothing when no function is recorded for `id`.
-      (^void result
-        [this ^String id ^String result]
-        (mark! drpc:num-result-calls)
-        (when-let [func (@id->function id)]
-          (check-authorization drpc-acl-handler
-                               {DRPCAuthorizerBase/FUNCTION_NAME func}
-                               "result")
-          (let [^Semaphore sem (@id->sem id)]
-            (log-debug "Received result " result " for " id " at " 
(System/currentTimeMillis))
-            (when sem
-              (swap! id->result assoc id result)
-              (.release sem)
-              ))))
-
-      ;; Stores a DRPCExecutionException as the result for request `id` and
-      ;; releases its semaphore, so the waiting execute call rethrows it.
-      (^void failRequest
-        [this ^String id]
-        (mark! drpc:num-failRequest-calls)
-        (when-let [func (@id->function id)]
-          (check-authorization drpc-acl-handler
-                               {DRPCAuthorizerBase/FUNCTION_NAME func}
-                               "failRequest")
-          (let [^Semaphore sem (@id->sem id)]
-            (when sem
-              (swap! id->result assoc id (DRPCExecutionException. "Request 
failed"))
-              (.release sem)))))
-
-      ;; Polls the queue for `func`. Returns the next pending request, or a
-      ;; DRPCRequest built from two empty strings when the queue is empty.
-      (^DRPCRequest fetchRequest
-        [this ^String func]
-        (mark! drpc:num-fetchRequest-calls)
-        (check-authorization drpc-acl-handler
-                             {DRPCAuthorizerBase/FUNCTION_NAME func}
-                             "fetchRequest")
-        (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
-              ret (.poll queue)]
-          (if ret
-            (do (log-debug "Fetched request for " func " at " 
(System/currentTimeMillis))
-              ret)
-            (DRPCRequest. "" ""))))
-
-      Shutdownable
-
-      ;; Interrupts the timeout sweep created above.
-      (shutdown
-        [this]
-        (mark! drpc:num-shutdown-calls)
-        (.interrupt clear-thread)))))
-
-(defn handle-request
-  "Wraps `handler` in a one-argument function that passes each request to
-  `handler` unchanged and returns its result."
-  [handler]
-  #(handler %))
-
-(defn populate-context!
-  "Populates the Storm ReqContext from `servlet-request` by calling
-  `.populateContext` on `http-creds-handler`. Returns nil without doing
-  anything when `http-creds-handler` is nil. Call this in every HTTP handler."
-  [http-creds-handler servlet-request]
-  (if http-creds-handler
-    (let [req-context (ReqContext/context)]
-      (.populateContext http-creds-handler req-context servlet-request))
-    nil))
-
-(defn webapp
-  "Builds the Ring handler for the DRPC HTTP endpoints.
-
-  Every route runs a DRPC function through `(.execute handler func args)`:
-    POST /drpc/:func and /drpc/:func/  - args are the slurped request body
-    GET  /drpc/:func/:args             - args are the :args path segment
-    GET  /drpc/:func and /drpc/:func/  - args are the empty string
-
-  Before executing, each request marks the drpc:num-execute-http-requests
-  meter and calls populate-context! with `http-creds-handler` and the servlet
-  request. The routes are then wrapped with wrap-reload and handle-request."
-  [handler http-creds-handler]
-  (let [execute-http (fn [servlet-request func args]
-                       ;; Mark the meter here so it counts every HTTP request.
-                       ;; Marking it directly in the body of webapp would only
-                       ;; count the single call that builds the handler.
-                       (mark! drpc:num-execute-http-requests)
-                       (populate-context! http-creds-handler servlet-request)
-                       (.execute handler func args))]
-    (->
-      (routes
-        (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
-          (execute-http servlet-request func (slurp body)))
-        (POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
-          (execute-http servlet-request func (slurp body)))
-        (GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
-          (execute-http servlet-request func args))
-        (GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
-          (execute-http servlet-request func ""))
-        (GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
-          (execute-http servlet-request func "")))
-      (wrap-reload '[backtype.storm.daemon.drpc])
-      handle-request)))
-
-;; Starts the DRPC daemon: reads the Storm config, builds one service-handler
-;; and exposes it through up to three front ends:
-;;   * a Thrift server for DistributedRPC, only when DRPC-PORT > 0
-;;   * a Thrift server for DistributedRPCInvocations, always
-;;   * a Jetty HTTP server, only when DRPC-HTTP-PORT > 0
-(defn launch-server!
-  ([]
-    (log-message "Starting drpc server for storm version '" STORM-VERSION "'")
-    (let [conf (read-storm-config)
-          ;; NOTE(review): worker-threads and queue-size are bound but not used
-          ;; anywhere else in this function.
-          worker-threads (int (conf DRPC-WORKER-THREADS))
-          queue-size (int (conf DRPC-QUEUE-SIZE))
-          drpc-http-port (int (conf DRPC-HTTP-PORT))
-          drpc-port (int (conf DRPC-PORT))
-          drpc-service-handler (service-handler conf)
-          ;; requests and returns need to be on separate thread pools, since 
calls to
-          ;; "execute" don't unblock until other thrift methods are called. So 
if
-          ;; 64 threads are calling execute, the server won't accept the result
-          ;; invocations that will unblock those threads
-          handler-server (when (> drpc-port 0)
-                           (ThriftServer. conf
-                             (DistributedRPC$Processor. drpc-service-handler)
-                             ThriftConnectionType/DRPC))
-          invoke-server (ThriftServer. conf
-                          (DistributedRPCInvocations$Processor. 
drpc-service-handler)
-                          ThriftConnectionType/DRPC_INVOCATIONS)
-          http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
-      ;; the hook stops whichever Thrift servers were created
-      (add-shutdown-hook-with-force-kill-in-1-sec (fn []
-                                                    (if handler-server (.stop 
handler-server))
-                                                    (.stop invoke-server)))
-      (log-message "Starting Distributed RPC servers...")
-      ;; the invocations server is served from a future
-      (future (.serve invoke-server))
-      (when (> drpc-http-port 0)
-        (let [app (-> (webapp drpc-service-handler http-creds-handler)
-                    requests-middleware)
-              filter-class (conf DRPC-HTTP-FILTER)
-              filter-params (conf DRPC-HTTP-FILTER-PARAMS)
-              filters-confs [{:filter-class filter-class
-                              :filter-params filter-params}]
-              https-port (int (conf DRPC-HTTPS-PORT))
-              https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
-              https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
-              https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)
-              https-key-password (conf DRPC-HTTPS-KEY-PASSWORD)
-              https-ts-path (conf DRPC-HTTPS-TRUSTSTORE-PATH)
-              https-ts-password (conf DRPC-HTTPS-TRUSTSTORE-PASSWORD)
-              https-ts-type (conf DRPC-HTTPS-TRUSTSTORE-TYPE)
-              https-want-client-auth (conf DRPC-HTTPS-WANT-CLIENT-AUTH)
-              https-need-client-auth (conf DRPC-HTTPS-NEED-CLIENT-AUTH)]
-
-          ;; NOTE(review): need-client-auth is passed before want-client-auth;
-          ;; confirm this matches config-ssl's parameter order.
-          (storm-run-jetty
-           {:port drpc-http-port
-            :configurator (fn [server]
-                            (config-ssl server
-                                        https-port
-                                        https-ks-path
-                                        https-ks-password
-                                        https-ks-type
-                                        https-key-password
-                                        https-ts-path
-                                        https-ts-password
-                                        https-ts-type
-                                        https-need-client-auth
-                                        https-want-client-auth)
-                            (config-filter server app filters-confs))})))
-      (start-metrics-reporters)
-      ;; the DistributedRPC server, when configured, is served last and on the
-      ;; calling thread
-      (when handler-server
-        (.serve handler-server)))))
-
-(defn -main
-  "Entry point: installs the default uncaught-exception handler, then launches
-  the DRPC server."
-  []
-  (setup-default-uncaught-exception-handler)
-  (launch-server!))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj 
b/storm-core/src/clj/backtype/storm/daemon/executor.clj
deleted file mode 100644
index 7fee67b..0000000
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ /dev/null
@@ -1,855 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns backtype.storm.daemon.executor
-  (:use [backtype.storm.daemon common])
-  (:import [backtype.storm.generated Grouping]
-           [java.io Serializable])
-  (:use [backtype.storm util config log timer stats])
-  (:import [java.util List Random HashMap ArrayList LinkedList Map])
-  (:import [backtype.storm ICredentialsListener])
-  (:import [backtype.storm.hooks ITaskHook])
-  (:import [backtype.storm.tuple AddressedTuple Tuple Fields TupleImpl 
MessageId])
-  (:import [backtype.storm.spout ISpoutWaitStrategy ISpout 
SpoutOutputCollector ISpoutOutputCollector])
-  (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
-            EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
-  (:import [backtype.storm.grouping CustomStreamGrouping])
-  (:import [backtype.storm.task WorkerTopologyContext IBolt OutputCollector 
IOutputCollector])
-  (:import [backtype.storm.generated GlobalStreamId])
-  (:import [backtype.storm.utils Utils TupleUtils MutableObject RotatingMap 
RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue 
WorkerBackpressureThread])
-  (:import [com.lmax.disruptor InsufficientCapacityException])
-  (:import [backtype.storm.serialization KryoTupleSerializer])
-  (:import [backtype.storm.daemon Shutdownable])
-  (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
-  (:import [backtype.storm Config Constants])
-  (:import [backtype.storm.cluster ClusterStateContext DaemonType])
-  (:import [backtype.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-  (:import [java.util.concurrent ConcurrentLinkedQueue])
-  (:require [backtype.storm [thrift :as thrift]
-             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
-  (:require [backtype.storm.daemon [task :as task]])
-  (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics])
-  (:require [clojure.set :as set]))
-
-(defn- mk-fields-grouper
-  "Returns a grouper function of [task-id values load]. The grouper selects the
-  `group-fields` out of `values` (laid out according to `out-fields`), hashes
-  the selection with TupleUtils/listHashCode and returns the element of
-  `target-tasks` at that hash modulo the number of target tasks."
-  [^Fields out-fields ^Fields group-fields ^List target-tasks]
-  (let [task-count (count target-tasks)]
-    (fn [task-id ^List values load]
-      (let [selected (.select out-fields group-fields values)
-            idx (mod (TupleUtils/listHashCode selected) task-count)]
-        (.get target-tasks idx)))))
-
-(defn- mk-custom-grouper
-  "Prepares `grouping` for the stream identified by `component-id` and
-  `stream-id`, then returns a grouper function of [task-id values load] that
-  delegates to `.chooseTasks`. The load argument is forwarded only when
-  `grouping` is a LoadAwareCustomStreamGrouping."
-  [^CustomStreamGrouping grouping ^WorkerTopologyContext context
-   ^String component-id ^String stream-id target-tasks]
-  (let [stream (GlobalStreamId. component-id stream-id)]
-    (.prepare grouping context stream target-tasks))
-  (if-not (instance? LoadAwareCustomStreamGrouping grouping)
-    (fn [task-id ^List values load]
-      (.chooseTasks grouping task-id values))
-    (fn [task-id ^List values load]
-      (.chooseTasks grouping task-id values load))))
-
-(defn mk-shuffle-grouper
-  "Returns a custom grouper over `target-tasks`. It is backed by a
-  ShuffleGrouping when TOPOLOGY-DISABLE-LOADAWARE-MESSAGING is truthy in
-  `topo-conf`, and by a LoadAwareShuffleGrouping otherwise."
-  [^List target-tasks topo-conf ^WorkerTopologyContext context
-   ^String component-id ^String stream-id]
-  (let [grouping (if (.get topo-conf TOPOLOGY-DISABLE-LOADAWARE-MESSAGING)
-                   (ShuffleGrouping.)
-                   (LoadAwareShuffleGrouping.))]
-    (mk-custom-grouper grouping context component-id stream-id target-tasks)))
-
-;; Dispatches on (thrift/grouping-type thrift-grouping). Every branch except
-;; :direct yields a function of [task-id tuple-or-values load]; :direct yields
-;; the keyword :direct itself. The condp has no default clause, so a grouping
-;; type not listed here makes condp throw.
-(defn- mk-grouper
-  "Returns a function that returns a vector of which task indices to send 
tuple to, or just a single task index."
-  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields 
thrift-grouping ^List target-tasks topo-conf]
-  (let [num-tasks (count target-tasks)
-        random (Random.)
-        ;; shadows the parameter with a sorted vector of the same tasks
-        target-tasks (vec (sort target-tasks))]
-    (condp = (thrift/grouping-type thrift-grouping)
-      ;; a global grouping always picks the first (lowest) task; otherwise the
-      ;; task is chosen by hashing the grouping fields
-      :fields
-        (if (thrift/global-grouping? thrift-grouping)
-          (fn [task-id tuple load]
-            ;; It's possible for target to have multiple tasks if it reads 
multiple sources
-            (first target-tasks))
-          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
-            (mk-fields-grouper out-fields group-fields target-tasks)
-            ))
-      ;; every target task receives the tuple
-      :all
-        (fn [task-id tuple load] target-tasks)
-      :shuffle
-        (mk-shuffle-grouper target-tasks topo-conf context component-id 
stream-id)
-      ;; shuffle among target tasks that also run in this worker, falling back
-      ;; to all target tasks when there are none
-      :local-or-shuffle
-        (let [same-tasks (set/intersection
-                           (set target-tasks)
-                           (set (.getThisWorkerTasks context)))]
-          (if-not (empty? same-tasks)
-            (mk-shuffle-grouper (vec same-tasks) topo-conf context 
component-id stream-id)
-            (mk-shuffle-grouper target-tasks topo-conf context component-id 
stream-id)))
-      ;; picks a target task at a random index
-      :none
-        (fn [task-id tuple load]
-          (let [i (mod (.nextInt random) num-tasks)]
-            (get target-tasks i)
-            ))
-      :custom-object
-        (let [grouping (thrift/instantiate-java-object (.get_custom_object 
thrift-grouping))]
-          (mk-custom-grouper grouping context component-id stream-id 
target-tasks))
-      :custom-serialized
-        (let [grouping (Utils/javaDeserialize (.get_custom_serialized 
thrift-grouping) Serializable)]
-          (mk-custom-grouper grouping context component-id stream-id 
target-tasks))
-      :direct
-        :direct
-      )))
-
-(defn- outbound-groupings
-  "Returns a java.util.HashMap from target component id to the grouper built by
-  mk-grouper for `stream-id`. Components for which `worker-context` reports no
-  tasks are left out."
-  [^WorkerTopologyContext worker-context this-component-id stream-id
-   out-fields component->grouping topo-conf]
-  (let [has-tasks? (fn [component]
-                     (pos? (count (.getComponentTasks worker-context component))))
-        live-targets (filter-key has-tasks? component->grouping)
-        component->grouper (into {}
-                                 (for [[component tgrouping] live-targets]
-                                   [component
-                                    (mk-grouper worker-context
-                                                this-component-id
-                                                stream-id
-                                                out-fields
-                                                tgrouping
-                                                (.getComponentTasks worker-context component)
-                                                topo-conf)]))]
-    (HashMap. component->grouper)))
-
-(defn outbound-components
-  "Returns a java.util.HashMap of stream id -> (component id -> grouper) for
-  every stream that `component-id` emits to, as listed by `.getTargets` on
-  `worker-context`."
-  [^WorkerTopologyContext worker-context component-id topo-conf]
-  (let [stream->targets (clojurify-structure
-                          (.getTargets worker-context component-id))
-        stream->groupers (into {}
-                               (for [[stream-id component->grouping] stream->targets]
-                                 [stream-id
-                                  (outbound-groupings
-                                    worker-context
-                                    component-id
-                                    stream-id
-                                    (.getComponentOutputFields worker-context
-                                                               component-id
-                                                               stream-id)
-                                    component->grouping
-                                    topo-conf)]))]
-    (HashMap. stream->groupers)))
-
-(defn executor-type
-  "Returns :spout when `component-id` is among the raw topology's spouts and
-  :bolt when it is among its bolts. Throws through throw-runtime when it is in
-  neither."
-  [^WorkerTopologyContext context component-id]
-  (let [topology (.getRawTopology context)
-        spouts (.get_spouts topology)
-        bolts (.get_bolts topology)]
-    (if (contains? spouts component-id)
-      :spout
-      (if (contains? bolts component-id)
-        :bolt
-        (throw-runtime "Could not find " component-id " in topology " topology)))))
-
-(defn executor-selector
-  "Dispatch function for the multimethods below: returns the :type entry of
-  `executor-data` and ignores any further arguments."
-  [executor-data & _]
-  (get executor-data :type))
-
-;; Multimethods that dispatch on the executor's :type via executor-selector.
-(defmulti mk-threads executor-selector)
-(defmulti mk-executor-stats executor-selector)
-(defmulti close-component executor-selector)
-
-(defn- normalized-component-conf
-  "Returns `storm-conf` merged with the component's own JSON conf. The
-  component conf is first stripped of every key in ALL-CONFIGS except the
-  settings listed in `component-overridable`."
-  [storm-conf general-context component-id]
-  (let [component-overridable [TOPOLOGY-DEBUG
-                               TOPOLOGY-MAX-SPOUT-PENDING
-                               TOPOLOGY-MAX-TASK-PARALLELISM
-                               TOPOLOGY-TRANSACTIONAL-ID
-                               TOPOLOGY-TICK-TUPLE-FREQ-SECS
-                               TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
-                               TOPOLOGY-SPOUT-WAIT-STRATEGY
-                               TOPOLOGY-BOLTS-WINDOW-LENGTH-COUNT
-                               TOPOLOGY-BOLTS-WINDOW-LENGTH-DURATION-MS
-                               TOPOLOGY-BOLTS-SLIDING-INTERVAL-COUNT
-                               TOPOLOGY-BOLTS-SLIDING-INTERVAL-DURATION-MS
-                               TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-FIELD-NAME
-                               TOPOLOGY-BOLTS-TUPLE-TIMESTAMP-MAX-LAG-MS]
-        to-remove (apply disj (set ALL-CONFIGS) component-overridable)
-        spec-conf (from-json
-                    (.get_json_conf
-                      (.getComponentCommon general-context component-id)))
-        filtered-conf (apply dissoc spec-conf to-remove)]
-    (merge storm-conf filtered-conf)))
-
-;; Protocol declaring four operations on a running executor. Its
-;; implementations are not part of this section of the file.
-(defprotocol RunningExecutor
-  (render-stats [this])
-  (get-executor-id [this])
-  (credentials-changed [this creds])
-  (get-backpressure-flag [this]))
-
-(defn throttled-report-error-fn
-  "Returns a one-argument function that logs every error it is given and
-  forwards it to cluster/report-error, but only for the first
-  TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL errors seen in each window of
-  TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS seconds. Both settings are read from
-  the executor's :storm-conf."
-  [executor]
-  (let [storm-conf (:storm-conf executor)
-        window-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
-        max-reports (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
-        window-start (atom (current-time-secs))
-        window-count (atom 0)
-        window-expired? (fn [] (> (time-delta @window-start) window-secs))]
-    (fn [error]
-      (log-error error)
-      ;; start a fresh counting window once the current one has run out
-      (when (window-expired?)
-        (reset! window-count 0)
-        (reset! window-start (current-time-secs)))
-      (swap! window-count inc)
-      (when-not (> @window-count max-reports)
-        (cluster/report-error (:storm-cluster-state executor)
-                              (:storm-id executor)
-                              (:component-id executor)
-                              (hostname storm-conf)
-                              (.getThisWorkerPort (:worker-context executor))
-                              error)))))
-
-;; in its own function so that it can be mocked out by tracked topologies
-(defn mk-executor-transfer-fn
-  "Returns a function of [task tuple] that wraps the pair in an AddressedTuple
-  and publishes it to the `batch-transfer->worker` queue. The addressed tuple
-  is logged first when TOPOLOGY-DEBUG equals true in `storm-conf`."
-  [batch-transfer->worker storm-conf]
-  (fn this [task tuple]
-    (let [addressed (AddressedTuple. task tuple)
-          debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-      (if debug?
-        (log-message "TRANSFERING tuple " addressed)
-        nil)
-      (disruptor/publish batch-transfer->worker addressed))))
-
-;; Assembles the executor-data map for `executor-id` running in `worker`.
-;; Values are taken from the worker map, from the component-normalized storm
-;; conf, or created here (send queue, atoms, HashMaps, error reporters).
-;; NOTE(review): `recursive-map` is defined elsewhere; `<>` presumably refers
-;; to the map under construction, which would make entry order significant.
-;; Confirm before reordering entries.
-(defn mk-executor-data [worker executor-id]
-  (let [worker-context (worker-context worker)
-        task-ids (executor-id->tasks executor-id)
-        ;; the component is looked up from the executor's first task id
-        component-id (.getComponentId worker-context (first task-ids))
-        storm-conf (normalized-component-conf (:storm-conf worker) 
worker-context component-id)
-        executor-type (executor-type worker-context component-id)
-        ;; the executor's outbound (send) disruptor queue
-        batch-transfer->worker (disruptor/disruptor-queue
-                                  (str "executor"  executor-id "-send-queue")
-                                  (storm-conf 
TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
-                                  (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                  :producer-type :single-threaded
-                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        ]
-    (recursive-map
-     :worker worker
-     :worker-context worker-context
-     :executor-id executor-id
-     :task-ids task-ids
-     :component-id component-id
-     :open-or-prepare-was-called? (atom false)
-     :storm-conf storm-conf
-     :receive-queue ((:executor-receive-queue-map worker) executor-id)
-     :storm-id (:storm-id worker)
-     :conf (:conf worker)
-     :shared-executor-data (HashMap.)
-     :storm-active-atom (:storm-active-atom worker)
-     :storm-component->debug-atom (:storm-component->debug-atom worker)
-     :batch-transfer-queue batch-transfer->worker
-     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
-     :suicide-fn (:suicide-fn worker)
-     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state 
worker) 
-                                                          :acls 
(Utils/getWorkerACL storm-conf)
-                                                          :context 
(ClusterStateContext. DaemonType/WORKER))
-     :type executor-type
-     ;; TODO: should refactor this to be part of the executor specific map 
(spout or bolt with :common field)
-     :stats (mk-executor-stats <> (sampling-rate storm-conf))
-     :interval->task->metric-registry (HashMap.)
-     :task->component (:task->component worker)
-     :stream->component->grouper (outbound-components worker-context 
component-id storm-conf)
-     :report-error (throttled-report-error-fn <>)
-     ;; reports the error, then either just logs (when the cause is an
-     ;; InterruptedException or InterruptedIOException) or calls :suicide-fn
-     :report-error-and-die (fn [error]
-                             ((:report-error <>) error)
-                             (if (or
-                                    (exception-cause? InterruptedException 
error)
-                                    (exception-cause? 
java.io.InterruptedIOException error))
-                               (log-message "Got interrupted excpetion 
shutting thread down...")
-                               ((:suicide-fn <>))))
-     :sampler (mk-stats-sampler storm-conf)
-     :backpressure (atom false)
-     ;; throttling data exists only for spout executors; nil otherwise
-     :spout-throttling-metrics (if (= executor-type :spout) 
-                                (builtin-metrics/make-spout-throttling-data)
-                                nil)
-     ;; TODO: add in the executor-specific stuff in a :specific... or make a 
spout-data, bolt-data function?
-     )))
-
-(defn- mk-disruptor-backpressure-handler
-  "Makes a handler for the executor's receive disruptor queue to check
-  highWaterMark and lowWaterMark for backpressure.
-
-  The handler is built by disruptor/disruptor-backpressure-handler from two
-  zero-argument callbacks. Each callback flips the executor's :backpressure
-  atom only when its value actually changes, and then notifies the worker's
-  backpressure checker through the worker's :backpressure-trigger."
-  [executor-data]
-  ;; The docstring above sits before the parameter vector so that it is
-  ;; attached as :doc metadata. A string placed after the parameters is just
-  ;; an expression that is evaluated and thrown away.
-  (disruptor/disruptor-backpressure-handler
-    ;; When receive queue is above highWaterMark
-    (fn []
-      (when-not @(:backpressure executor-data)
-        (reset! (:backpressure executor-data) true)
-        (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
-        (WorkerBackpressureThread/notifyBackpressureChecker
-          (:backpressure-trigger (:worker executor-data)))))
-    ;; When receive queue is below lowWaterMark
-    (fn []
-      (when @(:backpressure executor-data)
-        (reset! (:backpressure executor-data) false)
-        (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
-        (WorkerBackpressureThread/notifyBackpressureChecker
-          (:backpressure-trigger (:worker executor-data)))))))
-
-(defn start-batch-transfer->worker-handler!
-  "Starts a disruptor consume loop on the executor's :batch-transfer-queue.
-  Consumed objects are collected in an ArrayList; at the end of each batch the
-  list is handed to the worker's :transfer-fn together with a
-  KryoTupleSerializer and a fresh list is started. The executor's
-  :report-error-and-die is the loop's kill function."
-  [worker executor-data]
-  (let [send-to-worker (:transfer-fn worker)
-        pending-batch (MutableObject. (ArrayList.))
-        storm-conf (:storm-conf executor-data)
-        tuple-serializer (KryoTupleSerializer. storm-conf
-                                               (:worker-context executor-data))
-        send-queue (:batch-transfer-queue executor-data)
-        kill-fn (:report-error-and-die executor-data)]
-    (disruptor/consume-loop*
-      send-queue
-      (disruptor/handler [o seq-id batch-end?]
-        (let [^ArrayList batch (.getObject pending-batch)]
-          (.add batch o)
-          (when batch-end?
-            (send-to-worker tuple-serializer batch)
-            (.setObject pending-batch (ArrayList.)))))
-      :kill-fn kill-fn)))
-
-(defn setup-metrics!
-  "For every interval that is a key of the executor's
-  :interval->task->metric-registry, schedules a recurring job on the worker's
-  :user-timer. Each run publishes a broadcast metrics-tick tuple carrying the
-  interval to the executor's receive queue."
-  [executor-data]
-  (let [receive-queue (:receive-queue executor-data)
-        worker-context (:worker-context executor-data)
-        intervals (keys (:interval->task->metric-registry executor-data))]
-    (doseq [interval intervals]
-      (let [publish-tick (fn []
-                           (let [tick (TupleImpl. worker-context
-                                                  [interval]
-                                                  Constants/SYSTEM_TASK_ID
-                                                  Constants/METRICS_TICK_STREAM_ID)
-                                 val [(AddressedTuple. AddressedTuple/BROADCAST_DEST tick)]]
-                             (disruptor/publish receive-queue val)))]
-        (schedule-recurring (:user-timer (:worker executor-data))
-                            interval
-                            interval
-                            publish-tick)))))
-
-(defn metrics-tick
-  "Handles a metrics tick `tuple` for one task. Reads the interval from field
-  0 of the tuple, calls getValueAndReset on every metric registered for that
-  interval and task, and sends the truthy values as data points, together with
-  a TaskInfo, on the metrics stream. Nothing is sent when there are no data
-  points."
-  [executor-data task-data ^TupleImpl tuple]
-  (let [^WorkerTopologyContext worker-context (:worker-context executor-data)
-        registry (:interval->task->metric-registry executor-data)
-        interval (.getInteger tuple 0)
-        task-id (:task-id task-data)
-        name->imetric (get (get registry interval) task-id)
-        task-info (IMetricsConsumer$TaskInfo.
-                    (hostname (:storm-conf executor-data))
-                    (.getThisWorkerPort worker-context)
-                    (:component-id executor-data)
-                    task-id
-                    (long (/ (System/currentTimeMillis) 1000))
-                    interval)
-        data-points (vec
-                      (for [[name imetric] name->imetric
-                            :let [value (.getValueAndReset ^IMetric imetric)]
-                            :when value]
-                        (IMetricsConsumer$DataPoint. name value)))]
-    (when (seq data-points)
-      (task/send-unanchored task-data
-                            Constants/METRICS_STREAM_ID
-                            [task-info data-points]))))
-
-(defn setup-ticks!
-  "When TOPOLOGY-TICK-TUPLE-FREQ-SECS is set in the executor's storm conf,
-  schedules a recurring job on the worker's :user-timer that publishes a
-  broadcast system tick tuple to the executor's receive queue. Only a log
-  message is written, and nothing is scheduled, for system components and for
-  spouts whose TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS equals false."
-  [worker executor-data]
-  (let [storm-conf (:storm-conf executor-data)
-        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
-        receive-queue (:receive-queue executor-data)
-        context (:worker-context executor-data)]
-    (when tick-time-secs
-      (let [skip-ticks? (or (Utils/isSystemId (:component-id executor-data))
-                            (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
-                                 (= :spout (:type executor-data))))
-            publish-tick (fn []
-                           (let [tick (TupleImpl. context
-                                                  [tick-time-secs]
-                                                  Constants/SYSTEM_TASK_ID
-                                                  Constants/SYSTEM_TICK_STREAM_ID)
-                                 val [(AddressedTuple. AddressedTuple/BROADCAST_DEST tick)]]
-                             (disruptor/publish receive-queue val)))]
-        (if skip-ticks?
-          (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
-          (schedule-recurring (:user-timer worker)
-                              tick-time-secs
-                              tick-time-secs
-                              publish-tick))))))
-
-(defn mk-executor
-  "Builds and starts the executor identified by executor-id inside worker.
-
-  Creates the executor data and one task per task id, registers the
-  backpressure callback and watermark settings on the receive queue, starts
-  the batch-transfer handler and the component threads (via mk-threads), and
-  schedules tick tuples. Returns an object implementing RunningExecutor and
-  Shutdownable."
-  [worker executor-id initial-credentials]
-  (let [executor-data (mk-executor-data worker executor-id)
-        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
-        ;; task id -> task data, copied into a java.util.HashMap
-        task-datas (->> executor-data
-                        :task-ids
-                        (map (fn [t] [t (task/mk-task executor-data t)]))
-                        (into {})
-                        (HashMap.))
-        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
-        report-error-and-die (:report-error-and-die executor-data)
-        component-id (:component-id executor-data)
-
-
-        disruptor-handler (mk-disruptor-backpressure-handler executor-data)
-        _ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
-        _ (-> (.setHighWaterMark (:receive-queue executor-data)
-                                 ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
-              (.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
-              (.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
-
-        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
-        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
-        ;; trick isn't thread-safe)
-        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
-        handlers (with-error-reaction report-error-and-die
-                   (mk-threads executor-data task-datas initial-credentials))
-        threads (concat handlers system-threads)]
-    (setup-ticks! worker executor-data)
-
-    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
-    ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
-    (reify
-      RunningExecutor
-      (render-stats [this]
-        (stats/render-stats! (:stats executor-data)))
-      (get-executor-id [this]
-        executor-id)
-      ;; new credentials are delivered as a broadcast tuple on the
-      ;; credentials-changed stream through the executor's receive queue
-      (credentials-changed [this creds]
-        (let [receive-queue (:receive-queue executor-data)
-              context (:worker-context executor-data)
-              val [(AddressedTuple. AddressedTuple/BROADCAST_DEST
-                                    (TupleImpl. context
-                                                [creds]
-                                                Constants/SYSTEM_TASK_ID
-                                                Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
-          (disruptor/publish receive-queue val)))
-      (get-backpressure-flag [this]
-        @(:backpressure executor-data))
-      Shutdownable
-      ;; shutdown order: halt both queues, interrupt and join every thread,
-      ;; clean up task hooks, disconnect the cluster state, and finally close
-      ;; the components if open/prepare was ever called
-      (shutdown
-        [this]
-        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
-        (disruptor/halt-with-interrupt! (:receive-queue executor-data))
-        (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
-        (doseq [t threads]
-          (.interrupt t)
-          (.join t))
-
-        (doseq [user-context (map :user-context (vals task-datas))]
-          (doseq [hook (.getHooks user-context)]
-            (.cleanup hook)))
-        (.disconnect (:storm-cluster-state executor-data))
-        (when @(:open-or-prepare-was-called? executor-data)
-          (doseq [obj (map :object (vals task-datas))]
-            (close-component executor-data obj)))
-        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
-        )))
-
-(defn- fail-spout-msg
-  "Reports a failed tuple tree back to the spout that emitted it.
-
-  Calls .fail on the task's spout object with msg-id, runs the spoutFail
-  hooks, and records a failed-tuple stat when time-delta is non-nil. With
-  TOPOLOGY-DEBUG set to true it also logs id, tuple-info, reason and msg-id."
-  [executor-data task-data msg-id tuple-info time-delta reason id]
-  (let [^ISpout spout (:object task-data)
-        storm-conf (:storm-conf executor-data)
-        task-id (:task-id task-data)]
-    ;;TODO: need to throttle these when there's lots of failures
-    (when (= true (storm-conf TOPOLOGY-DEBUG))
-      ;; the " MSG-ID: " literal must stay on one line, otherwise a line
-      ;; break ends up inside the logged message
-      (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
-    (.fail spout msg-id)
-    (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
-    ;; time-delta is nil for tuples without a recorded start time, so no
-    ;; latency stat is written for them
-    (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
-
-(defn- ack-spout-msg
-  "Reports a fully processed tuple tree back to the spout that emitted it.
-
-  Calls .ack on the task's spout object with msg-id, runs the spoutAck hooks,
-  and records an acked-tuple stat when time-delta is non-nil. With
-  TOPOLOGY-DEBUG set to true it also logs id and msg-id."
-  [executor-data task-data msg-id tuple-info time-delta id]
-  (let [{:keys [storm-conf stats]} executor-data
-        {:keys [task-id user-context]} task-data
-        ^ISpout spout (:object task-data)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-    (when debug?
-      (log-message "SPOUT Acking message " id " " msg-id))
-    (.ack spout msg-id)
-    (task/apply-hooks user-context .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
-    (when time-delta
-      (stats/spout-acked-tuple! stats (:stream tuple-info) time-delta))))
-
-(defn mk-task-receiver
-  "Wraps tuple-action-fn in a disruptor handler for the executor's receive queue.
-
-  For every AddressedTuple in a batch the handler calls
-  (tuple-action-fn task-id tuple). A tuple addressed to
-  AddressedTuple/BROADCAST_DEST is delivered once to each of the executor's
-  task ids; any other tuple goes to its destination task only."
-  [executor-data tuple-action-fn]
-  (let [debug? (= true (get-in executor-data [:storm-conf TOPOLOGY-DEBUG]))
-        task-ids (:task-ids executor-data)]
-    (disruptor/clojure-handler
-      (fn [tuple-batch sequence-id end-of-batch?]
-        (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
-          (let [dest (.getDest addressed-tuple)
-                ^TupleImpl tuple (.getTuple addressed-tuple)]
-            (when debug?
-              (log-message "Processing received message FOR " dest " TUPLE: " tuple))
-            (if (= dest AddressedTuple/BROADCAST_DEST)
-              ;; broadcast: fan the same tuple out to every local task
-              (fast-list-iter [task-id task-ids]
-                (tuple-action-fn task-id tuple))
-              (tuple-action-fn dest tuple))))))))
-
-(defn executor-max-spout-pending
-  "Returns TOPOLOGY-MAX-SPOUT-PENDING multiplied by num-tasks, or nil when
-  that setting is absent from storm-conf."
-  [storm-conf num-tasks]
-  (when-let [per-task-limit (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
-    (* per-task-limit num-tasks)))
-
-(defn init-spout-wait-strategy
-  "Instantiates the class configured under TOPOLOGY-SPOUT-WAIT-STRATEGY,
-  calls .prepare on it with storm-conf, and returns the prepared instance."
-  [storm-conf]
-  (doto (new-instance (get storm-conf TOPOLOGY-SPOUT-WAIT-STRATEGY))
-    (.prepare storm-conf)))
-
-;; Forwards a sample of emitted data to the event logger when debugging is
-;; enabled, either for this component or for the whole topology (the flag is
-;; set via the nimbus api).
-(defn send-to-eventlogger
-  "Looks up the debug options for component-id, falling back to the entry
-  stored under the topology's :storm-id. When those options are enabled, a
-  draw from random decides, with probability :samplingpct percent, whether
-  [component-id message-id now-ms values] is sent unanchored on
-  EVENTLOGGER-STREAM-ID."
-  [executor-data task-data values component-id message-id random]
-  (let [debug-by-component @(:storm-component->debug-atom executor-data)
-        topology-level (get debug-by-component (:storm-id executor-data))
-        options (get debug-by-component component-id topology-level)
-        spct (if (and (not-nil? options) (:enable options))
-               (:samplingpct options)
-               0)]
-    ;; the calling thread's own random number generator supplies the
-    ;; uniformly distributed draw; no draw is made when sampling is off
-    (when (and (pos? spct)
-               (> spct (* 100 (.nextDouble random))))
-      (task/send-unanchored
-        task-data
-        EVENTLOGGER-STREAM-ID
-        [component-id message-id (System/currentTimeMillis) values]))))
-
-;; Builds the single async-loop thread that drives every spout task of an
-;; executor. The thread waits until the topology is active, opens each spout
-;; with an output collector backed by send-spout-msg, then loops: drain the
-;; receive queue (acks, fails, ticks, credential changes), call nextTuple on
-;; each spout unless the transfer queue is full, backpressure throttling is
-;; on, or max-spout-pending is reached, and apply the wait strategy when a
-;; pass emitted nothing. Returns a one-element vector holding that thread.
-(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
-  (let [{:keys [storm-conf component-id worker-context transfer-fn
-                report-error sampler open-or-prepare-was-called?]} executor-data
-        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
-        max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
-        ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
-        last-active (atom false)
-        spouts (ArrayList. (map :object (vals task-datas)))
-        rand (Random. (Utils/secureRandomLong))
-        ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
-        debug? (= true (storm-conf TOPOLOGY-DEBUG))
-
-        ;; root-id -> [task-id message-id tuple-info start-time-ms]; entries
-        ;; that expire out of the map are failed with reason "TIMEOUT"
-        pending (RotatingMap.
-                 2 ;; microoptimize for performance of .size method
-                 (reify RotatingMap$ExpiredCallback
-                   (expire [this id [task-id spout-id tuple-info start-time-ms]]
-                     (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
-                       (fail-spout-msg executor-data (get task-datas task-id)
-                                       spout-id tuple-info time-delta "TIMEOUT" id)
-                       ))))
-        ;; dispatches on the source stream of each tuple taken off the receive
-        ;; queue; anything that is not a tick, metrics tick or credentials
-        ;; change is treated as an acker response keyed by field 0
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID
-                                (let [task-data (get task-datas task-id)
-                                      spout-obj (:object task-data)]
-                                  (when (instance? ICredentialsListener spout-obj)
-                                    (.setCredentials spout-obj (.getValue tuple 0))))
-                              (let [id (.getValue tuple 0)
-                                    [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
-                                (when spout-id
-                                  (when-not (= stored-task-id task-id)
-                                    (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
-                                  (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
-                                    (condp = stream-id
-                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
-                                                                         spout-id tuple-finished-info time-delta id)
-                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
-                                                                           spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
-                                      )))
-                                ;; TODO: on failure, emit tuple to failure stream
-                                ))))
-        receive-queue (:receive-queue executor-data)
-        event-handler (mk-task-receiver executor-data tuple-action-fn)
-        has-ackers? (has-ackers? storm-conf)
-        has-eventloggers? (has-eventloggers? storm-conf)
-        emitted-count (MutableLong. 0)
-        empty-emit-streak (MutableLong. 0)]
-
-    [(async-loop
-      (fn []
-        ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
-        (while (not @(:storm-active-atom executor-data))
-          (Thread/sleep 100))
-
-        (log-message "Opening spout " component-id ":" (keys task-datas))
-        (builtin-metrics/register-spout-throttling-metrics
-          (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas))))
-        (doseq [[task-id task-data] task-datas
-                :let [^ISpout spout-obj (:object task-data)
-                     tasks-fn (:tasks-fn task-data)
-                     ;; emits one tuple per target task; a tuple is tracked
-                     ;; ("rooted") only when it has a message id and the
-                     ;; topology has ackers
-                     send-spout-msg (fn [out-stream-id values message-id out-task-id]
-                                       (.increment emitted-count)
-                                       (let [out-tasks (if out-task-id
-                                                         (tasks-fn out-task-id out-stream-id values)
-                                                         (tasks-fn out-stream-id values))
-                                             rooted? (and message-id has-ackers?)
-                                             root-id (if rooted? (MessageId/generateId rand))
-                                             ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
-                                         (fast-list-iter [out-task out-tasks id out-ids]
-                                                         (let [tuple-id (if rooted?
-                                                                          (MessageId/makeRootId root-id id)
-                                                                          (MessageId/makeUnanchored))
-                                                               out-tuple (TupleImpl. worker-context
-                                                                                     values
-                                                                                     task-id
-                                                                                     out-stream-id
-                                                                                     tuple-id)]
-                                                           (transfer-fn out-task out-tuple)))
-                                         (if has-eventloggers?
-                                           (send-to-eventlogger executor-data task-data values component-id message-id rand))
-                                         (if (and rooted?
-                                                  (not (.isEmpty out-ids)))
-                                           (do
-                                             (.put pending root-id [task-id
-                                                                    message-id
-                                                                    {:stream out-stream-id
-                                                                     :values (if debug? values nil)}
-                                                                    (if (sampler) (System/currentTimeMillis))])
-                                             (task/send-unanchored task-data
-                                                                   ACKER-INIT-STREAM-ID
-                                                                   [root-id (bit-xor-vals out-ids) task-id]))
-                                           ;; untracked tuple that still carries a message id:
-                                           ;; ack it to the spout right away
-                                           (when message-id
-                                             (ack-spout-msg executor-data task-data message-id
-                                                            {:stream out-stream-id :values values}
-                                                            (if (sampler) 0) "0:")))
-                                         (or out-tasks [])
-                                         ))]]
-          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
-          (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                   :receive receive-queue}
-                                                  storm-conf (:user-context task-data))
-          (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))
-
-          (.open spout-obj
-                 storm-conf
-                 (:user-context task-data)
-                 (SpoutOutputCollector.
-                  (reify ISpoutOutputCollector
-                    (^long getPendingCount[this]
-                      (.size pending)
-                      )
-                    (^List emit [this ^String stream-id ^List tuple ^Object message-id]
-                      (send-spout-msg stream-id tuple message-id nil)
-                      )
-                    (^void emitDirect [this ^int out-task-id ^String stream-id
-                                       ^List tuple ^Object message-id]
-                      (send-spout-msg stream-id tuple message-id out-task-id)
-                      )
-                    (reportError [this error]
-                      (report-error error)
-                      )))))
-        (reset! open-or-prepare-was-called? true)
-        (log-message "Opened spout " component-id ":" (keys task-datas))
-        (setup-metrics! executor-data)
-
-        (fn []
-          ;; This design requires that spouts be non-blocking
-          (disruptor/consume-batch receive-queue event-handler)
-
-          (let [active? @(:storm-active-atom executor-data)
-                curr-count (.get emitted-count)
-                backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
-                throttle-on (and backpressure-enabled
-                              @(:throttle-on (:worker executor-data)))
-                reached-max-spout-pending (and max-spout-pending
-                                               (>= (.size pending) max-spout-pending))
-                ]
-            (if active?
-              ; activated
-              (do
-                (when-not @last-active
-                  (reset! last-active true)
-                  (log-message "Activating spout " component-id ":" (keys task-datas))
-                  (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-
-                (if (and (not (.isFull transfer-queue))
-                      (not throttle-on)
-                      (not reached-max-spout-pending))
-                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
-              ; deactivated
-              (do
-                (when @last-active
-                  (reset! last-active false)
-                  (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                  (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                ;; TODO: log that it's getting throttled
-                (Time/sleep 100)
-                (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data))))
-
-            ;; nothing was emitted during this pass while active: back off
-            ;; through the wait strategy and record why the pass was skipped
-            (if (and (= curr-count (.get emitted-count)) active?)
-              (do (.increment empty-emit-streak)
-                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
-                  ;; update the spout throttling metrics
-                  (if throttle-on
-                    (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data))
-                    (if reached-max-spout-pending
-                      (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data)))))
-              (.set empty-emit-streak 0)
-              ))
-          0))
-      :kill-fn (:report-error-and-die executor-data)
-      :factory? true
-      :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
-
-(defn- tuple-time-delta!
-  "Returns time-delta-ms of the tuple's process-sample start time, or nil
-  when the tuple carries no such start time."
-  [^TupleImpl tuple]
-  (when-let [started-ms (.getProcessSampleStartTime tuple)]
-    (time-delta-ms started-ms)))
-      
-(defn- tuple-execute-time-delta!
-  "Returns time-delta-ms of the tuple's execute-sample start time, or nil
-  when the tuple carries no such start time."
-  [^TupleImpl tuple]
-  (when-let [started-ms (.getExecuteSampleStartTime tuple)]
-    (time-delta-ms started-ms)))
-
-(defn put-xor!
-  "XORs id into the value stored under key in the mutable map pending,
-  treating a missing (or nil) entry as 0. Returns whatever .put returns,
-  i.e. the value previously stored under key."
-  [^Map pending key id]
-  (.put pending
-        key
-        (bit-xor (if-let [existing (.get pending key)] existing (long 0))
-                 id)))
-
-;; Builds the single async-loop thread that drives every bolt task of an
-;; executor. The thread waits until the topology is active, prepares each
-;; bolt with an output collector (emit / emitDirect / ack / fail /
-;; reportError), then loops consuming batches from the receive queue and
-;; handing each tuple to tuple-action-fn. Returns a one-element vector
-;; holding that thread.
-(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
-  (let [storm-conf (:storm-conf executor-data)
-        execute-sampler (mk-stats-sampler storm-conf)
-        executor-stats (:stats executor-data)
-        {:keys [storm-conf component-id worker-context transfer-fn
-                report-error sampler
-                open-or-prepare-was-called?]} executor-data
-        rand (Random. (Utils/secureRandomLong))
-
-        tuple-action-fn (fn [task-id ^TupleImpl tuple]
-                          ;; synchronization needs to be done with a key provided by this bolt, otherwise:
-                          ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else,
-                          ;; sends synchronization (s2) and incremental update. s2 and update finish before
-                          ;; s1 -> lose the incremental update
-                          ;; TODO: for state sync, need to first send sync messages in a loop and receive
-                          ;; tuples until synchronization
-                          ;; buffer other tuples until fully synchronized, then process all of those tuples
-                          ;; then go into normal loop
-                          ;; spill to disk?
-                          ;; could be receiving incremental updates while waiting for sync or even a partial
-                          ;; sync because of another failed task
-                          ;; should remember sync requests and include a random sync id in the request.
-                          ;; drop anything not related to active sync requests
-                          ;; or just timeout the sync messages that are coming in until full sync is hit
-                          ;; from that task
-                          ;; need to drop incremental updates from tasks where waiting for sync. otherwise,
-                          ;; buffer the incremental updates
-                          ;; TODO: for state sync, need to check if tuple comes from state spout. if so,
-                          ;; update state
-                          ;; TODO: how to handle incremental updates as well as synchronizations at same time
-                          ;; TODO: need to version tuples somehow
-
-                          ;;(log-debug "Received tuple " tuple " at task " task-id)
-                          ;; need to do it this way to avoid reflection
-                          (let [stream-id (.getSourceStreamId tuple)]
-                            (condp = stream-id
-                              Constants/CREDENTIALS_CHANGED_STREAM_ID
-                                (let [task-data (get task-datas task-id)
-                                      bolt-obj (:object task-data)]
-                                  (when (instance? ICredentialsListener bolt-obj)
-                                    (.setCredentials bolt-obj (.getValue tuple 0))))
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
-                              ;; every other stream is handed to the bolt's execute method
-                              (let [task-data (get task-datas task-id)
-                                    ^IBolt bolt-obj (:object task-data)
-                                    user-context (:user-context task-data)
-                                    sampler? (sampler)
-                                    execute-sampler? (execute-sampler)
-                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
-                                    receive-queue (:receive-queue executor-data)]
-                                (when sampler?
-                                  (.setProcessSampleStartTime tuple now))
-                                (when execute-sampler?
-                                  (.setExecuteSampleStartTime tuple now))
-                                (.execute bolt-obj tuple)
-                                (let [delta (tuple-execute-time-delta! tuple)]
-                                  (when (= true (storm-conf TOPOLOGY-DEBUG))
-                                    (log-message "Execute done TUPLE " tuple " TASK: " task-id " DELTA: " delta))
-
-                                  (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
-                                  (when delta
-                                    (stats/bolt-execute-tuple! executor-stats
-                                                               (.getSourceComponent tuple)
-                                                               (.getSourceStreamId tuple)
-                                                               delta)))))))
-        has-eventloggers? (has-eventloggers? storm-conf)]
-
-    ;; TODO: can get any SubscribedState objects out of the context now
-
-    [(async-loop
-      (fn []
-        ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
-        (while (not @(:storm-active-atom executor-data))
-          (Thread/sleep 100))
-
-        (log-message "Preparing bolt " component-id ":" (keys task-datas))
-        (doseq [[task-id task-data] task-datas
-                :let [^IBolt bolt-obj (:object task-data)
-                      tasks-fn (:tasks-fn task-data)
-                      user-context (:user-context task-data)
-                      ;; for each target task: generate one edge id per anchor that has
-                      ;; root ids, fold it into the anchor's ack val and into
-                      ;; anchors-to-ids under every root, then transfer the new tuple
-                      bolt-emit (fn [stream anchors values task]
-                                  (let [out-tasks (if task
-                                                    (tasks-fn task stream values)
-                                                    (tasks-fn stream values))]
-                                    (fast-list-iter [t out-tasks]
-                                                    (let [anchors-to-ids (HashMap.)]
-                                                      (fast-list-iter [^TupleImpl a anchors]
-                                                                      (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
-                                                                        (when (pos? (count root-ids))
-                                                                          (let [edge-id (MessageId/generateId rand)]
-                                                                            (.updateAckVal a edge-id)
-                                                                            (fast-list-iter [root-id root-ids]
-                                                                                            (put-xor! anchors-to-ids root-id edge-id))
-                                                                            ))))
-                                                        (let [tuple (TupleImpl. worker-context
-                                                                               values
-                                                                               task-id
-                                                                               stream
-                                                                               (MessageId/makeId anchors-to-ids))]
-                                                          (transfer-fn t tuple))))
-                                    (if has-eventloggers?
-                                      (send-to-eventlogger executor-data task-data values component-id nil rand))
-                                    (or out-tasks [])))]]
-          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
-          (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
-          ;; the system component additionally registers the worker's transfer
-          ;; queue and connection metrics
-          (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-            (do
-              (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                       :receive (:receive-queue executor-data)
-                                                       :transfer (:transfer-queue (:worker executor-data))}
-                                                      storm-conf user-context)
-              (builtin-metrics/register-iconnection-client-metrics
-                (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
-              (builtin-metrics/register-iconnection-server-metric
-                (:receiver (:worker executor-data)) storm-conf user-context))
-            (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                     :receive (:receive-queue executor-data)}
-                                                    storm-conf user-context)
-            )
-
-          (.prepare bolt-obj
-                    storm-conf
-                    user-context
-                    (OutputCollector.
-                     (reify IOutputCollector
-                       (emit [this stream anchors values]
-                         (bolt-emit stream anchors values nil))
-                       (emitDirect [this task stream anchors values]
-                         (bolt-emit stream anchors values task))
-                       (^void ack [this ^Tuple tuple]
-                         (let [^TupleImpl tuple tuple
-                               ack-val (.getAckVal tuple)]
-                           ;; one ack message per root, carrying id xor ack-val
-                           (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
-                                          (task/send-unanchored task-data
-                                                                ACKER-ACK-STREAM-ID
-                                                                [root (bit-xor id ack-val)])))
-                         (let [delta (tuple-time-delta! tuple)
-                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                           (when debug?
-                             (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
-                           (when delta
-                             (stats/bolt-acked-tuple! executor-stats
-                                                      (.getSourceComponent tuple)
-                                                      (.getSourceStreamId tuple)
-                                                      delta))))
-                       (^void fail [this ^Tuple tuple]
-                         ;; one fail message per root the tuple is anchored to
-                         (fast-list-iter [root (.. tuple getMessageId getAnchors)]
-                                         (task/send-unanchored task-data
-                                                               ACKER-FAIL-STREAM-ID
-                                                               [root]))
-                         (let [delta (tuple-time-delta! tuple)
-                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                           (when debug?
-                             (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
-                           (when delta
-                             (stats/bolt-failed-tuple! executor-stats
-                                                       (.getSourceComponent tuple)
-                                                       (.getSourceStreamId tuple)
-                                                       delta))))
-                       (reportError [this error]
-                         (report-error error)
-                         )))))
-        (reset! open-or-prepare-was-called? true)
-        (log-message "Prepared bolt " component-id ":" (keys task-datas))
-        (setup-metrics! executor-data)
-
-        (let [receive-queue (:receive-queue executor-data)
-              event-handler (mk-task-receiver executor-data tuple-action-fn)]
-          (fn []
-            (disruptor/consume-batch-when-available receive-queue event-handler)
-            0)))
-      :kill-fn (:report-error-and-die executor-data)
-      :factory? true
-      :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
-
-;; Spout flavour of close-component: calls .close on the spout object.
-;; The executor data argument is not used.
-(defmethod close-component :spout
-  [_ spout-obj]
-  (.close spout-obj))
-
-;; Bolt flavour of close-component: calls .cleanup on the bolt object.
-;; The executor data argument is not used.
-(defmethod close-component :bolt
-  [_ bolt-obj]
-  (.cleanup bolt-obj))
-
-;; TODO: refactor this to be part of an executor-specific map
-;; Spout flavour of mk-executor-stats: delegates to stats/mk-spout-stats
-;; with the given rate; the first argument is ignored.
-(defmethod mk-executor-stats :spout
-  [_ stats-rate]
-  (stats/mk-spout-stats stats-rate))
-
-;; Bolt flavour of mk-executor-stats: delegates to stats/mk-bolt-stats
-;; with the given rate; the first argument is ignored.
-(defmethod mk-executor-stats :bolt
-  [_ stats-rate]
-  (stats/mk-bolt-stats stats-rate))

Reply via email to