[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Joseph Evans updated STORM-342:
--------------------------------------
Assignee: Sean Zhong
> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
> Key: STORM-342
> URL: https://issues.apache.org/jira/browse/STORM-342
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Assignee: Sean Zhong
> Priority: Blocker
> Fix For: 0.9.2-incubating
>
>
> h2. STORM-342: Message loss, executor hang, or message disorder
> The Disruptor helper class contains a potential race between the consumer
> and the producers. It can cause the consumer queue to hang, messages to be
> lost, or messages to be delivered out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
>     ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             // Consumer has started: write the message straight into the RingBuffer.
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             // Consumer not started yet: park the message in the cache.
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             // Race window: from here until flushCache() runs, producers see the
>             // flag as true and publish directly to the RingBuffer.
>             flushCache();
>         }
>     }
> }
> {code}
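> Consumer side:
> {code:title=Task Executor Thread|borderStyle=solid}
> ;; Calls consumerStarted() on the receive queue, which flushes the cache.
> (disruptor/consumer-started! receive-queue)
> (fn []
>   ;; Body run by the executor loop: consume whatever is available.
>   (disruptor/consume-batch-when-available receive-queue event-handler))
> {code}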
> h3. How to reproduce: executor hang and message loss
> 1. [Consumer Thread] The consumer has not started yet.
> 2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag ==
> false, the message is added to the cache.
> 3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set
> to true, but flushCache() has not been called yet.
> 4. Because consumerStartedFlag is now true, newly produced messages are
> published directly to the RingBuffer.
> 5. [Producer B Thread] Produces enough messages to fill the RingBuffer.
> 6. [Consumer Thread] flushCache() is called from consumerStarted().
> 7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in
> blocking mode. Because the RingBuffer is now full, the consumer thread blocks.
> 8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays
> full and the consumer thread stays blocked forever. Message "1", still in the
> cache, is never delivered.
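The hang interleaving above can be replayed deterministically on a single thread. The sketch below is a simplified model, not Storm's code: a bounded `java.util.concurrent.ArrayBlockingQueue` stands in for the Disruptor RingBuffer, a plain list stands in for `_cache`, and the class name `HangReplay` and the `FLUSH_CACHE` marker object are hypothetical. Producer B uses a non-blocking publish here only to fill the queue, and a timed `offer()` replaces the blocking publish in `flushCache()` so the demo returns instead of hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Single-threaded replay of the "executor hang" steps against a simplified model.
// ArrayBlockingQueue stands in for the RingBuffer; FLUSH_CACHE is a hypothetical marker.
public class HangReplay {
    static final Object FLUSH_CACHE = new Object();

    final ArrayBlockingQueue<Object> ring;          // stand-in for _buffer
    final List<Object> cache = new ArrayList<>();   // stand-in for _cache
    boolean consumerStartedFlag = false;

    HangReplay(int capacity) {
        ring = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking publish (like tryNext(1)): returns false when the ring is full.
    boolean tryPublish(Object obj) {
        if (consumerStartedFlag) {
            return ring.offer(obj);
        }
        cache.add(obj);
        return true;
    }

    // Models flushCache(): enqueue the marker in "blocking" mode. A timed offer
    // is used so the demo returns false instead of blocking forever.
    boolean flushCache() {
        try {
            return ring.offer(FLUSH_CACHE, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if the consumer managed to enqueue FLUSH_CACHE.
    static boolean replay(int capacity) {
        HangReplay q = new HangReplay(capacity);
        q.tryPublish("1");               // step 2: producer A, flag false -> cache
        q.consumerStartedFlag = true;    // step 3: flag set, flushCache() not yet called
        int i = 0;
        while (q.tryPublish("b" + i)) {  // step 5: producer B fills the ring
            i++;
        }
        // Steps 6-8: the marker cannot be enqueued, and only the consumer
        // (now stuck in flushCache) would ever drain the ring.
        return q.flushCache();
    }

    public static void main(String[] args) {
        System.out.println(replay(4) ? "flushed" : "consumer thread would block forever");
    }
}
```

Because the consumer is the only thread that drains the ring, nothing can free a slot for the marker, which is why a real blocking publish at step 7 never returns.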
> h3. How to reproduce: message disorder
> 1. [Consumer Thread] The consumer has not started yet.
> 2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag ==
> false, the message is added to the cache.
> 3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set
> to true, but flushCache() has not been called yet.
> 4. Because consumerStartedFlag is now true, newly produced messages are
> published directly to the RingBuffer.
> 5. [Producer A Thread] Publishes a new message "2", which goes directly into
> the RingBuffer.
> 6. [Consumer Thread] flushCache() is called from consumerStarted().
> 7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer,
> so it is written after message "2".
> 8. [Consumer Thread] consumeBatch() is called. It first picks up "2", then
> FLUSH_CACHE, which stands for the cached message "1".
> 9. Producer A produced "1" and then "2", but the consumer thread received "2"
> and then "1".
> 10. The message order is violated.
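The disorder interleaving can be replayed the same way. This is again a simplified single-threaded model under stated assumptions, not Storm's implementation: `ArrayBlockingQueue` stands in for the RingBuffer, the class name `DisorderReplay` and the `FLUSH_CACHE` marker are hypothetical, and `consumeBatch()` assumes, following step 8, that the consumer expands the marker into the messages held in the cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Single-threaded replay of the "message disorder" steps against a simplified model.
// ArrayBlockingQueue stands in for the RingBuffer; FLUSH_CACHE is a hypothetical marker.
public class DisorderReplay {
    static final Object FLUSH_CACHE = new Object();

    final ArrayBlockingQueue<Object> ring = new ArrayBlockingQueue<>(16);
    final List<Object> cache = new ArrayList<>();
    boolean consumerStartedFlag = false;

    void publish(Object obj) {
        if (consumerStartedFlag) {
            ring.add(obj);               // after start: straight into the ring
        } else {
            cache.add(obj);              // before start: parked in the cache
        }
    }

    void flushCache() {
        ring.add(FLUSH_CACHE);           // lands behind anything already in the ring
    }

    // Drains the ring in order; the marker is expanded into the cached messages.
    List<Object> consumeBatch() {
        List<Object> received = new ArrayList<>();
        Object o;
        while ((o = ring.poll()) != null) {
            if (o == FLUSH_CACHE) {
                received.addAll(cache);
                cache.clear();
            } else {
                received.add(o);
            }
        }
        return received;
    }

    static List<Object> replay() {
        DisorderReplay q = new DisorderReplay();
        q.publish("1");                  // step 2: flag false -> cache
        q.consumerStartedFlag = true;    // step 3: flag set, flushCache() not yet called
        q.publish("2");                  // step 5: goes directly into the ring
        q.flushCache();                  // steps 6-7: marker written after "2"
        return q.consumeBatch();         // step 8: "2" first, then "1"
    }

    public static void main(String[] args) {
        System.out.println(replay());    // prints [2, 1]
    }
}
```

The model makes the root cause visible: once the flag flips, new messages and the marker share one FIFO, so anything published between `consumerStartedFlag = true` and `flushCache()` overtakes the cached messages.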
> I found this while troubleshooting a tricky intermittent failure (about 1 in
> 100 runs). It usually happens when the producer and consumer are colocated in
> the same process, for example when the task send-queue thread acts as the
> producer and publishes messages to a local task's receive queue in the same
> worker.
--
This message was sent by Atlassian JIRA
(v6.2#6252)