[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Zhong updated STORM-342:
-----------------------------
Summary: Contention in Disruptor Queue which may cause message loss or out of order (was: Contention in Disruptor Queue which may cause message out of order)
> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
> Key: STORM-342
> URL: https://issues.apache.org/jira/browse/STORM-342
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Priority: Blocker
>
> Disruptor contains a potential race condition between the consumer and the producer.
> It can cause the consumer queue to hang, messages to be lost, or messages to be delivered out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
>     ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if (consumerStartedFlag) {
>             // Consumer running: claim a slot and publish straight into the RingBuffer.
>             final long id;
>             if (block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             // Consumer not started yet: park the message in the cache.
>             _cache.add(obj);
>             if (consumerStartedFlag) flushCache();
>         }
>     }
>
>     public void consumerStarted() {
>         if (!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             // Race window: producers already see the flag as true and can write
>             // directly to the RingBuffer before flushCache() runs below.
>             flushCache();
>         }
>     }
> }
> {code}
> The following steps describe a scenario in which messages are delivered out of order:
> 1. The consumer has not started.
> 2. The producer, in another thread, publishes message "1". Because consumerStartedFlag == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. The producer publishes message "2". Because consumerStartedFlag == true now, the message is published directly into the RingBuffer.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer. It marks the position of message "1", which is still in the cache, and it lands after message "2".
> 7. consume() is called. It fetches "2" first, then "1".
> 8. The message order is wrong!
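The disorder scenario can be replayed deterministically in a single thread. The sketch below is a simplified model, not Storm's actual queue: an `ArrayDeque` stands in for the RingBuffer and an `ArrayList` for `_cache`, and the class `DisorderModel` and its methods are hypothetical. `consumerStarted()` is split into its two statements so that steps 3 and 5 can be interleaved with the producer, and the marker is assumed to be expanded into the cached messages when the consumer reaches it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of the quoted Disruptor class (not Storm code).
class DisorderModel {
    // Marker that stands in for the cached messages, as FLUSH_CACHE does above.
    static final Object FLUSH_CACHE = new Object();

    final Deque<Object> ring = new ArrayDeque<>(); // stand-in for the RingBuffer
    final List<Object> cache = new ArrayList<>();  // stand-in for _cache
    boolean consumerStartedFlag = false;

    // Same branching as publish() above: ring if the flag is set, cache otherwise.
    void publish(Object obj) {
        if (consumerStartedFlag) {
            ring.addLast(obj);
        } else {
            cache.add(obj);
            // Mirrors the original re-check; it cannot fire in this single-threaded replay.
            if (consumerStartedFlag) flushCache();
        }
    }

    // consumerStarted() split into its two statements so they can be interleaved.
    void setStartedFlag() { consumerStartedFlag = true; }
    void flushCache() { ring.addLast(FLUSH_CACHE); }

    // Consumer: read the ring in order, expanding the marker into the cached messages.
    List<Object> consumeAll() {
        List<Object> out = new ArrayList<>();
        while (!ring.isEmpty()) {
            Object o = ring.pollFirst();
            if (o == FLUSH_CACHE) {
                out.addAll(cache);
                cache.clear();
            } else {
                out.add(o);
            }
        }
        return out;
    }

    // Replays the disorder scenario in the order listed.
    static List<Object> replayDisorder() {
        DisorderModel q = new DisorderModel(); // 1. consumer not started
        q.publish("1");                        // 2. flag is false, "1" goes to the cache
        q.setStartedFlag();                    // 3. flag is true, flushCache() still pending
        q.publish("2");                        // 4. "2" goes straight into the ring
        q.flushCache();                        // 5-6. the marker lands after "2"
        return q.consumeAll();                 // 7. "2" is consumed before "1"
    }

    public static void main(String[] args) {
        System.out.println(replayDisorder()); // prints [2, 1]
    }
}
```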
> The following steps describe a scenario in which messages are lost and the consumer thread hangs forever:
> 1. The consumer has not started.
> 2. The producer, in another thread, publishes message "1". Because consumerStartedFlag == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. The producer publishes multiple "2" messages. Because consumerStartedFlag == true now, they are published directly into the RingBuffer until the RingBuffer is full.
> 5. flushCache() is called in consumerStarted().
> 6. flushCache() tries to publish the FLUSH_CACHE object to the RingBuffer. Because the RingBuffer is now full, the consumer thread blocks.
> 7. consume() is never called, so the consumer thread stays blocked forever and message "1" is never delivered.
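The hang can be modeled the same way, assuming a bounded `ArrayBlockingQueue` as a stand-in for the RingBuffer. Here `flushCache` uses `offer` with a timeout instead of an unbounded wait so the example terminates; a `false` result marks the point where the real blocking publish would wait forever. `HangModel`, its methods, the capacity of 4, and the 100 ms timeout are all hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the hang scenario; a bounded queue stands in for the RingBuffer.
class HangModel {
    static final Object FLUSH_CACHE = new Object();

    final BlockingQueue<Object> ring;
    final List<Object> cache = new ArrayList<>();
    boolean consumerStartedFlag = false;

    HangModel(int capacity) { ring = new ArrayBlockingQueue<>(capacity); }

    // Non-blocking publish; returns false where a blocking publish would have to wait.
    boolean publish(Object obj) {
        if (consumerStartedFlag) {
            return ring.offer(obj);
        }
        cache.add(obj);
        return true;
    }

    // Publishes the marker, giving up after a timeout instead of waiting forever.
    boolean flushCache(long timeoutMillis) {
        try {
            return ring.offer(FLUSH_CACHE, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Replays the hang scenario; true means the marker could not be enqueued
    // and message "1" is stranded in the cache.
    static boolean replayHang(int capacity) {
        HangModel q = new HangModel(capacity); // 1. consumer not started
        q.publish("1");                        // 2. "1" goes to the cache
        q.consumerStartedFlag = true;          // 3. flag set, flushCache() still pending
        while (q.publish("2")) {               // 4. producer fills the ring
            // keep publishing until the ring reports it is full
        }
        // 5-6. consumerStarted() now tries to enqueue the marker on the consumer
        // thread; nothing else drains the ring, so it can never succeed (step 7).
        boolean markerStuck = !q.flushCache(100);
        return markerStuck && q.cache.contains("1");
    }

    public static void main(String[] args) {
        System.out.println(replayHang(4)); // prints true
    }
}
```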
> I found this while troubleshooting a tricky intermittent failure (about 1 in 100 runs).
> It usually happens when the producer and consumer are colocated in the same process, for
> example when the task send-queue thread acts as the producer and sends messages to a local
> task's receive queue in the same worker.
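Both scenarios stem from producers seeing consumerStartedFlag == true before the FLUSH_CACHE marker is in the RingBuffer. One possible way to close that window, sketched here and not necessarily the change made for this issue, is to enqueue the marker first and only then set the flag, with the producer's check-then-cache step and the consumer's marker-then-flag step under one lock. Since nothing reaches the ring before the flag is set, the marker publish cannot block on a full ring. `LockedStartQueue`, the `ArrayBlockingQueue` standing in for the RingBuffer, and the lock object are assumptions for illustration; the non-blocking `tryNext` path is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical fix sketch (not Storm code): the marker is enqueued before the flag
// becomes visible, and the producer's check-then-cache step shares the same lock.
class LockedStartQueue {
    static final Object FLUSH_CACHE = new Object();

    private final BlockingQueue<Object> ring;             // stand-in for the RingBuffer
    private final List<Object> cache = new ArrayList<>(); // guarded by lock
    private final Object lock = new Object();
    private volatile boolean consumerStartedFlag = false;

    LockedStartQueue(int capacity) { ring = new ArrayBlockingQueue<>(capacity); }

    void publish(Object obj) throws InterruptedException {
        if (!consumerStartedFlag) {
            synchronized (lock) {
                if (!consumerStartedFlag) { // re-check: consumerStarted() may have won the lock
                    cache.add(obj);
                    return;
                }
            }
        }
        // The flag is only true once FLUSH_CACHE is in the ring, so obj lands after it.
        ring.put(obj);
    }

    void consumerStarted() throws InterruptedException {
        synchronized (lock) {
            if (consumerStartedFlag) return;
            // No producer writes to the ring before the flag is set, so the ring is
            // empty here and this put cannot block.
            ring.put(FLUSH_CACHE);
            consumerStartedFlag = true;
        }
    }

    // Consumer: drain the ring in order, expanding the marker into the cached messages.
    List<Object> drain() {
        List<Object> out = new ArrayList<>();
        Object o;
        while ((o = ring.poll()) != null) {
            if (o == FLUSH_CACHE) {
                synchronized (lock) {
                    out.addAll(cache);
                    cache.clear();
                }
            } else {
                out.add(o);
            }
        }
        return out;
    }

    // Publishes 0..n-1 from a producer thread while the consumer starts concurrently.
    static List<Object> concurrentRun(int n) {
        LockedStartQueue q = new LockedStartQueue(n + 1); // room for all messages plus the marker
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.publish(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            producer.start();
            q.consumerStarted();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return q.drain();
    }

    public static void main(String[] args) {
        List<Object> got = concurrentRun(10_000);
        boolean inOrder = got.size() == 10_000;
        for (int i = 0; i < got.size(); i++) inOrder &= got.get(i).equals(i);
        System.out.println("in order: " + inOrder);
    }
}
```

Whatever the interleaving, every message published before the marker sits in the cache and every message published after it sits behind the marker in the ring, so draining yields 0..n-1 in order. The cost in this sketch is a lock acquisition on each publish until the consumer starts; afterwards the volatile read of the flag skips the lock. The demo sizes the ring at n + 1 only because it drains after the producer finishes.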
--
This message was sent by Atlassian JIRA
(v6.2#6252)