[ 
https://issues.apache.org/jira/browse/STORM-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15141775#comment-15141775
 ] 

ASF GitHub Bot commented on STORM-1272:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1095#discussion_r52532575
  
    --- Diff: storm-core/src/clj/org/apache/storm/disruptor.clj ---
    @@ -15,75 +15,22 @@
     ;; limitations under the License.
     
     (ns org.apache.storm.disruptor
    -  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
    +  (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback DisruptorUtils])
       (:import [com.lmax.disruptor.dsl ProducerType])
       (:require [clojure [string :as str]])
       (:require [clojure [set :as set]])
       (:use [clojure walk])
       (:use [org.apache.storm util log]))
     
    -(def PRODUCER-TYPE
    -  {:multi-threaded ProducerType/MULTI
    -   :single-threaded ProducerType/SINGLE})
     
    -(defnk disruptor-queue
    -  [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
    -  (DisruptorQueue. queue-name
    -                   (PRODUCER-TYPE producer-type) buffer-size
    -                   timeout batch-size batch-timeout))
     
    -(defn clojure-handler
    -  [afn]
    -  (reify com.lmax.disruptor.EventHandler
    -    (onEvent
    -      [this o seq-id batchEnd?]
    -      (afn o seq-id batchEnd?))))
     
    -(defn disruptor-backpressure-handler
    -  [afn-high-wm afn-low-wm]
    -  (reify DisruptorBackpressureCallback
    -    (highWaterMark
    -      [this]
    -      (afn-high-wm))
    -    (lowWaterMark
    -      [this]
    -      (afn-low-wm))))
    -
    -(defn worker-backpressure-handler
    -  [afn]
    -  (reify WorkerBackpressureCallback
    -    (onEvent
    -      [this o]
    -      (afn o))))
    -
    -(defmacro handler
    -  [& args]
    -  `(clojure-handler (fn ~@args)))
    -
    -(defn publish
    -  [^DisruptorQueue q o]
    -  (.publish q o))
    -
    -(defn consume-batch
    -  [^DisruptorQueue queue handler]
    -  (.consumeBatch queue handler))
    -
    -(defn consume-batch-when-available
    -  [^DisruptorQueue queue handler]
    -  (.consumeBatchWhenAvailable queue handler))
    -
    -(defn halt-with-interrupt!
    -  [^DisruptorQueue queue]
    -  (.haltWithInterrupt queue))
     
     (defnk consume-loop*
    --- End diff --
    
    Once utils comes in and async-loop is in Java, we should either inline
    this or make a Java version; then we can delete this file entirely.


> port backtype.storm.disruptor to java
> -------------------------------------
>
>                 Key: STORM-1272
>                 URL: https://issues.apache.org/jira/browse/STORM-1272
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Abhishek Agarwal
>              Labels: java-migration, jstorm-merger
>
> wrapper around the disruptor queue. Might need some abstract base classes if 
> it makes sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
