[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14026377#comment-14026377 ]

ASF GitHub Bot commented on STORM-342:
--------------------------------------

GitHub user clockfly opened a pull request:

    https://github.com/apache/incubator-storm/pull/136

    STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

    STORM-342: Message loss, executor hang, or message disorder
    -------------------------
    
    The Disruptor helper class contains a potential contention bug between consumer and producer. It can cause the consumer thread to hang, messages to be lost, or messages to arrive out of order.
    
    ```java
    // Disruptor.java
    class Disruptor {
    ...
        public void publish(Object obj, boolean block) throws InsufficientCapacityException {
            if(consumerStartedFlag) {
                final long id;
                if(block) {
                    id = _buffer.next();
                } else {
                    id = _buffer.tryNext(1);
                }
                final MutableObject m = _buffer.get(id);
                m.setObject(obj);
                _buffer.publish(id);
            } else {
                // Consumer not started yet: the message is parked in the cache until
                // consumerStarted() flushes it into the RingBuffer.
                _cache.add(obj);
                if(consumerStartedFlag) flushCache();
            }
        }
        
        public void consumerStarted() {
            if(!consumerStartedFlag) {
                consumerStartedFlag = true;
                // Race window: producers already see the flag as true here and publish
                // directly to the RingBuffer before the cached messages are flushed,
                // which can reorder messages or fill the buffer and block this thread.
                flushCache();
            }
        }
    }
    ```
    
    Consumer
    ```lisp
    ;; Executor thread
      (disruptor/consumer-started! receive-queue)
      (fn []
         (disruptor/consume-batch-when-available receive-queue event-handler)
         ...)
    ```
    
    Howto: Executor hang, message loss
    ------------------------
    1. [Consumer Thread] The consumer is not started yet.
    2. [Producer A Thread] Publishes message "1"; since "consumerStartedFlag" == false, it is added to the cache.
    3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
    4. Since "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
    5. [Producer B Thread] Produces enough messages to fill the RingBuffer.
    6. [Consumer Thread] flushCache() is called in consumerStarted().
    7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode; since the RingBuffer is full, the consumer thread blocks.
    8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever (a simplified model of this self-deadlock is sketched below).
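    
    To make the self-deadlock in steps 7 and 8 concrete, here is a minimal, simplified model; it is not Storm's actual code. A bounded `ArrayBlockingQueue` stands in for the RingBuffer, and the class name `HangSketch` is introduced only for this illustration. A timed offer is used so the demo terminates instead of hanging.
    
    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class HangSketch {
        public static void main(String[] args) throws InterruptedException {
            // Stand-in for the RingBuffer: capacity 2, already filled by Producer B
            // during the window between the flag flip (step 3) and flushCache() (step 6).
            BlockingQueue<String> ring = new ArrayBlockingQueue<>(2);
            ring.put("B-1");
            ring.put("B-2");
    
            // Consumer thread (step 7): flushCache() tries to publish FLUSH_CACHE
            // before the consume loop has ever run, so nothing drains the queue.
            // The real code publishes in blocking mode and therefore waits forever.
            boolean published = ring.offer("FLUSH_CACHE", 2, TimeUnit.SECONDS);
            System.out.println("FLUSH_CACHE published? " + published); // prints: false
        }
    }
    ```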
    
    Howto: Message disorder
    -----------------------------------
    1. [Consumer Thread] The consumer is not started yet.
    2. [Producer A Thread] Publishes message "1"; since "consumerStartedFlag" == false, it is added to the cache.
    3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
    4. Since "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
    5. [Producer A Thread] Publishes a new message "2"; it is published directly to the RingBuffer.
    6. [Consumer Thread] flushCache() is called in consumerStarted().
    7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer; it lands after message "2".
    8. [Consumer Thread] consumeBatch() is called; it first picks up "2", then FLUSH_CACHE, which represents "1".
    9. Producer A produced "1" then "2", but the consumer thread received "2" then "1".
    10. The message order is wrong (one possible way to close this window is sketched below).
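    
    The root cause of both scenarios is that the flag check plus cache add in publish() and the flag flip plus flushCache() in consumerStarted() are not atomic with respect to each other. Below is a minimal sketch of one possible way to close that window, assuming a plain lock on this path is acceptable and the pre-start cache fits within the RingBuffer; the field name `startLock` is introduced only for illustration, and the actual change in this pull request may take a different approach.
    
    ```java
    class Disruptor {
    ...
        // Hypothetical lock (illustration only): producers and the consumer-start
        // transition synchronize on one monitor, so a message is either cached
        // before the flush or published to the RingBuffer after it, never in between.
        private final Object startLock = new Object();
    
        public void publish(Object obj, boolean block) throws InsufficientCapacityException {
            synchronized(startLock) {
                if(!consumerStartedFlag) {
                    // Consumer not started yet: park the message in the cache.
                    _cache.add(obj);
                    return;
                }
            }
            // Consumer already started and the cache already flushed: publish directly.
            final long id = block ? _buffer.next() : _buffer.tryNext(1);
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        }
        
        public void consumerStarted() {
            synchronized(startLock) {
                if(!consumerStartedFlag) {
                    consumerStartedFlag = true;
                    // No producer can reach the RingBuffer between the flag flip
                    // and this flush, so cached messages keep their original order.
                    flushCache();
                }
            }
        }
    }
    ```
    
    With this handoff, message "2" from Producer A can only be published after FLUSH_CACHE (which represents "1") is already in the RingBuffer, so order is preserved; and since no producer can fill the RingBuffer inside the window, the blocking flush in consumerStarted() cannot block the consumer thread forever, as long as the cached backlog itself fits in the buffer.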

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clockfly/incubator-storm disruptor_message_loss_hang_or_disorder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-storm/pull/136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #136
    
----
commit 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Author: Sean Zhong <[email protected]>
Date:   2014-06-10T11:54:11Z

    STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

----


> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: Sean Zhong
>            Priority: Blocker
>
> Disruptor contains a potential contention bug between consumer and producer. It can cause the consumer thread to hang, messages to be lost, or messages to arrive out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             flushCache();
>         }
>     }
> }
> {code}
> The following steps describe the scenario that makes messages arrive out of order:
> 1. The consumer is not started yet.
> 2. A producer in another thread publishes message "1"; since "consumerStartedFlag" == false, it is added to the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. A producer in another thread publishes message "2"; since "consumerStartedFlag" is now true, it is published directly to the RingBuffer.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; it marks the position of message "1" in the RingBuffer.
> 7. consume() is called; it first fetches "2", then "1".
> 8. The message order is wrong!
> The following steps describe the scenario that makes messages get lost and the consumer thread hang forever:
> 1. The consumer is not started yet.
> 2. A producer in another thread publishes message "1"; since "consumerStartedFlag" == false, it is added to the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. A producer in another thread publishes multiple messages; since "consumerStartedFlag" is now true, they are published directly to the RingBuffer until the RingBuffer is full.
> 5. flushCache() is called in consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer; since the RingBuffer is full, the consumer thread blocks.
> 7. consume() is never called, so the consumer thread stays blocked forever.
> I found this after troubleshooting a tricky random failure (roughly 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example, when the task send-queue thread acts as the producer and publishes messages to a local task's receive queue in the same worker.



