Repository: storm
Updated Branches:
  refs/heads/master 12ceb0975 -> a759db38d


STORM-1272: port backtype.storm.disruptor to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7e66894
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7e66894
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7e66894

Branch: refs/heads/master
Commit: c7e66894d9401a1a9363d3ce55e31ceedc6ac38f
Parents: 5f1cba5
Author: Abhishek Agarwal <[email protected]>
Authored: Thu Feb 11 00:16:18 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Thu Feb 11 18:35:56 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    | 52 +++++++++---------
 .../src/clj/org/apache/storm/daemon/worker.clj  | 48 ++++++++---------
 .../src/clj/org/apache/storm/disruptor.clj      | 55 +-------------------
 .../org/apache/storm/utils/DisruptorQueue.java  | 15 +++---
 4 files changed, 61 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index ab0c8aa..08bb2e6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -28,7 +28,7 @@
   (:import [org.apache.storm.grouping CustomStreamGrouping])
   (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector 
IOutputCollector])
   (:import [org.apache.storm.generated GlobalStreamId])
-  (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject 
RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue 
WorkerBackpressureThread])
+  (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject 
RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue 
WorkerBackpressureThread DisruptorBackpressureCallback])
   (:import [com.lmax.disruptor InsufficientCapacityException])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
   (:import [org.apache.storm.daemon Shutdownable])
@@ -36,7 +36,8 @@
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-  (:import [java.util.concurrent ConcurrentLinkedQueue])
+  (:import [java.util.concurrent ConcurrentLinkedQueue]
+           (com.lmax.disruptor.dsl ProducerType))
   (:require [org.apache.storm [thrift :as thrift]
              [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
   (:require [org.apache.storm.daemon [task :as task]])
@@ -219,7 +220,7 @@
     (let [val (AddressedTuple. task tuple)]
       (when (= true (storm-conf TOPOLOGY-DEBUG))
         (log-message "TRANSFERING tuple " val))
-      (disruptor/publish batch-transfer->worker val))))
+        (.publish ^DisruptorQueue batch-transfer->worker val) )))
 
 (defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
@@ -227,13 +228,13 @@
         component-id (.getComponentId worker-context (first task-ids))
         storm-conf (normalized-component-conf (:storm-conf worker) 
worker-context component-id)
         executor-type (executor-type worker-context component-id)
-        batch-transfer->worker (disruptor/disruptor-queue
+        batch-transfer->worker (DisruptorQueue.
                                   (str "executor"  executor-id "-send-queue")
+                                  ProducerType/SINGLE
                                   (storm-conf 
TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                  :producer-type :single-threaded
-                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
+                                  (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                  (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
         ]
     (recursive-map
      :worker worker
@@ -280,14 +281,14 @@
 (defn- mk-disruptor-backpressure-handler [executor-data]
   "make a handler for the executor's receive disruptor queue to
   check highWaterMark and lowWaterMark for backpressure"
-  (disruptor/disruptor-backpressure-handler
-    (fn []
+  (reify DisruptorBackpressureCallback
+    (highWaterMark [this]
       "When receive queue is above highWaterMark"
       (if (not @(:backpressure executor-data))
         (do (reset! (:backpressure executor-data) true)
             (log-debug "executor " (:executor-id executor-data) " is 
congested, set backpressure flag true")
             (WorkerBackpressureThread/notifyBackpressureChecker 
(:backpressure-trigger (:worker executor-data))))))
-    (fn []
+    (lowWaterMark [this]
       "When receive queue is below lowWaterMark"
       (if @(:backpressure executor-data)
         (do (reset! (:backpressure executor-data) false)
@@ -302,12 +303,13 @@
         ]
     (disruptor/consume-loop*
       (:batch-transfer-queue executor-data)
-      (disruptor/handler [o seq-id batch-end?]
-        (let [^ArrayList alist (.getObject cached-emit)]
-          (.add alist o)
-          (when batch-end?
-            (worker-transfer-fn serializer alist)
-            (.setObject cached-emit (ArrayList.)))))
+      (reify com.lmax.disruptor.EventHandler
+        (onEvent [this o seq-id batch-end?]
+          (let [^ArrayList alist (.getObject cached-emit)]
+            (.add alist o)
+            (when batch-end?
+              (worker-transfer-fn serializer alist)
+              (.setObject cached-emit (ArrayList.))))))
       :kill-fn (:report-error-and-die executor-data))))
 
 (defn setup-metrics! [executor-data]
@@ -320,7 +322,7 @@
        interval
        (fn []
          (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. 
worker-context [interval] Constants/SYSTEM_TASK_ID 
Constants/METRICS_TICK_STREAM_ID))]]
-           (disruptor/publish receive-queue val)))))))
+           (.publish ^DisruptorQueue receive-queue val)))))))
 
 (defn metrics-tick
   [executor-data task-data ^TupleImpl tuple]
@@ -361,7 +363,7 @@
           tick-time-secs
           (fn []
             (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST 
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID 
Constants/SYSTEM_TICK_STREAM_ID))]]
-              (disruptor/publish receive-queue val))))))))
+              (.publish ^DisruptorQueue receive-queue val))))))))
 
 (defn mk-executor [worker executor-id initial-credentials]
   (let [executor-data (mk-executor-data worker executor-id)
@@ -403,15 +405,15 @@
         (let [receive-queue (:receive-queue executor-data)
               context (:worker-context executor-data)
               val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. 
context [creds] Constants/SYSTEM_TASK_ID 
Constants/CREDENTIALS_CHANGED_STREAM_ID))]]
-          (disruptor/publish receive-queue val)))
+          (.publish ^DisruptorQueue receive-queue val)))
       (get-backpressure-flag [this]
         @(:backpressure executor-data))
       Shutdownable
       (shutdown
         [this]
         (log-message "Shutting down executor " component-id ":" (pr-str 
executor-id))
-        (disruptor/halt-with-interrupt! (:receive-queue executor-data))
-        (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
+        (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data))
+        (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue 
executor-data))
         (doseq [t threads]
           (.interrupt t)
           (.join t))
@@ -453,8 +455,8 @@
   (let [task-ids (:task-ids executor-data)
         debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
         ]
-    (disruptor/clojure-handler
-      (fn [tuple-batch sequence-id end-of-batch?]
+    (reify com.lmax.disruptor.EventHandler
+      (onEvent [this tuple-batch sequence-id end-of-batch?]
         (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch]
           (let [^TupleImpl tuple (.getTuple addressed-tuple)
                 task-id (.getDest addressed-tuple)]
@@ -618,7 +620,7 @@
         
         (fn []
           ;; This design requires that spouts be non-blocking
-          (disruptor/consume-batch receive-queue event-handler)
+          (.consumeBatch ^DisruptorQueue receive-queue event-handler)
           
           (let [active? @(:storm-active-atom executor-data)
                 curr-count (.get emitted-count)
@@ -838,7 +840,7 @@
         (let [receive-queue (:receive-queue executor-data)
               event-handler (mk-task-receiver executor-data tuple-action-fn)]
           (fn []            
-            (disruptor/consume-batch-when-available receive-queue 
event-handler)
+            (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue 
event-handler)
             0)))
       :kill-fn (:report-error-and-die executor-data)
       :factory? true

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 48934f6..bfece6a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -25,7 +25,7 @@
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
   (:import [java.util ArrayList HashMap])
-  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue 
WorkerBackpressureCallback DisruptorBackpressureCallback])
   (:import [org.apache.storm.grouping LoadMapping])
   (:import [org.apache.storm.messaging TransportFactory])
   (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
@@ -121,7 +121,7 @@
         (fast-map-iter [[short-executor pairs] grouped]
           (let [q (short-executor-receive-queue-map short-executor)]
             (if q
-              (disruptor/publish q pairs)
+              (.publish ^DisruptorQueue q pairs)
               (log-warn "Received invalid messages for unknown tasks. 
Dropping... ")
               )))))))
 
@@ -132,8 +132,8 @@
 
 (defn- mk-backpressure-handler [executors]
   "make a handler that checks and updates worker's backpressure flag"
-  (disruptor/worker-backpressure-handler
-    (fn [worker]
+  (reify WorkerBackpressureCallback
+    (onEvent [this worker]
       (let [storm-id (:storm-id worker)
             assignment-id (:assignment-id worker)
             port (:port worker)
@@ -152,11 +152,11 @@
 (defn- mk-disruptor-backpressure-handler [worker]
   "make a handler for the worker's send disruptor queue to
   check highWaterMark and lowWaterMark for backpressure"
-  (disruptor/disruptor-backpressure-handler
-    (fn []
+  (reify DisruptorBackpressureCallback
+    (highWaterMark [this]
       (reset! (:transfer-backpressure worker) true)
       (WorkerBackpressureThread/notifyBackpressureChecker 
(:backpressure-trigger worker)))
-    (fn []
+    (lowWaterMark [this]
       (reset! (:transfer-backpressure worker) false)
       (WorkerBackpressureThread/notifyBackpressureChecker 
(:backpressure-trigger worker)))))
 
@@ -188,7 +188,7 @@
                        )))))
 
               (when (not (.isEmpty local)) (local-transfer local))
-              (when (not (.isEmpty remoteMap)) (disruptor/publish 
transfer-queue remoteMap))))]
+              (when (not (.isEmpty remoteMap)) (.publish ^DisruptorQueue 
transfer-queue remoteMap))))]
     (if try-serialize-local
       (do
         (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " 
TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
@@ -200,11 +200,11 @@
 (defn- mk-receive-queue-map [storm-conf executors]
   (->> executors
        ;; TODO: this depends on the type of executor
-       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
-                                                  (storm-conf 
TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
-                                                  (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
+       (map (fn [e] [e (DisruptorQueue. (str "receive-queue" e)
+                                       (storm-conf 
TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+                                       (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                                       (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                       (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
        ))
 
@@ -244,10 +244,11 @@
 (defn worker-data [conf mq-context storm-id assignment-id port worker-id 
storm-conf cluster-state storm-cluster-state]
   (let [assignment-versions (atom {})
         executors (set (read-worker-executors storm-conf storm-cluster-state 
storm-id assignment-id port assignment-versions))
-        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" 
(storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+        transfer-queue (DisruptorQueue. "worker-transfer-queue"
+                                                  (storm-conf 
TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
-                                                  :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
+                                                  (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                                                  (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
 
         receive-queue-map (->> executor-receive-queue-map
@@ -412,21 +413,20 @@
 
 ;; TODO: consider having a max batch size besides what disruptor does 
automagically to prevent latency issues
 (defn mk-transfer-tuples-handler [worker]
-  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
+   (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
         drainer (TransferDrainer.)
         node+port->socket (:cached-node+port->socket worker)
         task->node+port (:cached-task->node+port worker)
         endpoint-socket-lock (:endpoint-socket-lock worker)
         ]
-    (disruptor/clojure-handler
-      (fn [packets _ batch-end?]
+    (reify com.lmax.disruptor.EventHandler
+      (onEvent [this packets seqId batch-end?]
         (.add drainer packets)
-
         (when batch-end?
           (read-locked endpoint-socket-lock
-             (let [node+port->socket @node+port->socket
-                   task->node+port @task->node+port]
-               (.send drainer task->node+port node+port->socket)))
+                       (let [node+port->socket @node+port->socket
+                             task->node+port @task->node+port]
+                         (.send drainer task->node+port node+port->socket)))
           (.clear drainer))))))
 
 ;; Check whether this messaging connection is ready to send data
@@ -664,7 +664,7 @@
                     ;;in which case it's a noop
                     (.term ^IContext (:mq-context worker))
                     (log-message "Shutting down transfer thread")
-                    (disruptor/halt-with-interrupt! (:transfer-queue worker))
+                    (.haltWithInterrupt ^DisruptorQueue (:transfer-queue 
worker))
 
                     (.interrupt transfer-thread)
                     (.join transfer-thread)

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj 
b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 1546b3f..78b16dc 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -22,68 +22,15 @@
   (:use [clojure walk])
   (:use [org.apache.storm util log]))
 
-(def PRODUCER-TYPE
-  {:multi-threaded ProducerType/MULTI
-   :single-threaded ProducerType/SINGLE})
 
-(defnk disruptor-queue
-  [^String queue-name buffer-size timeout :producer-type :multi-threaded 
:batch-size 100 :batch-timeout 1]
-  (DisruptorQueue. queue-name
-                   (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout))
 
-(defn clojure-handler
-  [afn]
-  (reify com.lmax.disruptor.EventHandler
-    (onEvent
-      [this o seq-id batchEnd?]
-      (afn o seq-id batchEnd?))))
 
-(defn disruptor-backpressure-handler
-  [afn-high-wm afn-low-wm]
-  (reify DisruptorBackpressureCallback
-    (highWaterMark
-      [this]
-      (afn-high-wm))
-    (lowWaterMark
-      [this]
-      (afn-low-wm))))
-
-(defn worker-backpressure-handler
-  [afn]
-  (reify WorkerBackpressureCallback
-    (onEvent
-      [this o]
-      (afn o))))
-
-(defmacro handler
-  [& args]
-  `(clojure-handler (fn ~@args)))
-
-(defn publish
-  [^DisruptorQueue q o]
-  (.publish q o))
-
-(defn consume-batch
-  [^DisruptorQueue queue handler]
-  (.consumeBatch queue handler))
-
-(defn consume-batch-when-available
-  [^DisruptorQueue queue handler]
-  (.consumeBatchWhenAvailable queue handler))
-
-(defn halt-with-interrupt!
-  [^DisruptorQueue queue]
-  (.haltWithInterrupt queue))
 
 (defnk consume-loop*
   [^DisruptorQueue queue handler
    :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
   (async-loop
-          (fn [] (consume-batch-when-available queue handler) 0)
+          (fn [] (.consumeBatchWhenAvailable ^DisruptorQueue queue handler) 0)
           :kill-fn kill-fn
           :thread-name (.getName queue)))
 
-(defmacro consume-loop [queue & handler-args]
-  `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler#)))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7e66894/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 19aba06..4482297 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -30,6 +30,11 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -46,12 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-
 /**
  * A single consumer queue that uses the LMAX Disruptor. They key to the 
performance is
  * the ability to catch up to the producer by processing tuples in batches.
@@ -381,6 +380,10 @@ public class DisruptorQueue implements IStatefulObject {
         _flusher.start();
     }
 
+    public DisruptorQueue(String queueName,  int size, long readTimeout, int 
inputBatchSize, long flushInterval) {
+        this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, 
flushInterval);
+    }
+
     public String getName() {
         return _queueName;
     }

Reply via email to