Merge apache master branch into STORM-1272
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6696e3f1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6696e3f1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6696e3f1 Branch: refs/heads/master Commit: 6696e3f1d8309f85d487a3361306f67c5608a0ff Parents: 55b26dd d041183 Author: Abhishek Agarwal <[email protected]> Authored: Fri Feb 12 02:14:42 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Fri Feb 12 02:14:42 2016 +0530 ---------------------------------------------------------------------- CHANGELOG.md | 4 +- README.markdown | 1 + dev-tools/travis/travis-script.sh | 4 +- external/sql/storm-sql-core/pom.xml | 9 + .../storm/hbase/security/HBaseSecurityUtil.java | 36 +- external/storm-mqtt/core/pom.xml | 4 +- pom.xml | 9 +- storm-core/pom.xml | 11 +- .../src/clj/org/apache/storm/LocalCluster.clj | 4 +- storm-core/src/clj/org/apache/storm/clojure.clj | 8 +- storm-core/src/clj/org/apache/storm/cluster.clj | 27 +- .../cluster_state/zookeeper_state_factory.clj | 11 +- .../clj/org/apache/storm/command/blobstore.clj | 11 +- .../org/apache/storm/command/dev_zookeeper.clj | 6 +- .../clj/org/apache/storm/command/get_errors.clj | 12 +- .../apache/storm/command/shell_submission.clj | 4 +- storm-core/src/clj/org/apache/storm/config.clj | 18 +- .../src/clj/org/apache/storm/converter.clj | 14 +- .../src/clj/org/apache/storm/daemon/acker.clj | 21 +- .../src/clj/org/apache/storm/daemon/common.clj | 29 +- .../src/clj/org/apache/storm/daemon/drpc.clj | 23 +- .../clj/org/apache/storm/daemon/executor.clj | 552 ++++++----- .../clj/org/apache/storm/daemon/logviewer.clj | 68 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 170 ++-- .../clj/org/apache/storm/daemon/supervisor.clj | 200 ++-- .../src/clj/org/apache/storm/daemon/task.clj | 2 +- .../src/clj/org/apache/storm/daemon/worker.clj | 70 +- .../src/clj/org/apache/storm/disruptor.clj | 36 - storm-core/src/clj/org/apache/storm/event.clj | 2 +- .../src/clj/org/apache/storm/local_state.clj | 9 +- .../org/apache/storm/pacemaker/pacemaker.clj | 7 +- .../storm/pacemaker/pacemaker_state_factory.clj | 24 +- .../clj/org/apache/storm/process_simulator.clj | 4 +- .../apache/storm/scheduler/DefaultScheduler.clj | 7 +- .../apache/storm/scheduler/EvenScheduler.clj | 23 +- .../storm/scheduler/IsolationScheduler.clj | 29 +- storm-core/src/clj/org/apache/storm/stats.clj | 82 +- storm-core/src/clj/org/apache/storm/testing.clj | 83 +- storm-core/src/clj/org/apache/storm/thrift.clj | 6 +- storm-core/src/clj/org/apache/storm/timer.clj | 12 +- .../clj/org/apache/storm/trident/testing.clj | 9 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 97 +- .../src/clj/org/apache/storm/ui/helpers.clj | 14 +- storm-core/src/clj/org/apache/storm/util.clj | 923 +---------------- .../src/clj/org/apache/storm/zookeeper.clj | 1 - .../storm/logging/ThriftAccessLogger.java | 13 +- .../serialization/SerializationFactory.java | 17 +- .../staticmocking/MockedConfigUtils.java | 31 - .../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +- .../jvm/org/apache/storm/utils/Container.java | 11 +- .../jvm/org/apache/storm/utils/IPredicate.java | 27 + .../org/apache/storm/utils/NimbusClient.java | 2 +- .../jvm/org/apache/storm/utils/TestUtils.java | 34 - .../src/jvm/org/apache/storm/utils/Time.java | 26 +- .../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++-- .../storm/validation/ConfigValidation.java | 2 +- .../org/apache/storm/zookeeper/Zookeeper.java | 7 + .../org/apache/storm/integration_test.clj | 100 +- .../org/apache/storm/testing4j_test.clj | 37 +- .../apache/storm/trident/integration_test.clj | 15 +- .../test/clj/org/apache/storm/cluster_test.clj | 20 +- .../test/clj/org/apache/storm/drpc_test.clj | 23 +- .../clj/org/apache/storm/logviewer_test.clj | 267 ++--- .../storm/messaging/netty_integration_test.clj | 2 +- .../test/clj/org/apache/storm/nimbus_test.clj | 131 ++- .../scheduler/resource_aware_scheduler_test.clj | 21 +- .../apache/storm/security/auth/auth_test.clj | 11 +- .../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +- .../BlowfishTupleSerializer_test.clj | 1 - .../clj/org/apache/storm/serialization_test.clj | 23 +- .../clj/org/apache/storm/supervisor_test.clj | 645 ++++++------ .../clj/org/apache/storm/transactional_test.clj | 18 + .../clj/org/apache/storm/trident/state_test.clj | 5 +- .../clj/org/apache/storm/trident/tuple_test.clj | 15 +- .../test/clj/org/apache/storm/utils_test.clj | 16 +- .../test/clj/org/apache/storm/worker_test.clj | 1 - .../staticmocking/ConfigUtilsInstaller.java | 38 + .../utils/staticmocking/UtilsInstaller.java | 38 + .../storm/utils/staticmocking/package-info.java | 95 ++ 79 files changed, 3042 insertions(+), 2357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 619a885,e2380b7..03db855 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -36,10 -36,11 +36,12 @@@ (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) - (:import [java.util.concurrent ConcurrentLinkedQueue] - (com.lmax.disruptor.dsl ProducerType)) + (:import [java.lang Thread Thread$UncaughtExceptionHandler] + [java.util.concurrent ConcurrentLinkedQueue] - [org.json.simple JSONValue]) ++ [org.json.simple JSONValue] ++ [com.lmax.disruptor.dsl ProducerType]) (:require [org.apache.storm [thrift :as thrift] -- [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) ++ [cluster :as cluster] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@@ -300,17 -305,16 +306,19 @@@ cached-emit (MutableObject. (ArrayList.)) storm-conf (:storm-conf executor-data) serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) ++ ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data) ++ handler (reify com.lmax.disruptor.EventHandler ++ (onEvent [this o seq-id batch-end?] ++ (let [^ArrayList alist (.getObject cached-emit)] ++ (.add alist o) ++ (when batch-end? ++ (worker-transfer-fn serializer alist) ++ (.setObject cached-emit (ArrayList.)))))) ] -- (disruptor/consume-loop* -- (:batch-transfer-queue executor-data) - (reify com.lmax.disruptor.EventHandler - (onEvent [this o seq-id batch-end?] - (let [^ArrayList alist (.getObject cached-emit)] - (.add alist o) - (when batch-end? - (worker-transfer-fn serializer alist) - (.setObject cached-emit (ArrayList.)))))) - :kill-fn (:report-error-and-die executor-data)))) - (disruptor/handler [o seq-id batch-end?] - (let [^ArrayList alist (.getObject cached-emit)] - (.add alist o) - (when batch-end? - (worker-transfer-fn serializer alist) - (.setObject cached-emit (ArrayList.))))) - :uncaught-exception-handler (:report-error-and-die executor-data)))) ++ (Utils/asyncLoop ++ (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0) ++ (.getName batch-transfer-queue) ++ (:uncaught-exception-handler (:report-error-and-die executor-data))))) (defn setup-metrics! [executor-data] (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data @@@ -540,132 -549,130 +553,130 @@@ has-ackers? (has-ackers? storm-conf) has-eventloggers? (has-eventloggers? storm-conf) emitted-count (MutableLong. 0) - empty-emit-streak (MutableLong. 0)] - - [(async-loop - (fn [] - ;; If topology was started in inactive state, don't call (.open spout) until it's activated first. - (while (not @(:storm-active-atom executor-data)) - (Thread/sleep 100)) - - (log-message "Opening spout " component-id ":" (keys task-datas)) - (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas)))) - (doseq [[task-id task-data] task-datas - :let [^ISpout spout-obj (:object task-data) - tasks-fn (:tasks-fn task-data) - send-spout-msg (fn [out-stream-id values message-id out-task-id] - (.increment emitted-count) - (let [out-tasks (if out-task-id - (tasks-fn out-task-id out-stream-id values) - (tasks-fn out-stream-id values)) - rooted? (and message-id has-ackers?) - root-id (if rooted? (MessageId/generateId rand)) - ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))] - (fast-list-iter [out-task out-tasks id out-ids] - (let [tuple-id (if rooted? - (MessageId/makeRootId root-id id) - (MessageId/makeUnanchored)) - out-tuple (TupleImpl. worker-context - values - task-id - out-stream-id - tuple-id)] - (transfer-fn out-task out-tuple))) - (if has-eventloggers? - (send-to-eventlogger executor-data task-data values component-id message-id rand)) - (if (and rooted? - (not (.isEmpty out-ids))) - (do - (.put pending root-id [task-id - message-id - {:stream out-stream-id - :values (if debug? values nil)} - (if (sampler) (System/currentTimeMillis))]) - (task/send-unanchored task-data - ACKER-INIT-STREAM-ID - [root-id (bit-xor-vals out-ids) task-id])) - (when message-id - (ack-spout-msg executor-data task-data message-id - {:stream out-stream-id :values values} - (if (sampler) 0) "0:"))) - (or out-tasks []) - ))]] - (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) - (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) - :receive receive-queue} - storm-conf (:user-context task-data)) - (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials)) - - (.open spout-obj - storm-conf - (:user-context task-data) - (SpoutOutputCollector. - (reify ISpoutOutputCollector - (^long getPendingCount[this] - (.size pending) - ) - (^List emit [this ^String stream-id ^List tuple ^Object message-id] - (send-spout-msg stream-id tuple message-id nil) - ) - (^void emitDirect [this ^int out-task-id ^String stream-id - ^List tuple ^Object message-id] - (send-spout-msg stream-id tuple message-id out-task-id) - ) - (reportError [this error] - (report-error error) - ))))) - (reset! open-or-prepare-was-called? true) - (log-message "Opened spout " component-id ":" (keys task-datas)) - (setup-metrics! executor-data) - - (fn [] - ;; This design requires that spouts be non-blocking - (.consumeBatch ^DisruptorQueue receive-queue event-handler) - - (let [active? @(:storm-active-atom executor-data) - curr-count (.get emitted-count) - backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) - throttle-on (and backpressure-enabled - @(:throttle-on (:worker executor-data))) - reached-max-spout-pending (and max-spout-pending - (>= (.size pending) max-spout-pending)) - ] - (if active? - ; activated - (do - (when-not @last-active - (reset! last-active true) - (log-message "Activating spout " component-id ":" (keys task-datas)) - (fast-list-iter [^ISpout spout spouts] (.activate spout))) - - (if (and (not (.isFull transfer-queue)) - (not throttle-on) - (not reached-max-spout-pending)) - (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))) - ; deactivated - (do - (when @last-active - (reset! last-active false) - (log-message "Deactivating spout " component-id ":" (keys task-datas)) - (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) - ;; TODO: log that it's getting throttled - (Time/sleep 100) - (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data)))) - - (if (and (= curr-count (.get emitted-count)) active?) - (do (.increment empty-emit-streak) - (.emptyEmit spout-wait-strategy (.get empty-emit-streak)) - ;; update the spout throttling metrics - (if throttle-on - (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data)) - (if reached-max-spout-pending - (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data))))) - (.set empty-emit-streak 0) - )) - 0)) - :kill-fn (:report-error-and-die executor-data) - :factory? true - :thread-name (str component-id "-executor" (:executor-id executor-data)))])) + empty-emit-streak (MutableLong. 0) + spout-transfer-fn (fn [] + ;; If topology was started in inactive state, don't call (.open spout) until it's activated first. + (while (not @(:storm-active-atom executor-data)) + (Thread/sleep 100)) + (log-message "Opening spout " component-id ":" (keys task-datas)) + (builtin-metrics/register-spout-throttling-metrics (:spout-throttling-metrics executor-data) storm-conf (:user-context (first (vals task-datas)))) + (doseq [[task-id task-data] task-datas + :let [^ISpout spout-obj (:object task-data) + tasks-fn (:tasks-fn task-data) + send-spout-msg (fn [out-stream-id values message-id out-task-id] + (.increment emitted-count) + (let [out-tasks (if out-task-id + (tasks-fn out-task-id out-stream-id values) + (tasks-fn out-stream-id values)) + rooted? (and message-id has-ackers?) + root-id (if rooted? (MessageId/generateId rand)) + ^List out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))] + (fast-list-iter [out-task out-tasks id out-ids] + (let [tuple-id (if rooted? + (MessageId/makeRootId root-id id) + (MessageId/makeUnanchored)) + out-tuple (TupleImpl. worker-context + values + task-id + out-stream-id + tuple-id)] + (transfer-fn out-task out-tuple))) + (if has-eventloggers? + (send-to-eventlogger executor-data task-data values component-id message-id rand)) + (if (and rooted? + (not (.isEmpty out-ids))) + (do + (.put pending root-id [task-id + message-id + {:stream out-stream-id + :values (if debug? values nil)} + (if (sampler) (System/currentTimeMillis))]) + (task/send-unanchored task-data + ACKER-INIT-STREAM-ID + [root-id (bit-xor-vals out-ids) task-id])) + (when message-id + (ack-spout-msg executor-data task-data message-id + {:stream out-stream-id :values values} + (if (sampler) 0) "0:"))) + (or out-tasks [])))]] + + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) + (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) + :receive receive-queue} + storm-conf (:user-context task-data)) + (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials)) + + (.open spout-obj + storm-conf + (:user-context task-data) + (SpoutOutputCollector. + (reify ISpoutOutputCollector + (^long getPendingCount[this] + (.size pending)) + (^List emit [this ^String stream-id ^List tuple ^Object message-id] + (send-spout-msg stream-id tuple message-id nil)) + (^void emitDirect [this ^int out-task-id ^String stream-id + ^List tuple ^Object message-id] + (send-spout-msg stream-id tuple message-id out-task-id)) + (reportError [this error] + (report-error error)))))) + + (reset! open-or-prepare-was-called? true) + (log-message "Opened spout " component-id ":" (keys task-datas)) + (setup-metrics! executor-data) + + (fn [] + ;; This design requires that spouts be non-blocking - (disruptor/consume-batch receive-queue event-handler) ++ (.consumeBatch ^DisruptorQueue receive-queue event-handler) + + (let [active? @(:storm-active-atom executor-data) + curr-count (.get emitted-count) + backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) + throttle-on (and backpressure-enabled + @(:throttle-on (:worker executor-data))) + reached-max-spout-pending (and max-spout-pending + (>= (.size pending) max-spout-pending))] + (if active? + ; activated + (do + (when-not @last-active + (reset! last-active true) + (log-message "Activating spout " component-id ":" (keys task-datas)) + (fast-list-iter [^ISpout spout spouts] (.activate spout))) + + (if (and (not (.isFull transfer-queue)) + (not throttle-on) + (not reached-max-spout-pending)) + (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))) + ; deactivated + (do + (when @last-active + (reset! last-active false) + (log-message "Deactivating spout " component-id ":" (keys task-datas)) + (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) + ;; TODO: log that it's getting throttled + (Time/sleep 100) + (builtin-metrics/skipped-inactive! (:spout-throttling-metrics executor-data) (:stats executor-data)))) + + (if (and (= curr-count (.get emitted-count)) active?) + (do (.increment empty-emit-streak) + (.emptyEmit spout-wait-strategy (.get empty-emit-streak)) + ;; update the spout throttling metrics + (if throttle-on + (builtin-metrics/skipped-throttle! (:spout-throttling-metrics executor-data) (:stats executor-data)) + (if reached-max-spout-pending + (builtin-metrics/skipped-max-spout! (:spout-throttling-metrics executor-data) (:stats executor-data))))) + (.set empty-emit-streak 0))) + 0))] + + [(Utils/asyncLoop + spout-transfer-fn + false ; isDaemon + (:report-error-and-die executor-data) + Thread/NORM_PRIORITY + true ; isFactory + true ; startImmediately + (str component-id "-executor" (:executor-id executor-data)))])) (defn- tuple-time-delta! [^TupleImpl tuple] (let [ms (.getProcessSampleStartTime tuple)] @@@ -736,115 -743,116 +747,116 @@@ (.getSourceComponent tuple) (.getSourceStreamId tuple) delta))))))) - has-eventloggers? (has-eventloggers? storm-conf)] - + has-eventloggers? (has-eventloggers? storm-conf) + bolt-transfer-fn (fn [] + ;; If topology was started in inactive state, don't call prepare bolt until it's activated first. + (while (not @(:storm-active-atom executor-data)) + (Thread/sleep 100)) + + (log-message "Preparing bolt " component-id ":" (keys task-datas)) + (doseq [[task-id task-data] task-datas + :let [^IBolt bolt-obj (:object task-data) + tasks-fn (:tasks-fn task-data) + user-context (:user-context task-data) + bolt-emit (fn [stream anchors values task] + (let [out-tasks (if task + (tasks-fn task stream values) + (tasks-fn stream values))] + (fast-list-iter [t out-tasks] + (let [anchors-to-ids (HashMap.)] + (fast-list-iter [^TupleImpl a anchors] + (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] + (when (pos? (count root-ids)) + (let [edge-id (MessageId/generateId rand)] + (.updateAckVal a edge-id) + (fast-list-iter [root-id root-ids] + (put-xor! anchors-to-ids root-id edge-id)))))) + (let [tuple (TupleImpl. worker-context + values + task-id + stream + (MessageId/makeId anchors-to-ids))] + (transfer-fn t tuple)))) + (if has-eventloggers? + (send-to-eventlogger executor-data task-data values component-id nil rand)) + (or out-tasks [])))]] + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context) + (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) + (if (= component-id Constants/SYSTEM_COMPONENT_ID) + (do + (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) + :receive (:receive-queue executor-data) + :transfer (:transfer-queue (:worker executor-data))} + storm-conf user-context) + (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context) + (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context)) + (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) + :receive (:receive-queue executor-data)} + storm-conf user-context)) + + (.prepare bolt-obj + storm-conf + user-context + (OutputCollector. + (reify IOutputCollector + (emit [this stream anchors values] + (bolt-emit stream anchors values nil)) + (emitDirect [this task stream anchors values] + (bolt-emit stream anchors values task)) + (^void ack [this ^Tuple tuple] + (let [^TupleImpl tuple tuple + ack-val (.getAckVal tuple)] + (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] + (task/send-unanchored task-data + ACKER-ACK-STREAM-ID + [root (bit-xor id ack-val)]))) + (let [delta (tuple-time-delta! tuple) + debug? (= true (storm-conf TOPOLOGY-DEBUG))] + (when debug? + (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) + (when delta + (stats/bolt-acked-tuple! executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta)))) + (^void fail [this ^Tuple tuple] + (fast-list-iter [root (.. tuple getMessageId getAnchors)] + (task/send-unanchored task-data + ACKER-FAIL-STREAM-ID + [root])) + (let [delta (tuple-time-delta! tuple) + debug? (= true (storm-conf TOPOLOGY-DEBUG))] + (when debug? + (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) + (when delta + (stats/bolt-failed-tuple! executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta)))) + (reportError [this error] + (report-error error)))))) + + (reset! open-or-prepare-was-called? true) + (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (setup-metrics! executor-data) + + (let [receive-queue (:receive-queue executor-data) + event-handler (mk-task-receiver executor-data tuple-action-fn)] + (fn [] - (disruptor/consume-batch-when-available receive-queue event-handler) ++ (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler) + 0)))] ;; TODO: can get any SubscribedState objects out of the context now - [(async-loop - (fn [] - ;; If topology was started in inactive state, don't call prepare bolt until it's activated first. - (while (not @(:storm-active-atom executor-data)) - (Thread/sleep 100)) - - (log-message "Preparing bolt " component-id ":" (keys task-datas)) - (doseq [[task-id task-data] task-datas - :let [^IBolt bolt-obj (:object task-data) - tasks-fn (:tasks-fn task-data) - user-context (:user-context task-data) - bolt-emit (fn [stream anchors values task] - (let [out-tasks (if task - (tasks-fn task stream values) - (tasks-fn stream values))] - (fast-list-iter [t out-tasks] - (let [anchors-to-ids (HashMap.)] - (fast-list-iter [^TupleImpl a anchors] - (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] - (when (pos? (count root-ids)) - (let [edge-id (MessageId/generateId rand)] - (.updateAckVal a edge-id) - (fast-list-iter [root-id root-ids] - (put-xor! anchors-to-ids root-id edge-id)) - )))) - (let [tuple (TupleImpl. worker-context - values - task-id - stream - (MessageId/makeId anchors-to-ids))] - (transfer-fn t tuple)))) - (if has-eventloggers? - (send-to-eventlogger executor-data task-data values component-id nil rand)) - (or out-tasks [])))]] - (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context) - (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials)) - (if (= component-id Constants/SYSTEM_COMPONENT_ID) - (do - (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) - :receive (:receive-queue executor-data) - :transfer (:transfer-queue (:worker executor-data))} - storm-conf user-context) - (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context) - (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context)) - (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) - :receive (:receive-queue executor-data)} - storm-conf user-context) - ) - - (.prepare bolt-obj - storm-conf - user-context - (OutputCollector. - (reify IOutputCollector - (emit [this stream anchors values] - (bolt-emit stream anchors values nil)) - (emitDirect [this task stream anchors values] - (bolt-emit stream anchors values task)) - (^void ack [this ^Tuple tuple] - (let [^TupleImpl tuple tuple - ack-val (.getAckVal tuple)] - (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] - (task/send-unanchored task-data - ACKER-ACK-STREAM-ID - [root (bit-xor id ack-val)]))) - (let [delta (tuple-time-delta! tuple) - debug? (= true (storm-conf TOPOLOGY-DEBUG))] - (when debug? - (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) - (when delta - (stats/bolt-acked-tuple! executor-stats - (.getSourceComponent tuple) - (.getSourceStreamId tuple) - delta)))) - (^void fail [this ^Tuple tuple] - (fast-list-iter [root (.. tuple getMessageId getAnchors)] - (task/send-unanchored task-data - ACKER-FAIL-STREAM-ID - [root])) - (let [delta (tuple-time-delta! tuple) - debug? (= true (storm-conf TOPOLOGY-DEBUG))] - (when debug? - (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) - (when delta - (stats/bolt-failed-tuple! executor-stats - (.getSourceComponent tuple) - (.getSourceStreamId tuple) - delta)))) - (reportError [this error] - (report-error error) - ))))) - (reset! open-or-prepare-was-called? true) - (log-message "Prepared bolt " component-id ":" (keys task-datas)) - (setup-metrics! executor-data) - - (let [receive-queue (:receive-queue executor-data) - event-handler (mk-task-receiver executor-data tuple-action-fn)] - (fn [] - (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler) - 0))) - :kill-fn (:report-error-and-die executor-data) - :factory? true - :thread-name (str component-id "-executor" (:executor-id executor-data)))])) + [(Utils/asyncLoop + bolt-transfer-fn + false ; isDaemon + (:report-error-and-die executor-data) + Thread/NORM_PRIORITY + true ; isFactory + true ; startImmediately + (str component-id "-executor" (:executor-id executor-data)))])) (defmethod close-component :spout [executor-data spout] (.close spout)) http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj index bfece6a,b2bdcdb..1f530ac --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@@ -19,13 -19,16 +19,16 @@@ (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) -- (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) ++ (:require [org.apache.storm [cluster :as cluster]]) (:require [clojure.set :as set]) (:require [org.apache.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors] - [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) - (:import [java.util ArrayList HashMap]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) + [org.apache.storm.hooks IWorkerHook BaseWorkerHook] + [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J]) ++ (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time WorkerBackpressureCallback DisruptorBackpressureCallback]) + (:import [java.util ArrayList HashMap] + [java.util.concurrent.locks ReentrantReadWriteLock]) + (:import [org.apache.commons.io FileUtils]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) @@@ -632,7 -658,7 +658,9 @@@ transfer-tuples (mk-transfer-tuples-handler worker) -- transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples) ++ transfer-thread (Utils/asyncLoop ++ (fn [] ++ (.consumeBatchWhenAvailable ^DisruptorQueue (:transfer-queue worker) transfer-tuples) 0)) disruptor-handler (mk-disruptor-backpressure-handler worker) _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler) http://git-wip-us.apache.org/repos/asf/storm/blob/6696e3f1/storm-core/src/clj/org/apache/storm/disruptor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/disruptor.clj index 78b16dc,e2211c0..0000000 deleted file mode 100644,100644 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ /dev/null @@@ -1,36 -1,89 +1,0 @@@ --;; Licensed to the Apache Software Foundation (ASF) under one --;; or more contributor license agreements. See the NOTICE file --;; distributed with this work for additional information --;; regarding copyright ownership. The ASF licenses this file --;; to you under the Apache License, Version 2.0 (the --;; "License"); you may not use this file except in compliance --;; with the License. You may obtain a copy of the License at --;; --;; http://www.apache.org/licenses/LICENSE-2.0 --;; --;; Unless required by applicable law or agreed to in writing, software --;; distributed under the License is distributed on an "AS IS" BASIS, --;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --;; See the License for the specific language governing permissions and --;; limitations under the License. -- --(ns org.apache.storm.disruptor - (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) - (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback Utils]) -- (:import [com.lmax.disruptor.dsl ProducerType]) -- (:require [clojure [string :as str]]) -- (:require [clojure [set :as set]]) -- (:use [clojure walk]) -- (:use [org.apache.storm util log])) - -(def PRODUCER-TYPE - {:multi-threaded ProducerType/MULTI - :single-threaded ProducerType/SINGLE}) - -(defnk disruptor-queue - [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] - (DisruptorQueue. queue-name - (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout)) - -(defn clojure-handler - [afn] - (reify com.lmax.disruptor.EventHandler - (onEvent - [this o seq-id batchEnd?] - (afn o seq-id batchEnd?)))) - -(defn disruptor-backpressure-handler - [afn-high-wm afn-low-wm] - (reify DisruptorBackpressureCallback - (highWaterMark - [this] - (afn-high-wm)) - (lowWaterMark - [this] - (afn-low-wm)))) - -(defn worker-backpressure-handler - [afn] - (reify WorkerBackpressureCallback - (onEvent - [this o] - (afn o)))) - -(defmacro handler - [& args] - `(clojure-handler (fn ~@args))) -- -(defn publish - [^DisruptorQueue q o] - (.publish q o)) -- -(defn consume-batch - [^DisruptorQueue queue handler] - (.consumeBatch queue handler)) -- -(defn consume-batch-when-available - [^DisruptorQueue queue handler] - (.consumeBatchWhenAvailable queue handler)) -- -(defn halt-with-interrupt! - [^DisruptorQueue queue] - (.haltWithInterrupt queue)) -- --(defnk consume-loop* -- [^DisruptorQueue queue handler - :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))] - (async-loop - (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0) - :kill-fn kill-fn - :thread-name (.getName queue))) - :uncaught-exception-handler nil] - (Utils/asyncLoop - (fn [] (consume-batch-when-available queue handler) 0) - (.getName queue) - uncaught-exception-handler)) -- -(defmacro consume-loop [queue & handler-args] - `(let [handler# (handler ~@handler-args)] - (consume-loop* ~queue handler#)))
