Sean Zhong created STORM-342:
--------------------------------

             Summary: Contention in Disruptor Queue which may cause message out of order
                 Key: STORM-342
                 URL: https://issues.apache.org/jira/browse/STORM-342
             Project: Apache Storm (Incubating)
          Issue Type: Bug
            Reporter: Sean Zhong
            Priority: Critical


Disruptor contains a potential race condition between the consumer and the producer:


class Disruptor {
...
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }
    
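    public void consumerStarted() {
        if(!consumerStartedFlag) {
            consumerStartedFlag = true;
            // Race window: a producer that sees consumerStartedFlag == true here
            // publishes directly to the RingBuffer, ahead of the FLUSH_CACHE
            // marker that flushCache() has not published yet.
            flushCache();
        }
    }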
}


The following interleaving reproduces the bug:
1. The consumer has not started yet.
2. A producer in another thread publishes message "1". Because consumerStartedFlag == false, the message is added to the cache.
3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
4. The producer publishes message "2". Because consumerStartedFlag == true now, the message is published directly into the RingBuffer.
5. flushCache() is called from consumerStarted().
6. The FLUSH_CACHE object is published to the RingBuffer. It marks the position of message "1", which is now after message "2".
7. consume() is called; it fetches "2" first, then "1".
8. The messages are consumed out of order!
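The interleaving in steps 1-8 can be replayed deterministically on a single thread. The sketch below is a simplified model, not Storm's code: it assumes the RingBuffer behaves as a FIFO queue and that the consumer replaces the FLUSH_CACHE marker with the cached messages when it reaches it. The class RaceReplay and its methods are hypothetical stand-ins for the fields and methods in the excerpt above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of steps 1-8. The RingBuffer is modeled
// as a FIFO deque; FLUSH_CACHE is a marker the consumer expands into the cache.
class RaceReplay {
    static final Object FLUSH_CACHE = new Object();

    final ArrayDeque<Object> ring = new ArrayDeque<>();
    final List<Object> cache = new ArrayList<>();
    boolean consumerStartedFlag = false;

    // Mirrors publish(): cache before the consumer starts, ring afterwards.
    void publish(Object msg) {
        if (consumerStartedFlag) {
            ring.addLast(msg);
        } else {
            cache.add(msg);
        }
    }

    // Mirrors flushCache(): only publishes the marker into the ring.
    void flushCache() {
        ring.addLast(FLUSH_CACHE);
    }

    // Single consumer: drains the ring, emitting cached messages at the marker.
    List<Object> consume() {
        List<Object> out = new ArrayList<>();
        while (!ring.isEmpty()) {
            Object o = ring.pollFirst();
            if (o == FLUSH_CACHE) {
                out.addAll(cache);
                cache.clear();
            } else {
                out.add(o);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        RaceReplay q = new RaceReplay();
        q.publish("1");                  // step 2: flag is false, "1" goes to the cache
        q.consumerStartedFlag = true;    // step 3: flag set, flushCache() not run yet
        q.publish("2");                  // step 4: "2" goes straight into the ring
        q.flushCache();                  // steps 5-6: marker lands after "2"
        System.out.println(q.consume()); // step 7: prints [2, 1]
    }
}
```

Because the marker is enqueued after "2", the cached "1" is emitted second, which is exactly the reordering described in step 8.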

I found this while troubleshooting a tricky intermittent failure (roughly 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example, when the task send-queue thread acts as the producer and publishes messages to a local task's receive queue in the same worker.
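One possible way to close the window, sketched under assumptions and not necessarily the patch adopted for STORM-342, is to publish the FLUSH_CACHE marker before setting consumerStartedFlag, and to do both while holding a lock that producers also take before writing to the cache. A producer that observes the flag as true is then guaranteed to write after the marker, and every cache write happens before it. The class OrderedQueue, the cacheLock field, and the use of ConcurrentLinkedQueue as a stand-in for the RingBuffer are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical fix sketch: ConcurrentLinkedQueue stands in for the RingBuffer.
// The marker is published BEFORE the flag flips, under a lock shared with cache writers.
class OrderedQueue {
    static final Object FLUSH_CACHE = new Object();

    private final ConcurrentLinkedQueue<Object> ring = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
    private final Object cacheLock = new Object();
    private volatile boolean consumerStartedFlag = false;

    void publish(Object msg) {
        if (consumerStartedFlag) {       // fast path: the marker is already in the ring
            ring.add(msg);
            return;
        }
        synchronized (cacheLock) {
            if (consumerStartedFlag) {   // flag flipped while waiting for the lock
                ring.add(msg);
            } else {
                cache.add(msg);          // happens before the marker is published
            }
        }
    }

    void consumerStarted() {
        synchronized (cacheLock) {
            if (!consumerStartedFlag) {
                ring.add(FLUSH_CACHE);       // publish the marker first...
                consumerStartedFlag = true;  // ...then let producers bypass the cache
            }
        }
    }

    // Single consumer: replaces FLUSH_CACHE with the cached messages.
    List<Object> drain() {
        List<Object> out = new ArrayList<>();
        Object o;
        while ((o = ring.poll()) != null) {
            if (o == FLUSH_CACHE) {
                Object c;
                while ((c = cache.poll()) != null) out.add(c);
            } else {
                out.add(o);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        OrderedQueue q = new OrderedQueue();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) q.publish(i);
        });
        producer.start();
        q.consumerStarted();             // deliberately races with the producer
        producer.join();
        List<Object> out = q.drain();
        boolean ordered = true;
        for (int i = 0; i < out.size(); i++) {
            if (!out.get(i).equals(i)) ordered = false;
        }
        System.out.println(out.size() + " messages, in order: " + ordered);
    }
}
```

The design choice is the ordering inside consumerStarted(): because the volatile write of the flag follows the marker, any producer that reads true is ordered after the marker, so the check in main should report the messages in order regardless of where consumerStarted() lands relative to the producer.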

--
This message was sent by Atlassian JIRA
(v6.2#6252)
