Merge apache master branch into STORM-1272

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6696e3f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6696e3f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6696e3f1

Branch: refs/heads/master
Commit: 6696e3f1d8309f85d487a3361306f67c5608a0ff
Parents: 55b26dd d041183
Author: Abhishek Agarwal <[email protected]>
Authored: Fri Feb 12 02:14:42 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Fri Feb 12 02:14:42 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |   4 +-
 README.markdown                                 |   1 +
 dev-tools/travis/travis-script.sh               |   4 +-
 external/sql/storm-sql-core/pom.xml             |   9 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |  27 +-
 .../cluster_state/zookeeper_state_factory.clj   |  11 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  23 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 552 ++++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  68 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 170 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 200 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  70 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  36 -
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  24 +-
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  83 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  97 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 923 +----------------
 .../src/clj/org/apache/storm/zookeeper.clj      |   1 -
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       | 100 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 645 ++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 79 files changed, 3042 insertions(+), 2357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 619a885,e2380b7..03db855
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -36,10 -36,11 +36,12 @@@
    (:import [org.apache.storm Config Constants])
    (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
    (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-   (:import [java.util.concurrent ConcurrentLinkedQueue]
-            (com.lmax.disruptor.dsl ProducerType))
+   (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+            [java.util.concurrent ConcurrentLinkedQueue]
 -           [org.json.simple JSONValue])
++           [org.json.simple JSONValue]
++           [com.lmax.disruptor.dsl ProducerType])
    (:require [org.apache.storm [thrift :as thrift]
--             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
++             [cluster :as cluster] [stats :as stats]])
    (:require [org.apache.storm.daemon [task :as task]])
    (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
    (:require [clojure.set :as set]))
@@@ -300,17 -305,16 +306,19 @@@
          cached-emit (MutableObject. (ArrayList.))
          storm-conf (:storm-conf executor-data)
          serializer (KryoTupleSerializer. storm-conf (:worker-context 
executor-data))
++        ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue 
executor-data)
++        handler (reify com.lmax.disruptor.EventHandler
++                  (onEvent [this o seq-id batch-end?]
++                    (let [^ArrayList alist (.getObject cached-emit)]
++                      (.add alist o)
++                      (when batch-end?
++                        (worker-transfer-fn serializer alist)
++                        (.setObject cached-emit (ArrayList.))))))
          ]
--    (disruptor/consume-loop*
--      (:batch-transfer-queue executor-data)
-       (reify com.lmax.disruptor.EventHandler
-         (onEvent [this o seq-id batch-end?]
-           (let [^ArrayList alist (.getObject cached-emit)]
-             (.add alist o)
-             (when batch-end?
-               (worker-transfer-fn serializer alist)
-               (.setObject cached-emit (ArrayList.))))))
-       :kill-fn (:report-error-and-die executor-data))))
 -      (disruptor/handler [o seq-id batch-end?]
 -        (let [^ArrayList alist (.getObject cached-emit)]
 -          (.add alist o)
 -          (when batch-end?
 -            (worker-transfer-fn serializer alist)
 -            (.setObject cached-emit (ArrayList.)))))
 -      :uncaught-exception-handler (:report-error-and-die executor-data))))
++    (Utils/asyncLoop
++      (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0)
++      (.getName batch-transfer-queue)
++      (:uncaught-exception-handler (:report-error-and-die executor-data)))))
  
  (defn setup-metrics! [executor-data]
    (let [{:keys [storm-conf receive-queue worker-context 
interval->task->metric-registry]} executor-data
@@@ -540,132 -549,130 +553,130 @@@
          has-ackers? (has-ackers? storm-conf)
          has-eventloggers? (has-eventloggers? storm-conf)
          emitted-count (MutableLong. 0)
-         empty-emit-streak (MutableLong. 0)]
-    
-     [(async-loop
-       (fn []
-         ;; If topology was started in inactive state, don't call (.open 
spout) until it's activated first.
-         (while (not @(:storm-active-atom executor-data))
-           (Thread/sleep 100))
-         
-         (log-message "Opening spout " component-id ":" (keys task-datas))
-         (builtin-metrics/register-spout-throttling-metrics 
(:spout-throttling-metrics executor-data) storm-conf (:user-context (first 
(vals task-datas))))
-         (doseq [[task-id task-data] task-datas
-                 :let [^ISpout spout-obj (:object task-data)
-                      tasks-fn (:tasks-fn task-data)
-                      send-spout-msg (fn [out-stream-id values message-id 
out-task-id]
-                                        (.increment emitted-count)
-                                        (let [out-tasks (if out-task-id
-                                                          (tasks-fn 
out-task-id out-stream-id values)
-                                                          (tasks-fn 
out-stream-id values))
-                                              rooted? (and message-id 
has-ackers?)
-                                              root-id (if rooted? 
(MessageId/generateId rand))
-                                              ^List out-ids (fast-list-for [t 
out-tasks] (if rooted? (MessageId/generateId rand)))]
-                                          (fast-list-iter [out-task out-tasks 
id out-ids]
-                                                          (let [tuple-id (if 
rooted?
-                                                                           
(MessageId/makeRootId root-id id)
-                                                                           
(MessageId/makeUnanchored))
-                                                                out-tuple 
(TupleImpl. worker-context
-                                                                               
       values
-                                                                               
       task-id
-                                                                               
       out-stream-id
-                                                                               
       tuple-id)]
-                                                            (transfer-fn 
out-task out-tuple)))
-                                          (if has-eventloggers?
-                                            (send-to-eventlogger executor-data 
task-data values component-id message-id rand))
-                                          (if (and rooted?
-                                                   (not (.isEmpty out-ids)))
-                                            (do
-                                              (.put pending root-id [task-id
-                                                                     message-id
-                                                                     {:stream 
out-stream-id 
-                                                                      :values 
(if debug? values nil)}
-                                                                     (if 
(sampler) (System/currentTimeMillis))])
-                                              (task/send-unanchored task-data
-                                                                    
ACKER-INIT-STREAM-ID
-                                                                    [root-id 
(bit-xor-vals out-ids) task-id]))
-                                            (when message-id
-                                              (ack-spout-msg executor-data 
task-data message-id
-                                                             {:stream 
out-stream-id :values values}
-                                                             (if (sampler) 0) 
"0:")))
-                                          (or out-tasks [])
-                                          ))]]
-           (builtin-metrics/register-all (:builtin-metrics task-data) 
storm-conf (:user-context task-data))
-           (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
-                                                    :receive receive-queue}
-                                                   storm-conf (:user-context 
task-data))
-           (when (instance? ICredentialsListener spout-obj) (.setCredentials 
spout-obj initial-credentials))
- 
-           (.open spout-obj
-                  storm-conf
-                  (:user-context task-data)
-                  (SpoutOutputCollector.
-                   (reify ISpoutOutputCollector
-                     (^long getPendingCount[this]
-                       (.size pending)
-                       )
-                     (^List emit [this ^String stream-id ^List tuple ^Object 
message-id]
-                       (send-spout-msg stream-id tuple message-id nil)
-                       )
-                     (^void emitDirect [this ^int out-task-id ^String stream-id
-                                        ^List tuple ^Object message-id]
-                       (send-spout-msg stream-id tuple message-id out-task-id)
-                       )
-                     (reportError [this error]
-                       (report-error error)
-                       )))))
-         (reset! open-or-prepare-was-called? true) 
-         (log-message "Opened spout " component-id ":" (keys task-datas))
-         (setup-metrics! executor-data)
-         
-         (fn []
-           ;; This design requires that spouts be non-blocking
-           (.consumeBatch ^DisruptorQueue receive-queue event-handler)
-           
-           (let [active? @(:storm-active-atom executor-data)
-                 curr-count (.get emitted-count)
-                 backpressure-enabled ((:storm-conf executor-data) 
TOPOLOGY-BACKPRESSURE-ENABLE)
-                 throttle-on (and backpressure-enabled
-                               @(:throttle-on (:worker executor-data)))
-                 reached-max-spout-pending (and max-spout-pending
-                                                (>= (.size pending) 
max-spout-pending))
-                 ]
-             (if active?
-               ; activated
-               (do
-                 (when-not @last-active
-                   (reset! last-active true)
-                   (log-message "Activating spout " component-id ":" (keys 
task-datas))
-                   (fast-list-iter [^ISpout spout spouts] (.activate spout)))
- 
-                 (if (and (not (.isFull transfer-queue))
-                       (not throttle-on)
-                       (not reached-max-spout-pending))
-                   (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))))
-               ; deactivated
-               (do
-                 (when @last-active
-                   (reset! last-active false)
-                   (log-message "Deactivating spout " component-id ":" (keys 
task-datas))
-                   (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-                 ;; TODO: log that it's getting throttled
-                 (Time/sleep 100)
-                 (builtin-metrics/skipped-inactive! (:spout-throttling-metrics 
executor-data) (:stats executor-data))))
- 
-             (if (and (= curr-count (.get emitted-count)) active?)
-               (do (.increment empty-emit-streak)
-                   (.emptyEmit spout-wait-strategy (.get empty-emit-streak))
-                   ;; update the spout throttling metrics
-                   (if throttle-on
-                     (builtin-metrics/skipped-throttle! 
(:spout-throttling-metrics executor-data) (:stats executor-data))
-                     (if reached-max-spout-pending
-                       (builtin-metrics/skipped-max-spout! 
(:spout-throttling-metrics executor-data) (:stats executor-data)))))
-               (.set empty-emit-streak 0)
-               ))
-           0))
-       :kill-fn (:report-error-and-die executor-data)
-       :factory? true
-       :thread-name (str component-id "-executor" (:executor-id 
executor-data)))]))
+         empty-emit-streak (MutableLong. 0)
+         spout-transfer-fn (fn []
+                             ;; If topology was started in inactive state, 
don't call (.open spout) until it's activated first.
+                             (while (not @(:storm-active-atom executor-data))
+                               (Thread/sleep 100))
+                             (log-message "Opening spout " component-id ":" 
(keys task-datas))
+                             
(builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics 
executor-data) storm-conf (:user-context (first (vals task-datas))))
+                             (doseq [[task-id task-data] task-datas
+                                     :let [^ISpout spout-obj (:object 
task-data)
+                                           tasks-fn (:tasks-fn task-data)
+                                           send-spout-msg (fn [out-stream-id 
values message-id out-task-id]
+                                                            (.increment 
emitted-count)
+                                                            (let [out-tasks 
(if out-task-id
+                                                                              
(tasks-fn out-task-id out-stream-id values)
+                                                                              
(tasks-fn out-stream-id values))
+                                                                  rooted? (and 
message-id has-ackers?)
+                                                                  root-id (if 
rooted? (MessageId/generateId rand))
+                                                                  ^List 
out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
+                                                              (fast-list-iter 
[out-task out-tasks id out-ids]
+                                                                              
(let [tuple-id (if rooted?
+                                                                               
                (MessageId/makeRootId root-id id)
+                                                                               
                (MessageId/makeUnanchored))
+                                                                               
     out-tuple (TupleImpl. worker-context
+                                                                               
                           values
+                                                                               
                           task-id
+                                                                               
                           out-stream-id
+                                                                               
                           tuple-id)]
+                                                                               
 (transfer-fn out-task out-tuple)))
+                                                              (if 
has-eventloggers?
+                                                                
(send-to-eventlogger executor-data task-data values component-id message-id 
rand))
+                                                              (if (and rooted?
+                                                                       (not 
(.isEmpty out-ids)))
+                                                                (do
+                                                                  (.put 
pending root-id [task-id
+                                                                               
          message-id
+                                                                               
          {:stream out-stream-id 
+                                                                               
           :values (if debug? values nil)}
+                                                                               
          (if (sampler) (System/currentTimeMillis))])
+                                                                  
(task/send-unanchored task-data
+                                                                               
         ACKER-INIT-STREAM-ID
+                                                                               
         [root-id (bit-xor-vals out-ids) task-id]))
+                                                                (when 
message-id
+                                                                  
(ack-spout-msg executor-data task-data message-id
+                                                                               
  {:stream out-stream-id :values values}
+                                                                               
  (if (sampler) 0) "0:")))
+                                                              (or out-tasks 
[])))]]
+ 
+                               (builtin-metrics/register-all (:builtin-metrics 
task-data) storm-conf (:user-context task-data))
+                               (builtin-metrics/register-queue-metrics 
{:sendqueue (:batch-transfer-queue executor-data)
+                                                                        
:receive receive-queue}
+                                                                       
storm-conf (:user-context task-data))
+                               (when (instance? ICredentialsListener 
spout-obj) (.setCredentials spout-obj initial-credentials))
+ 
+                               (.open spout-obj
+                                      storm-conf
+                                      (:user-context task-data)
+                                      (SpoutOutputCollector.
+                                       (reify ISpoutOutputCollector
+                                         (^long getPendingCount[this]
+                                           (.size pending))
+                                         (^List emit [this ^String stream-id 
^List tuple ^Object message-id]
+                                           (send-spout-msg stream-id tuple 
message-id nil))
+                                         (^void emitDirect [this ^int 
out-task-id ^String stream-id
+                                                            ^List tuple 
^Object message-id]
+                                           (send-spout-msg stream-id tuple 
message-id out-task-id))
+                                         (reportError [this error]
+                                           (report-error error))))))
+ 
+                             (reset! open-or-prepare-was-called? true) 
+                             (log-message "Opened spout " component-id ":" 
(keys task-datas))
+                             (setup-metrics! executor-data)
+ 
+                             (fn []
+                               ;; This design requires that spouts be 
non-blocking
 -                              (disruptor/consume-batch receive-queue 
event-handler)
++                              (.consumeBatch ^DisruptorQueue receive-queue 
event-handler)
+ 
+                               (let [active? @(:storm-active-atom 
executor-data)
+                                     curr-count (.get emitted-count)
+                                     backpressure-enabled ((:storm-conf 
executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+                                     throttle-on (and backpressure-enabled
+                                                      @(:throttle-on (:worker 
executor-data)))
+                                     reached-max-spout-pending (and 
max-spout-pending
+                                                                    (>= (.size 
pending) max-spout-pending))]
+                                 (if active?
+                                         ; activated
+                                   (do
+                                     (when-not @last-active
+                                       (reset! last-active true)
+                                       (log-message "Activating spout " 
component-id ":" (keys task-datas))
+                                       (fast-list-iter [^ISpout spout spouts] 
(.activate spout)))
+ 
+                                     (if (and (not (.isFull transfer-queue))
+                                              (not throttle-on)
+                                              (not reached-max-spout-pending))
+                                       (fast-list-iter [^ISpout spout spouts] 
(.nextTuple spout))))
+                                         ; deactivated
+                                   (do
+                                     (when @last-active
+                                       (reset! last-active false)
+                                       (log-message "Deactivating spout " 
component-id ":" (keys task-datas))
+                                       (fast-list-iter [^ISpout spout spouts] 
(.deactivate spout)))
+                                     ;; TODO: log that it's getting throttled
+                                     (Time/sleep 100)
+                                     (builtin-metrics/skipped-inactive! 
(:spout-throttling-metrics executor-data) (:stats executor-data))))
+ 
+                                 (if (and (= curr-count (.get emitted-count)) 
active?)
+                                   (do (.increment empty-emit-streak)
+                                       (.emptyEmit spout-wait-strategy (.get 
empty-emit-streak))
+                                       ;; update the spout throttling metrics
+                                       (if throttle-on
+                                         (builtin-metrics/skipped-throttle! 
(:spout-throttling-metrics executor-data) (:stats executor-data))
+                                         (if reached-max-spout-pending
+                                           (builtin-metrics/skipped-max-spout! 
(:spout-throttling-metrics executor-data) (:stats executor-data)))))
+                                   (.set empty-emit-streak 0)))
+                               0))]
+ 
+     [(Utils/asyncLoop
+       spout-transfer-fn
+       false ; isDaemon
+       (:report-error-and-die executor-data)
+       Thread/NORM_PRIORITY
+       true ; isFactory
+       true ; startImmediately
+       (str component-id "-executor" (:executor-id executor-data)))]))
  
  (defn- tuple-time-delta! [^TupleImpl tuple]
    (let [ms (.getProcessSampleStartTime tuple)]
@@@ -736,115 -743,116 +747,116 @@@
                                                                 
(.getSourceComponent tuple)
                                                                 
(.getSourceStreamId tuple)
                                                                 delta)))))))
-         has-eventloggers? (has-eventloggers? storm-conf)]
-     
+         has-eventloggers? (has-eventloggers? storm-conf)
+         bolt-transfer-fn (fn []
+                            ;; If topology was started in inactive state, 
don't call prepare bolt until it's activated first.
+                            (while (not @(:storm-active-atom executor-data))
+                              (Thread/sleep 100))
+ 
+                            (log-message "Preparing bolt " component-id ":" 
(keys task-datas))
+                            (doseq [[task-id task-data] task-datas
+                                    :let [^IBolt bolt-obj (:object task-data)
+                                          tasks-fn (:tasks-fn task-data)
+                                          user-context (:user-context 
task-data)
+                                          bolt-emit (fn [stream anchors values 
task]
+                                                      (let [out-tasks (if task
+                                                                        
(tasks-fn task stream values)
+                                                                        
(tasks-fn stream values))]
+                                                        (fast-list-iter [t 
out-tasks]
+                                                                        (let 
[anchors-to-ids (HashMap.)]
+                                                                          
(fast-list-iter [^TupleImpl a anchors]
+                                                                               
           (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
+                                                                               
             (when (pos? (count root-ids))
+                                                                               
               (let [edge-id (MessageId/generateId rand)]
+                                                                               
                 (.updateAckVal a edge-id)
+                                                                               
                 (fast-list-iter [root-id root-ids]
+                                                                               
                                 (put-xor! anchors-to-ids root-id edge-id))))))
+                                                                          (let 
[tuple (TupleImpl. worker-context
+                                                                               
                   values
+                                                                               
                   task-id
+                                                                               
                   stream
+                                                                               
                   (MessageId/makeId anchors-to-ids))]
+                                                                            
(transfer-fn t tuple))))
+                                                        (if has-eventloggers?
+                                                          (send-to-eventlogger 
executor-data task-data values component-id nil rand))
+                                                        (or out-tasks [])))]]
+                              (builtin-metrics/register-all (:builtin-metrics 
task-data) storm-conf user-context)
+                              (when (instance? ICredentialsListener bolt-obj) 
(.setCredentials bolt-obj initial-credentials)) 
+                              (if (= component-id 
Constants/SYSTEM_COMPONENT_ID)
+                                (do
+                                  (builtin-metrics/register-queue-metrics 
{:sendqueue (:batch-transfer-queue executor-data)
+                                                                           
:receive (:receive-queue executor-data)
+                                                                           
:transfer (:transfer-queue (:worker executor-data))}
+                                                                          
storm-conf user-context)
+                                  
(builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket 
(:worker executor-data)) storm-conf user-context)
+                                  
(builtin-metrics/register-iconnection-server-metric (:receiver (:worker 
executor-data)) storm-conf user-context))
+                                (builtin-metrics/register-queue-metrics 
{:sendqueue (:batch-transfer-queue executor-data)
+                                                                         
:receive (:receive-queue executor-data)}
+                                                                        
storm-conf user-context))
+ 
+                              (.prepare bolt-obj
+                                        storm-conf
+                                        user-context
+                                        (OutputCollector.
+                                         (reify IOutputCollector
+                                           (emit [this stream anchors values]
+                                             (bolt-emit stream anchors values 
nil))
+                                           (emitDirect [this task stream 
anchors values]
+                                             (bolt-emit stream anchors values 
task))
+                                           (^void ack [this ^Tuple tuple]
+                                             (let [^TupleImpl tuple tuple
+                                                   ack-val (.getAckVal tuple)]
+                                               (fast-map-iter [[root id] (.. 
tuple getMessageId getAnchorsToIds)]
+                                                              
(task/send-unanchored task-data
+                                                                               
     ACKER-ACK-STREAM-ID
+                                                                               
     [root (bit-xor id ack-val)])))
+                                             (let [delta (tuple-time-delta! 
tuple)
+                                                   debug? (= true (storm-conf 
TOPOLOGY-DEBUG))]
+                                               (when debug?
+                                                 (log-message "BOLT ack TASK: 
" task-id " TIME: " delta " TUPLE: " tuple))
+                                               (task/apply-hooks user-context 
.boltAck (BoltAckInfo. tuple task-id delta))
+                                               (when delta
+                                                 (stats/bolt-acked-tuple! 
executor-stats
+                                                                          
(.getSourceComponent tuple)
+                                                                          
(.getSourceStreamId tuple)
+                                                                          
delta))))
+                                           (^void fail [this ^Tuple tuple]
+                                             (fast-list-iter [root (.. tuple 
getMessageId getAnchors)]
+                                                             
(task/send-unanchored task-data
+                                                                               
    ACKER-FAIL-STREAM-ID
+                                                                               
    [root]))
+                                             (let [delta (tuple-time-delta! 
tuple)
+                                                   debug? (= true (storm-conf 
TOPOLOGY-DEBUG))]
+                                               (when debug?
+                                                 (log-message "BOLT fail TASK: 
" task-id " TIME: " delta " TUPLE: " tuple))
+                                               (task/apply-hooks user-context 
.boltFail (BoltFailInfo. tuple task-id delta))
+                                               (when delta
+                                                 (stats/bolt-failed-tuple! 
executor-stats
+                                                                           
(.getSourceComponent tuple)
+                                                                           
(.getSourceStreamId tuple)
+                                                                           
delta))))
+                                           (reportError [this error]
+                                             (report-error error))))))
+ 
+                            (reset! open-or-prepare-was-called? true)
+                            (log-message "Prepared bolt " component-id ":" 
(keys task-datas))
+                            (setup-metrics! executor-data)
+ 
+                            (let [receive-queue (:receive-queue executor-data)
+                                  event-handler (mk-task-receiver 
executor-data tuple-action-fn)]
+                              (fn []
 -                               (disruptor/consume-batch-when-available 
receive-queue event-handler)
++                               (.consumeBatchWhenAvailable ^DisruptorQueue 
receive-queue event-handler)
+                                0)))]
      ;; TODO: can get any SubscribedState objects out of the context now
  
-     [(async-loop
-       (fn []
-         ;; If topology was started in inactive state, don't call prepare bolt 
until it's activated first.
-         (while (not @(:storm-active-atom executor-data))          
-           (Thread/sleep 100))
-         
-         (log-message "Preparing bolt " component-id ":" (keys task-datas))
-         (doseq [[task-id task-data] task-datas
-                 :let [^IBolt bolt-obj (:object task-data)
-                       tasks-fn (:tasks-fn task-data)
-                       user-context (:user-context task-data)
-                       bolt-emit (fn [stream anchors values task]
-                                   (let [out-tasks (if task
-                                                     (tasks-fn task stream 
values)
-                                                     (tasks-fn stream values))]
-                                     (fast-list-iter [t out-tasks]
-                                                     (let [anchors-to-ids 
(HashMap.)]
-                                                       (fast-list-iter 
[^TupleImpl a anchors]
-                                                                       (let 
[root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
-                                                                         (when 
(pos? (count root-ids))
-                                                                           
(let [edge-id (MessageId/generateId rand)]
-                                                                             
(.updateAckVal a edge-id)
-                                                                             
(fast-list-iter [root-id root-ids]
-                                                                               
              (put-xor! anchors-to-ids root-id edge-id))
-                                                                             
))))
-                                                         (let [tuple 
(TupleImpl. worker-context
-                                                                               
 values
-                                                                               
 task-id
-                                                                               
 stream
-                                                                               
 (MessageId/makeId anchors-to-ids))]
-                                                           (transfer-fn t 
tuple))))
-                                     (if has-eventloggers?
-                                       (send-to-eventlogger executor-data 
task-data values component-id nil rand))
-                                     (or out-tasks [])))]]
-           (builtin-metrics/register-all (:builtin-metrics task-data) 
storm-conf user-context)
-           (when (instance? ICredentialsListener bolt-obj) (.setCredentials 
bolt-obj initial-credentials)) 
-           (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-             (do
-               (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
-                                                        :receive 
(:receive-queue executor-data)
-                                                        :transfer 
(:transfer-queue (:worker executor-data))}
-                                                       storm-conf user-context)
-               (builtin-metrics/register-iconnection-client-metrics 
(:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
-               (builtin-metrics/register-iconnection-server-metric (:receiver 
(:worker executor-data)) storm-conf user-context))
-             (builtin-metrics/register-queue-metrics {:sendqueue 
(:batch-transfer-queue executor-data)
-                                                      :receive (:receive-queue 
executor-data)}
-                                                     storm-conf user-context)
-             )
- 
-           (.prepare bolt-obj
-                     storm-conf
-                     user-context
-                     (OutputCollector.
-                      (reify IOutputCollector
-                        (emit [this stream anchors values]
-                          (bolt-emit stream anchors values nil))
-                        (emitDirect [this task stream anchors values]
-                          (bolt-emit stream anchors values task))
-                        (^void ack [this ^Tuple tuple]
-                          (let [^TupleImpl tuple tuple
-                                ack-val (.getAckVal tuple)]
-                            (fast-map-iter [[root id] (.. tuple getMessageId 
getAnchorsToIds)]
-                                           (task/send-unanchored task-data
-                                                                 
ACKER-ACK-STREAM-ID
-                                                                 [root 
(bit-xor id ack-val)])))
-                          (let [delta (tuple-time-delta! tuple)
-                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                            (when debug? 
-                              (log-message "BOLT ack TASK: " task-id " TIME: " 
delta " TUPLE: " tuple))
-                            (task/apply-hooks user-context .boltAck 
(BoltAckInfo. tuple task-id delta))
-                            (when delta
-                              (stats/bolt-acked-tuple! executor-stats
-                                                       (.getSourceComponent 
tuple)
-                                                       (.getSourceStreamId 
tuple)
-                                                       delta))))
-                        (^void fail [this ^Tuple tuple]
-                          (fast-list-iter [root (.. tuple getMessageId 
getAnchors)]
-                                          (task/send-unanchored task-data
-                                                                
ACKER-FAIL-STREAM-ID
-                                                                [root]))
-                          (let [delta (tuple-time-delta! tuple)
-                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
-                            (when debug? 
-                              (log-message "BOLT fail TASK: " task-id " TIME: 
" delta " TUPLE: " tuple))
-                            (task/apply-hooks user-context .boltFail 
(BoltFailInfo. tuple task-id delta))
-                            (when delta
-                              (stats/bolt-failed-tuple! executor-stats
-                                                        (.getSourceComponent 
tuple)
-                                                        (.getSourceStreamId 
tuple)
-                                                        delta))))
-                        (reportError [this error]
-                          (report-error error)
-                          )))))
-         (reset! open-or-prepare-was-called? true)        
-         (log-message "Prepared bolt " component-id ":" (keys task-datas))
-         (setup-metrics! executor-data)
- 
-         (let [receive-queue (:receive-queue executor-data)
-               event-handler (mk-task-receiver executor-data tuple-action-fn)]
-           (fn []            
-             (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue 
event-handler)
-             0)))
-       :kill-fn (:report-error-and-die executor-data)
-       :factory? true
-       :thread-name (str component-id "-executor" (:executor-id 
executor-data)))]))
+     [(Utils/asyncLoop
+       bolt-transfer-fn
+       false ; isDaemon
+       (:report-error-and-die executor-data)
+       Thread/NORM_PRIORITY
+       true ; isFactory
+       true ; startImmediately
+       (str component-id "-executor" (:executor-id executor-data)))]))
  
  (defmethod close-component :spout [executor-data spout]
    (.close spout))

http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index bfece6a,b2bdcdb..1f530ac
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -19,13 -19,16 +19,16 @@@
    (:require [clj-time.core :as time])
    (:require [clj-time.coerce :as coerce])
    (:require [org.apache.storm.daemon [executor :as executor]])
--  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as 
cluster]])
++  (:require [org.apache.storm [cluster :as cluster]])
    (:require [clojure.set :as set])
    (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors]
-            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
-   (:import [java.util ArrayList HashMap])
-   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue 
WorkerBackpressureCallback DisruptorBackpressureCallback])
+            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
+            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
++  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time 
WorkerBackpressureCallback DisruptorBackpressureCallback])
+   (:import [java.util ArrayList HashMap]
+            [java.util.concurrent.locks ReentrantReadWriteLock])
+   (:import [org.apache.commons.io FileUtils])
 -  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
    (:import [org.apache.storm.grouping LoadMapping])
    (:import [org.apache.storm.messaging TransportFactory])
    (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
@@@ -632,7 -658,7 +658,9 @@@
  
          transfer-tuples (mk-transfer-tuples-handler worker)
          
--        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) 
transfer-tuples)               
++        transfer-thread (Utils/asyncLoop
++                          (fn []
++                            (.consumeBatchWhenAvailable ^DisruptorQueue 
(:transfer-queue worker) transfer-tuples) 0))
  
          disruptor-handler (mk-disruptor-backpressure-handler worker)
          _ (.registerBackpressureCallback (:transfer-queue worker) 
disruptor-handler)

http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/disruptor.clj
index 78b16dc,e2211c0..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ /dev/null
@@@ -1,36 -1,89 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--
--(ns org.apache.storm.disruptor
-   (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback 
DisruptorBackpressureCallback])
 -  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback 
DisruptorBackpressureCallback Utils])
--  (:import [com.lmax.disruptor.dsl ProducerType])
--  (:require [clojure [string :as str]])
--  (:require [clojure [set :as set]])
--  (:use [clojure walk])
--  (:use [org.apache.storm util log]))
 -
 -(def PRODUCER-TYPE
 -  {:multi-threaded ProducerType/MULTI
 -   :single-threaded ProducerType/SINGLE})
 -
 -(defnk disruptor-queue
 -  [^String queue-name buffer-size timeout :producer-type :multi-threaded 
:batch-size 100 :batch-timeout 1]
 -  (DisruptorQueue. queue-name
 -                   (PRODUCER-TYPE producer-type) buffer-size
 -                   timeout batch-size batch-timeout))
 -
 -(defn clojure-handler
 -  [afn]
 -  (reify com.lmax.disruptor.EventHandler
 -    (onEvent
 -      [this o seq-id batchEnd?]
 -      (afn o seq-id batchEnd?))))
 -
 -(defn disruptor-backpressure-handler
 -  [afn-high-wm afn-low-wm]
 -  (reify DisruptorBackpressureCallback
 -    (highWaterMark
 -      [this]
 -      (afn-high-wm))
 -    (lowWaterMark
 -      [this]
 -      (afn-low-wm))))
 -
 -(defn worker-backpressure-handler
 -  [afn]
 -  (reify WorkerBackpressureCallback
 -    (onEvent
 -      [this o]
 -      (afn o))))
 -
 -(defmacro handler
 -  [& args]
 -  `(clojure-handler (fn ~@args)))
--
 -(defn publish
 -  [^DisruptorQueue q o]
 -  (.publish q o))
--
 -(defn consume-batch
 -  [^DisruptorQueue queue handler]
 -  (.consumeBatch queue handler))
--
 -(defn consume-batch-when-available
 -  [^DisruptorQueue queue handler]
 -  (.consumeBatchWhenAvailable queue handler))
--
 -(defn halt-with-interrupt!
 -  [^DisruptorQueue queue]
 -  (.haltWithInterrupt queue))
--
--(defnk consume-loop*
--  [^DisruptorQueue queue handler
-    :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
-   (async-loop
-           (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0)
-           :kill-fn kill-fn
-           :thread-name (.getName queue)))
 -   :uncaught-exception-handler nil]
 -  (Utils/asyncLoop
 -          (fn [] (consume-batch-when-available queue handler) 0)
 -          (.getName queue)
 -          uncaught-exception-handler))
--
 -(defmacro consume-loop [queue & handler-args]
 -  `(let [handler# (handler ~@handler-args)]
 -     (consume-loop* ~queue handler#)))

Reply via email to