Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/134#discussion_r13994933
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java ---
    @@ -41,46 +37,47 @@
      * the ability to catch up to the producer by processing tuples in batches.
      */
     public class DisruptorQueue implements IStatefulObject {
    -    static final Object FLUSH_CACHE = new Object();
    -    static final Object INTERRUPT = new Object();
    -    
    -    RingBuffer<MutableObject> _buffer;
    -    Sequence _consumer;
    -    SequenceBarrier _barrier;
    -    
    +    private static final Object FLUSH_CACHE = new Object();
    +    private static final Object INTERRUPT = new Object();
    +    private static final String PREFIX = "disruptor-";
    +
    +    private final ConcurrentLinkedQueue<Object> _cache = new 
ConcurrentLinkedQueue<Object>();
    +    private final HashMap<String, Object> state = new HashMap<String, 
Object>(4);
    +
    +    private final String _queueName;
    +    private final RingBuffer<MutableObject> _buffer;
    +    private final Sequence _consumer;
    +    private final SequenceBarrier _barrier;
    +
         // TODO: consider having a threadlocal cache of this variable to speed 
up reads?
         volatile boolean consumerStartedFlag = false;
    -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
    -    private static String PREFIX = "disruptor-";
    -    private String _queueName = "";
    -    
    -    public DisruptorQueue(String queueName, ClaimStrategy claim, 
WaitStrategy wait) {
    -         this._queueName = PREFIX + queueName;
    -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), 
claim, wait);
    +
    +    public DisruptorQueue(String queueName, ProducerType producerType, int 
bufferSize, WaitStrategy wait) {
    +        _buffer = RingBuffer.create(producerType, new 
ObjectEventFactory(), bufferSize, wait);
    +        _queueName = PREFIX + queueName;
             _consumer = new Sequence();
             _barrier = _buffer.newBarrier();
    -        _buffer.setGatingSequences(_consumer);
    -        if(claim instanceof SingleThreadedClaimStrategy) {
    -            consumerStartedFlag = true;
    -        }
    +        _buffer.addGatingSequences(_consumer);
    +        consumerStartedFlag = producerType == ProducerType.SINGLE;
         }
    -    
    +
         public String getName() {
    -      return _queueName;
    +        return _queueName;
         }
    -    
    +
    +
         public void consumeBatch(EventHandler<Object> handler) {
             consumeBatchToCursor(_barrier.getCursor(), handler);
         }
    -    
    +
         public void haltWithInterrupt() {
             publish(INTERRUPT);
         }
    -    
    +
         public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
             try {
                 final long nextSequence = _consumer.get() + 1;
    -            final long availableSequence = _barrier.waitFor(nextSequence, 
10, TimeUnit.MILLISECONDS);
    +            final long availableSequence = _barrier.waitFor(nextSequence);
    --- End diff --
    
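    Prior to this change we defaulted to the BlockingWaitStrategy and passed an 
explicit 10ms timeout to _barrier.waitFor(). With the new API the timeout is 
part of the wait strategy itself, and this call no longer specifies one. Should 
we change the default to a TimeoutBlockingWaitStrategy with a timeout of 10ms 
so that the previous behavior is preserved?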


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like to have it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to