Repository: storm Updated Branches: refs/heads/master 12ceb0975 -> a759db38d
STORM-1272: port backtype.storm.disruptor to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7e66894 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7e66894 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7e66894 Branch: refs/heads/master Commit: c7e66894d9401a1a9363d3ce55e31ceedc6ac38f Parents: 5f1cba5 Author: Abhishek Agarwal <[email protected]> Authored: Thu Feb 11 00:16:18 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Thu Feb 11 18:35:56 2016 +0530 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 52 +++++++++--------- .../src/clj/org/apache/storm/daemon/worker.clj | 48 ++++++++--------- .../src/clj/org/apache/storm/disruptor.clj | 55 +------------------- .../org/apache/storm/utils/DisruptorQueue.java | 15 +++--- 4 files changed, 61 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index ab0c8aa..08bb2e6 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -28,7 +28,7 @@ (:import [org.apache.storm.grouping CustomStreamGrouping]) (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector]) (:import [org.apache.storm.generated GlobalStreamId]) - (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread]) + (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback]) (:import [com.lmax.disruptor InsufficientCapacityException]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.daemon Shutdownable]) @@ -36,7 +36,8 @@ (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) - (:import [java.util.concurrent ConcurrentLinkedQueue]) + (:import [java.util.concurrent ConcurrentLinkedQueue] + (com.lmax.disruptor.dsl ProducerType)) (:require [org.apache.storm [thrift :as thrift] [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) @@ -219,7 +220,7 @@ (let [val (AddressedTuple. task tuple)] (when (= true (storm-conf TOPOLOGY-DEBUG)) (log-message "TRANSFERING tuple " val)) - (disruptor/publish batch-transfer->worker val)))) + (.publish ^DisruptorQueue batch-transfer->worker val) ))) (defn mk-executor-data [worker executor-id] (let [worker-context (worker-context worker) @@ -227,13 +228,13 @@ component-id (.getComponentId worker-context (first task-ids)) storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id) executor-type (executor-type worker-context component-id) - batch-transfer->worker (disruptor/disruptor-queue + batch-transfer->worker (DisruptorQueue. (str "executor" executor-id "-send-queue") + ProducerType/SINGLE (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :producer-type :single-threaded - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) ] (recursive-map :worker worker @@ -280,14 +281,14 @@ (defn- mk-disruptor-backpressure-handler [executor-data] "make a handler for the executor's receive disruptor queue to check highWaterMark and lowWaterMark for backpressure" - (disruptor/disruptor-backpressure-handler - (fn [] + (reify DisruptorBackpressureCallback + (highWaterMark [this] "When receive queue is above highWaterMark" (if (not @(:backpressure executor-data)) (do (reset! (:backpressure executor-data) true) (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true") (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))) - (fn [] + (lowWaterMark [this] "When receive queue is below lowWaterMark" (if @(:backpressure executor-data) (do (reset! (:backpressure executor-data) false) @@ -302,12 +303,13 @@ ] (disruptor/consume-loop* (:batch-transfer-queue executor-data) - (disruptor/handler [o seq-id batch-end?] - (let [^ArrayList alist (.getObject cached-emit)] - (.add alist o) - (when batch-end? - (worker-transfer-fn serializer alist) - (.setObject cached-emit (ArrayList.))))) + (reify com.lmax.disruptor.EventHandler + (onEvent [this o seq-id batch-end?] + (let [^ArrayList alist (.getObject cached-emit)] + (.add alist o) + (when batch-end? + (worker-transfer-fn serializer alist) + (.setObject cached-emit (ArrayList.)))))) :kill-fn (:report-error-and-die executor-data)))) (defn setup-metrics! [executor-data] @@ -320,7 +322,7 @@ interval (fn [] (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]] - (disruptor/publish receive-queue val))))))) + (.publish ^DisruptorQueue receive-queue val))))))) (defn metrics-tick [executor-data task-data ^TupleImpl tuple] @@ -361,7 +363,7 @@ tick-time-secs (fn [] (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] - (disruptor/publish receive-queue val)))))))) + (.publish ^DisruptorQueue receive-queue val)))))))) (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id) @@ -403,15 +405,15 @@ (let [receive-queue (:receive-queue executor-data) context (:worker-context executor-data) val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]] - (disruptor/publish receive-queue val))) + (.publish ^DisruptorQueue receive-queue val))) (get-backpressure-flag [this] @(:backpressure executor-data)) Shutdownable (shutdown [this] (log-message "Shutting down executor " component-id ":" (pr-str executor-id)) - (disruptor/halt-with-interrupt! (:receive-queue executor-data)) - (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data)) + (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data)) + (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data)) (doseq [t threads] (.interrupt t) (.join t)) @@ -453,8 +455,8 @@ (let [task-ids (:task-ids executor-data) debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG))) ] - (disruptor/clojure-handler - (fn [tuple-batch sequence-id end-of-batch?] + (reify com.lmax.disruptor.EventHandler + (onEvent [this tuple-batch sequence-id end-of-batch?] (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch] (let [^TupleImpl tuple (.getTuple addressed-tuple) task-id (.getDest addressed-tuple)] @@ -618,7 +620,7 @@ (fn [] ;; This design requires that spouts be non-blocking - (disruptor/consume-batch receive-queue event-handler) + (.consumeBatch ^DisruptorQueue receive-queue event-handler) (let [active? @(:storm-active-atom executor-data) curr-count (.get emitted-count) @@ -838,7 +840,7 @@ (let [receive-queue (:receive-queue executor-data) event-handler (mk-task-receiver executor-data tuple-action-fn)] (fn [] - (disruptor/consume-batch-when-available receive-queue event-handler) + (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler) 0))) :kill-fn (:report-error-and-die executor-data) :factory? true http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 48934f6..bfece6a 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -25,7 +25,7 @@ (:import [java.util.concurrent Executors] [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) (:import [java.util ArrayList HashMap]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) + (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) @@ -121,7 +121,7 @@ (fast-map-iter [[short-executor pairs] grouped] (let [q (short-executor-receive-queue-map short-executor)] (if q - (disruptor/publish q pairs) + (.publish ^DisruptorQueue q pairs) (log-warn "Received invalid messages for unknown tasks. Dropping... ") ))))))) @@ -132,8 +132,8 @@ (defn- mk-backpressure-handler [executors] "make a handler that checks and updates worker's backpressure flag" - (disruptor/worker-backpressure-handler - (fn [worker] + (reify WorkerBackpressureCallback + (onEvent [this worker] (let [storm-id (:storm-id worker) assignment-id (:assignment-id worker) port (:port worker) @@ -152,11 +152,11 @@ (defn- mk-disruptor-backpressure-handler [worker] "make a handler for the worker's send disruptor queue to check highWaterMark and lowWaterMark for backpressure" - (disruptor/disruptor-backpressure-handler - (fn [] + (reify DisruptorBackpressureCallback + (highWaterMark [this] (reset! (:transfer-backpressure worker) true) (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))) - (fn [] + (lowWaterMark [this] (reset! (:transfer-backpressure worker) false) (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))))) @@ -188,7 +188,7 @@ ))))) (when (not (.isEmpty local)) (local-transfer local)) - (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))] + (when (not (.isEmpty remoteMap)) (.publish ^DisruptorQueue transfer-queue remoteMap))))] (if try-serialize-local (do (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)") @@ -200,11 +200,11 @@ (defn- mk-receive-queue-map [storm-conf executors] (->> executors ;; TODO: this depends on the type of executor - (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) - (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) + (map (fn [e] [e (DisruptorQueue. (str "receive-queue" e) + (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) (into {}) )) @@ -244,10 +244,11 @@ (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) - transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) + transfer-queue (DisruptorQueue. "worker-transfer-queue" + (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) receive-queue-map (->> executor-receive-queue-map @@ -412,21 +413,20 @@ ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues (defn mk-transfer-tuples-handler [worker] - (let [^DisruptorQueue transfer-queue (:transfer-queue worker) + (let [^DisruptorQueue transfer-queue (:transfer-queue worker) drainer (TransferDrainer.) node+port->socket (:cached-node+port->socket worker) task->node+port (:cached-task->node+port worker) endpoint-socket-lock (:endpoint-socket-lock worker) ] - (disruptor/clojure-handler - (fn [packets _ batch-end?] + (reify com.lmax.disruptor.EventHandler + (onEvent [this packets seqId batch-end?] (.add drainer packets) - (when batch-end? (read-locked endpoint-socket-lock - (let [node+port->socket @node+port->socket - task->node+port @task->node+port] - (.send drainer task->node+port node+port->socket))) + (let [node+port->socket @node+port->socket + task->node+port @task->node+port] + (.send drainer task->node+port node+port->socket))) (.clear drainer)))))) ;; Check whether this messaging connection is ready to send data @@ -664,7 +664,7 @@ ;;in which case it's a noop (.term ^IContext (:mq-context worker)) (log-message "Shutting down transfer thread") - (disruptor/halt-with-interrupt! (:transfer-queue worker)) + (.haltWithInterrupt ^DisruptorQueue (:transfer-queue worker)) (.interrupt transfer-thread) (.join transfer-thread) http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/disruptor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj index 1546b3f..78b16dc 100644 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ b/storm-core/src/clj/org/apache/storm/disruptor.clj @@ -22,68 +22,15 @@ (:use [clojure walk]) (:use [org.apache.storm util log])) -(def PRODUCER-TYPE - {:multi-threaded ProducerType/MULTI - :single-threaded ProducerType/SINGLE}) -(defnk disruptor-queue - [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] - (DisruptorQueue. queue-name - (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout)) -(defn clojure-handler - [afn] - (reify com.lmax.disruptor.EventHandler - (onEvent - [this o seq-id batchEnd?] - (afn o seq-id batchEnd?)))) -(defn disruptor-backpressure-handler - [afn-high-wm afn-low-wm] - (reify DisruptorBackpressureCallback - (highWaterMark - [this] - (afn-high-wm)) - (lowWaterMark - [this] - (afn-low-wm)))) - -(defn worker-backpressure-handler - [afn] - (reify WorkerBackpressureCallback - (onEvent - [this o] - (afn o)))) - -(defmacro handler - [& args] - `(clojure-handler (fn ~@args))) - -(defn publish - [^DisruptorQueue q o] - (.publish q o)) - -(defn consume-batch - [^DisruptorQueue queue handler] - (.consumeBatch queue handler)) - -(defn consume-batch-when-available - [^DisruptorQueue queue handler] - (.consumeBatchWhenAvailable queue handler)) - -(defn halt-with-interrupt! - [^DisruptorQueue queue] - (.haltWithInterrupt queue)) (defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))] (async-loop - (fn [] (consume-batch-when-available queue handler) 0) + (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0) :kill-fn kill-fn :thread-name (.getName queue))) -(defmacro consume-loop [queue & handler-args] - `(let [handler# (handler ~@handler-args)] - (consume-loop* ~queue handler#))) http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 19aba06..4482297 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -30,6 +30,11 @@ import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.internal.RateTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -46,12 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.metric.api.IStatefulObject; -import org.apache.storm.metric.internal.RateTracker; - /** * A single consumer queue that uses the LMAX Disruptor. They key to the performance is * the ability to catch up to the producer by processing tuples in batches. @@ -381,6 +380,10 @@ public class DisruptorQueue implements IStatefulObject { _flusher.start(); } + public DisruptorQueue(String queueName, int size, long readTimeout, int inputBatchSize, long flushInterval) { + this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, flushInterval); + } + public String getName() { return _queueName; }
