[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14025212#comment-14025212 ]

Sean Zhong commented on STORM-342:
----------------------------------

Why is the cache introduced in the Disruptor queue? The logic for
SingleThreadedClaimStrategy is different from that for MultiThreadedClaimStrategy.

For SingleThreadedClaimStrategy, the constructor sets consumerStartedFlag directly.

In DisruptorQueue:
{code:title=DisruptorQueue.java|borderStyle=solid}
    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;

        LOG.info("========Disruptor queueName" + _queueName + ", "
                + claim.getClass().toString() + ", " + wait.getClass().toString());

        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.setGatingSequences(_consumer);
        if (claim instanceof SingleThreadedClaimStrategy) {
            consumerStartedFlag = true;
        }
    }
{code}
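
For illustration, a minimal construction sketch, assuming the three-argument constructor quoted above and the Disruptor 2.x claim/wait strategy classes; the queue names, the buffer size of 1024, and BlockingWaitStrategy are arbitrary choices, not Storm defaults. With a single-threaded claim strategy the flag is true from construction, so publish() never touches the cache; with a multi-threaded claim strategy the flag stays false until consumerStarted(), which is the window where the interleaving described below can happen.
{code:title=ClaimStrategyExample.java|borderStyle=solid}
// Sketch only: queue names, buffer size and wait strategy are arbitrary choices.
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.MultiThreadedClaimStrategy;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import backtype.storm.utils.DisruptorQueue; // package as in Storm 0.9.x (assumption)

public class ClaimStrategyExample {
    public static void main(String[] args) {
        // Single-threaded claim: the constructor above sets consumerStartedFlag = true,
        // so every publish() goes straight to the RingBuffer and the cache is never used.
        DisruptorQueue singleProducer = new DisruptorQueue("send-queue",
                new SingleThreadedClaimStrategy(1024), new BlockingWaitStrategy());

        // Multi-threaded claim: consumerStartedFlag stays false until consumerStarted(),
        // so early publishes are parked in _cache and are exposed to the race below.
        DisruptorQueue multiProducer = new DisruptorQueue("receive-queue",
                new MultiThreadedClaimStrategy(1024), new BlockingWaitStrategy());
    }
}
{code}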

> Contention in Disruptor Queue which may cause message out of order
> ------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>            Priority: Critical
>
> Disruptor contains a potential contention bug between consumer and producer:
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             flushCache();
>         }
>     }
> }
> {code}
> Suppose the following sequence to reproduce this bug:
> 1. The consumer has not started yet.
> 2. A producer in another thread publishes message "1"; since "consumerStartedFlag"
> == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. A producer in another thread publishes message "2"; since "consumerStartedFlag"
> == true now, it is published directly into the RingBuffer.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; it marks the position of
> message "1" in the RingBuffer.
> 7. consume() is called; it fetches "2" first, then "1".
> 8. The message order is wrong!
> I found this after troubleshooting a tricky random failure (1 in 100 times).
> It usually happens when the producer and consumer are colocated in the same process,
> for example, when the task send-queue thread acts as the producer, producing messages
> to a local task's receive queue in the same worker.



--
This message was sent by Atlassian JIRA
(v6.2#6252)