[ 
https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Zhong updated STORM-342:
-----------------------------

    Summary: Contention in Disruptor Queue which may cause message loss or out 
of order  (was: Contention in Disruptor Queue which may cause message out of 
order)

> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>            Priority: Blocker
>
> Disruptor contains a potential contention bug between the consumer and the
> producer. It can cause the consumer to hang, messages to be lost, or messages
> to be delivered out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             flushCache();
>         }
>     }
> }
> {code}
> The following steps describe the scenario that causes messages to arrive out
> of order:
> 1. The consumer has not started.
> 2. A producer in another thread publishes message "1". Because
> "consumerStartedFlag" == false, the message is put into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. A producer in another thread publishes message "2". Because
> "consumerStartedFlag" == true now, the message is published directly to the
> RingBuffer.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; it marks the
> position of message "1" in the RingBuffer.
> 7. consume() is called; it first fetches "2", then "1".
> 8. The message order is wrong!
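
The interleaving in the steps above can be replayed deterministically on a
single thread. The sketch below is a hypothetical model, not Storm code: the
class OutOfOrderReplay and its cache and ringBuffer collections are stand-ins
for the fields in the report, capacity limits are ignored, and flushCache() is
simplified to drain the cache straight into the buffer instead of publishing a
FLUSH_CACHE marker. Under those assumptions the consumer would see "2" before
"1".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded replay of the reported interleaving. */
public class OutOfOrderReplay {
    // Stand-ins (assumptions) for consumerStartedFlag, _cache and _buffer.
    static boolean consumerStartedFlag;
    static final Queue<String> cache = new ArrayDeque<>();
    static final List<String> ringBuffer = new ArrayList<>();

    // Same branching as publish() in the report, without capacity handling.
    static void publish(String msg) {
        if (consumerStartedFlag) {
            ringBuffer.add(msg);
        } else {
            cache.add(msg);
            if (consumerStartedFlag) flushCache();
        }
    }

    // Simplification: move cached messages directly into the buffer.
    static void flushCache() {
        while (!cache.isEmpty()) {
            ringBuffer.add(cache.poll());
        }
    }

    // Runs steps 2 to 5 in the reported order and returns what the
    // consumer would read, oldest first.
    static List<String> replay() {
        consumerStartedFlag = false;
        cache.clear();
        ringBuffer.clear();

        publish("1");                // step 2: flag is false, goes to the cache
        consumerStartedFlag = true;  // step 3: first half of consumerStarted()
        publish("2");                // step 4: flag is true, goes to the buffer
        flushCache();                // step 5: second half of consumerStarted()
        return new ArrayList<>(ringBuffer);
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [2, 1]
    }
}
```
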
> The following steps describe the scenario that causes message loss and makes
> the consumer thread hang forever:
> 1. The consumer has not started.
> 2. A producer in another thread publishes message "1". Because
> "consumerStartedFlag" == false, the message is put into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. A producer in another thread publishes multiple messages "2". Because
> "consumerStartedFlag" == true now, they are published directly to the
> RingBuffer, and the RingBuffer becomes full.
> 5. flushCache() is called in consumerStarted().
> 6. flushCache() tries to publish the FLUSH_CACHE object to the RingBuffer.
> Because the RingBuffer is now full, the consumer thread blocks.
> 7. consume() is never called, so the consumer thread stays blocked.
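
The full-buffer case above can be modelled the same way. This is a hypothetical
sketch, not Storm code: FullBufferReplay and markerFits are illustrative names,
an ArrayBlockingQueue stands in for the RingBuffer, and the non-blocking
offer() is used in place of the blocking publish so the example terminates and
simply reports whether a slot would have been free for the marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical single-threaded model of the full-buffer scenario. */
public class FullBufferReplay {
    // Stand-in (assumption) for the marker object named in the report.
    static final Object FLUSH_CACHE = new Object();

    /** Returns true if the marker could still be published in step 6. */
    static boolean markerFits(int capacity) {
        ArrayBlockingQueue<Object> ringBuffer = new ArrayBlockingQueue<>(capacity);
        List<Object> cache = new ArrayList<>();
        boolean consumerStartedFlag = false;

        cache.add("1");              // step 2: flag is false, message is cached
        consumerStartedFlag = true;  // step 3: flag set, flushCache() not run yet

        // Step 4: the producer sees the flag and fills every free slot.
        while (consumerStartedFlag && ringBuffer.offer("2")) {
            // keep publishing until the buffer rejects the next message
        }

        // Steps 5-6: flushCache() needs one slot for the marker. A blocking
        // publish would wait here; offer() just reports that no slot is free.
        return ringBuffer.offer(FLUSH_CACHE);
    }

    public static void main(String[] args) {
        System.out.println(markerFits(4)); // prints false
    }
}
```

In this model message "1" is still sitting in the cache when the marker fails
to fit, which corresponds to the lost message in the report.
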
> I found this after troubleshooting a tricky random failure (1 in 100 times).
> It usually happens when the producer and consumer are colocated in the same
> process, for example, when the task send queue thread, acting as producer,
> produces messages to the local task receive queue in the same worker.
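
One way to close the window described in this report is to make the flag check
and the write to the cache or buffer atomic with respect to consumerStarted().
The sketch below is hypothetical and is not the fix adopted for this issue:
GuardedQueue, its lock, and snapshot() are illustrative names, and an unbounded
list stands in for the RingBuffer. It therefore only illustrates the ordering
guarantee; with a bounded, blocking buffer, holding a lock while a publish
blocks would need separate care.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a lock-guarded startup handoff. */
public class GuardedQueue {
    private final Object lock = new Object();
    private boolean consumerStarted = false;
    private final List<Object> cache = new ArrayList<>();
    // Unbounded stand-in (assumption) for the RingBuffer.
    private final List<Object> buffer = new ArrayList<>();

    // The flag check and the write happen under one lock, so a producer
    // can never observe the flag as true before the cache has been flushed.
    public void publish(Object obj) {
        synchronized (lock) {
            if (consumerStarted) {
                buffer.add(obj);
            } else {
                cache.add(obj);
            }
        }
    }

    // Flushes the cache first and only then flips the flag, all under the lock.
    public void consumerStarted() {
        synchronized (lock) {
            if (!consumerStarted) {
                buffer.addAll(cache);
                cache.clear();
                consumerStarted = true;
            }
        }
    }

    // Copy of the buffer contents, oldest first, for inspection.
    public List<Object> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(buffer);
        }
    }

    public static void main(String[] args) {
        GuardedQueue q = new GuardedQueue();
        q.publish("1");
        q.consumerStarted();
        q.publish("2");
        System.out.println(q.snapshot()); // prints [1, 2]
    }
}
```
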



--
This message was sent by Atlassian JIRA
(v6.2#6252)
