[ 
https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Zhong updated STORM-342:
-----------------------------

    Description: 
h2. STORM-342: Message loss, executor hang, or message disorder

The Disruptor helper class contains a potential race between consumer and
producer. It can cause the consumer queue to hang, messages to be lost, or
messages to be delivered out of order.

{code:title=Disruptor.java|borderStyle=solid}
class Disruptor {
...
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            // RACE: consumerStartedFlag may flip to true right after the
            // check above, so this message sits in the cache while newer
            // messages go straight to the ring buffer.
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }

    public void consumerStarted() {
        if(!consumerStartedFlag) {
            consumerStartedFlag = true;
            // RACE WINDOW: between the line above and the flush below,
            // producers already publish directly to the ring buffer.
            flushCache();
        }
    }
}
{code}


The consumer side:

{code:title=Task Executor Thread|borderStyle=solid}
  (disruptor/consumer-started! receive-queue)
  (fn []            
     (disruptor/consume-batch-when-available receive-queue event-handler)
{code}

h3. How to reproduce: executor hang / message loss

1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1"; since "consumerStartedFlag" ==
false, it is added to the cache.
3. [Consumer Thread] consumerStarted() is called; consumerStartedFlag is set to
true, but flushCache() has not run yet.
4. Since "consumerStartedFlag" is now true, newly produced messages are
published straight to the RingBuffer.
5. [Producer B Thread] Produces enough messages to fill the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in
blocking mode; since the RingBuffer is full, the consumer thread blocks.
8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays
full and the consumer thread stays blocked forever.
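The steps above can be replayed deterministically with a bounded queue standing in for the RingBuffer (a hypothetical sketch, not Storm code; `HangReplay` and its members are made up for illustration). Once producer B fills the buffer inside the race window, the consumer thread's blocking publish of FLUSH_CACHE can never complete, because that same thread is the only drainer; `offer()` exposes the stuck state without actually hanging the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical replay of the hang: a capacity-4 ArrayBlockingQueue stands in
// for the RingBuffer. Step numbers refer to the list above.
class HangReplay {
    static final Object FLUSH_CACHE = new Object();

    static boolean flushCanComplete() {
        ArrayBlockingQueue<Object> ringBuffer = new ArrayBlockingQueue<>(4);
        List<Object> cache = new ArrayList<>();

        cache.add("1");                       // step 2: producer A caches "1"
        // step 3: consumerStartedFlag flips to true; flushCache() not yet run

        // steps 4-5: producer B sees the flag and fills the RingBuffer
        for (int i = 0; i < 4; i++) ringBuffer.offer("B" + i);

        // steps 6-7: the consumer thread now publishes FLUSH_CACHE. A blocking
        // publish would park here forever, since this same thread is the only
        // one that drains the buffer (step 8).
        return ringBuffer.offer(FLUSH_CACHE); // false == the deadlock condition
    }
}
```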

h3. How to reproduce: message disorder

1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1"; since "consumerStartedFlag" ==
false, it is added to the cache.
3. [Consumer Thread] consumerStarted() is called; consumerStartedFlag is set to
true, but flushCache() has not run yet.
4. Since "consumerStartedFlag" is now true, newly produced messages are
published straight to the RingBuffer.
5. [Producer A Thread] Publishes a new message "2"; it goes directly into the
RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer,
landing after message "2".
8. [Consumer Thread] consumeBatch() is called; it first picks up "2", then
FLUSH_CACHE, which represents "1".
9. Producer A produced "1" then "2", but the consumer thread receives "2" then
"1".
10. Message order is wrong.
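The reordering can likewise be replayed with plain collections (a hypothetical sketch mirroring the publish()/flushCache() logic above; `DisorderReplay` is made up for illustration). In the race window, message "2" reaches the buffer before the FLUSH_CACHE marker carrying "1", so the consumer observes them inverted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical replay of the disorder. Step numbers refer to the list above.
class DisorderReplay {
    static final Object FLUSH_CACHE = new Object();

    static List<Object> received() {
        Deque<Object> ringBuffer = new ArrayDeque<>(); // stands in for the RingBuffer
        List<Object> cache = new ArrayList<>();

        cache.add("1");                  // step 2: "1" parked in the cache
        // step 3: consumerStartedFlag flips to true; flushCache() not yet run
        ringBuffer.addLast("2");         // step 5: "2" published directly
        ringBuffer.addLast(FLUSH_CACHE); // step 7: marker lands AFTER "2"

        // step 8: consumeBatch() drains; the marker expands to the cached "1"
        List<Object> out = new ArrayList<>();
        for (Object o : ringBuffer) {
            if (o == FLUSH_CACHE) out.addAll(cache);
            else out.add(o);
        }
        return out;                      // steps 9-10: order is inverted
    }
}
```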

I found this after troubleshooting a tricky random failure (roughly 1 in 100
runs). It usually happens when producer and consumer are colocated in the same
process, for example when the task send-queue thread acts as producer and
publishes to a local task receive queue in the same worker.
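For reference, a minimal sketch of one way to close the race (an illustration of the idea only, not the actual Storm fix; `SafeDisruptor` and its members are hypothetical, with an ArrayDeque standing in for the ring buffer): make the flag flip and the cache flush atomic with respect to publish(), which also removes the need for the FLUSH_CACHE marker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch only: coarse synchronization stands in for whatever
// finer-grained fix the real code would use.
class SafeDisruptor {
    private final Deque<Object> _buffer = new ArrayDeque<>();
    private final List<Object> _cache = new ArrayList<>();
    private boolean consumerStartedFlag = false;

    public synchronized void publish(Object obj) {
        if (consumerStartedFlag) {
            _buffer.addLast(obj);  // consumer is up: straight to the buffer
        } else {
            _cache.add(obj);       // consumer not up yet: park in the cache
        }
    }

    public synchronized void consumerStarted() {
        // Flag flip and cache flush share one critical section, so no producer
        // can slip a message into _buffer ahead of the cached ones.
        if (!consumerStartedFlag) {
            _buffer.addAll(_cache);
            _cache.clear();
            consumerStartedFlag = true;
        }
    }

    public synchronized Object consumeOne() {
        return _buffer.pollFirst(); // null when empty
    }
}
```

With that interleaving made impossible, messages cached before start are always delivered before anything published afterwards.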


  was:
Disruptor contains a potential race between consumer and producer. It can cause
the consumer queue to hang, messages to be lost, or messages to be delivered
out of order.

{code:title=Disruptor.java|borderStyle=solid}
class Disruptor {
...
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }
    
    public void consumerStarted() {
        if(!consumerStartedFlag) {
            consumerStartedFlag = true;
            flushCache();
        }
    }
}
{code}

The following steps describe the scenario that causes message disorder:
1. The consumer has not started yet.
2. A producer in another thread publishes message "1"; as "consumerStartedFlag"
== false, it is added to the cache.
3. consumerStarted() is called; consumerStartedFlag is set to true, but
flushCache() has not run yet.
4. A producer in another thread publishes message "2"; as "consumerStartedFlag"
== true now, it is published directly to the RingBuffer.
5. flushCache() is called from consumerStarted().
6. The FLUSH_CACHE object is published to the RingBuffer; it marks the position
of message "1".
7. consume() is called; it fetches "2" first, then "1".
8. Message order is wrong!

The following steps describe the scenario that causes message loss and hangs
the consumer thread forever:
1. The consumer has not started yet.
2. A producer in another thread publishes message "1"; as "consumerStartedFlag"
== false, it is added to the cache.
3. consumerStarted() is called; consumerStartedFlag is set to true, but
flushCache() has not run yet.
4. A producer in another thread publishes multiple messages; as
"consumerStartedFlag" == true now, they go directly into the RingBuffer until
it is full.
5. flushCache() is called from consumerStarted().
6. The FLUSH_CACHE object is published to the RingBuffer; as the RingBuffer is
now full, the consumer thread blocks.
7. consume() is never called, so the consumer thread stays blocked forever.

I found this after troubleshooting a tricky random failure (roughly 1 in 100
runs). It usually happens when producer and consumer are colocated in the same
process, for example when the task send-queue thread acts as producer and
publishes to a local task receive queue in the same worker.



> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v6.2#6252)
