GitHub user clockfly opened a pull request:

    https://github.com/apache/incubator-storm/pull/136

    STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

    STORM-342: Message loss, executor hang, or message disorder
    -------------------------
    
    The Disruptor helper class contains a potential race between the consumer and the producers. It can cause the consumer queue to hang, messages to be lost, or messages to be delivered out of order.
    
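    ```java
    // Disruptor.java (excerpt)
    class Disruptor {
        // ...
        public void publish(Object obj, boolean block) throws InsufficientCapacityException {
            if(consumerStartedFlag) {
                // Consumer running: write straight into the RingBuffer.
                final long id;
                if(block) {
                    id = _buffer.next();        // waits until a slot is free
                } else {
                    id = _buffer.tryNext(1);    // throws InsufficientCapacityException if full
                }
                final MutableObject m = _buffer.get(id);
                m.setObject(obj);
                _buffer.publish(id);
            } else {
                // Consumer not started yet: park the message in the cache.
                _cache.add(obj);
                if(consumerStartedFlag) flushCache();
            }
        }
        
        public void consumerStarted() {
            if(!consumerStartedFlag) {
                consumerStartedFlag = true;
                // Race window: from here on, producers see the flag as true and
                // publish directly to the RingBuffer, but the cache is not flushed yet.
                flushCache();
            }
        }
    }
    ```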
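    Both failure modes come from a check-then-act gap: publish() reads consumerStartedFlag and then acts on it, while consumerStarted() sets the flag before the cache has been flushed, and the flush is a blocking publish made by the only thread that can drain the RingBuffer. The following is one possible way to close that gap and is not necessarily the change made in this pull request. It is a hypothetical, simplified model: an ArrayBlockingQueue stands in for the RingBuffer, and the class GuardedQueue and its method consumeAvailable() are made up for illustration. Producers check the flag and add to the cache while holding a read lock; consumerStarted() flips the flag under the write lock, so once it returns no producer can still be adding to the cache. The consumer then drains the cache itself before reading the ring, instead of publishing a FLUSH_CACHE marker into a possibly full buffer.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Hypothetical sketch, not Storm's API: a bounded ArrayBlockingQueue stands in for the RingBuffer. */
    class GuardedQueue {
        private final BlockingQueue<Object> ring;
        private final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile boolean consumerStartedFlag = false;

        GuardedQueue(int capacity) {
            ring = new ArrayBlockingQueue<>(capacity);
        }

        /** Producer side (blocking mode only in this sketch). */
        void publish(Object obj) throws InterruptedException {
            if (!consumerStartedFlag) {
                lock.readLock().lock();          // many producers may hold the read lock at once
                try {
                    if (!consumerStartedFlag) {  // re-check: the flag cannot flip while we hold the lock
                        cache.add(obj);
                        return;
                    }
                } finally {
                    lock.readLock().unlock();
                }
            }
            ring.put(obj);                       // consumer running: publish directly
        }

        /** Consumer side: flip the flag, but never publish into the ring from this thread. */
        void consumerStarted() {
            lock.writeLock().lock();             // waits for producers still inside the cache branch
            try {
                consumerStartedFlag = true;
            } finally {
                lock.writeLock().unlock();
            }
        }

        /** Consumer thread only, after consumerStarted(): cached messages first, then the ring. */
        List<Object> consumeAvailable() {
            List<Object> out = new ArrayList<>();
            Object o;
            while ((o = cache.poll()) != null) {
                out.add(o);
            }
            ring.drainTo(out);
            return out;
        }
    }
    ```

    Because every message a producer cached was added before that producer saw the flag as true, draining the cache before the ring keeps per-producer order, and the consumer never waits on a full buffer that only it can empty.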
    
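    Consumer (executor thread)
    ```lisp
    ;; Executor thread: consumer-started! runs before the consume loop below,
    ;; so if it blocks, the loop never starts.
      (disruptor/consumer-started! receive-queue)
      (fn []
         (disruptor/consume-batch-when-available receive-queue event-handler))
    ```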
    
    Howto: Executor Hang, Message Loss
    ------------------------
    1. [Consumer Thread] The consumer has not started yet.
    2. [Producer A Thread] Publishes message "1". Since "consumerStartedFlag" == false, the message is added to the cache.
    3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
    4. Since "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
    5. [Producer B Thread] Publishes enough messages to fill the RingBuffer.
    6. [Consumer Thread] flushCache() is called from consumerStarted().
    7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode. Since the RingBuffer is full, the consumer thread blocks.
    8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever. Message "1" never leaves the cache.
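    The interleaving above can be replayed deterministically on a single thread. This is a hypothetical, simplified model rather than Storm code: an ArrayBlockingQueue stands in for the RingBuffer, a local list stands in for _cache, and a timed offer() replaces the blocking publish of FLUSH_CACHE so that the demo returns instead of hanging. The class HangReplay and its parameters are made up for illustration.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical single-threaded replay of the hang scenario; not Storm code. */
    class HangReplay {
        static final Object FLUSH_CACHE = new Object(); // stand-in for the real marker object

        /** Returns true only if the consumer thread could publish FLUSH_CACHE within the timeout. */
        static boolean replay(int capacity, long timeoutMillis) {
            List<Object> cache = new ArrayList<>();                          // stand-in for _cache
            BlockingQueue<Object> ring = new ArrayBlockingQueue<>(capacity); // stand-in for the RingBuffer

            cache.add("1");            // step 2: flag still false, producer A's "1" is cached
            // step 3: consumerStarted() sets the flag to true but has not reached flushCache() yet
            for (int i = 0; i < capacity; i++) {
                ring.add("b" + i);     // steps 4-5: producer B publishes directly until the ring is full
            }
            try {
                // steps 6-7: flushCache() on the consumer thread. The real blocking publish would
                // wait forever; the timeout only lets this demo return. "1" stays in the cache.
                return ring.offer(FLUSH_CACHE, timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        public static void main(String[] args) {
            // Prints false: the ring is full and the only thread that drains it is the one waiting.
            System.out.println(replay(4, 200));
        }
    }
    ```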
    
    Howto: Message Disorder
    -----------------------------------
    1. [Consumer Thread] The consumer has not started yet.
    2. [Producer A Thread] Publishes message "1". Since "consumerStartedFlag" == false, the message is added to the cache.
    3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
    4. Since "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
    5. [Producer A Thread] Publishes a new message "2", which goes directly into the RingBuffer.
    6. [Consumer Thread] flushCache() is called from consumerStarted().
    7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer, so it is written after message "2".
    8. [Consumer Thread] consumeBatch() is called. It first picks "2", then picks FLUSH_CACHE, which represents "1".
    9. Producer A Thread produced "1", "2" in that order, but the consumer thread received "2", "1".
    10. The message order is wrong.
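    The same kind of single-threaded replay shows the reordering. Again this is a hypothetical model, not Storm code: an ArrayDeque stands in for the RingBuffer, a FLUSH_CACHE marker is expanded into the cached messages when the consumer reaches it, and the two halves of consumerStarted() are split so that producer A's second publish can run in between. The class DisorderReplay and its methods are made up for illustration.

    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    /** Hypothetical single-threaded replay of the disorder scenario; not Storm code. */
    class DisorderReplay {
        static final Object FLUSH_CACHE = new Object(); // stand-in for the real marker object

        final List<Object> cache = new ArrayList<>();  // stand-in for _cache
        final Deque<Object> ring = new ArrayDeque<>(); // unbounded stand-in for the RingBuffer
        boolean consumerStartedFlag = false;

        void publish(Object obj) {                     // mirrors the two branches of Disruptor.publish()
            if (consumerStartedFlag) {
                ring.addLast(obj);
            } else {
                cache.add(obj);
                if (consumerStartedFlag) flushCache(); // mirrors the original re-check
            }
        }

        void flushCache() {
            ring.addLast(FLUSH_CACHE);
        }

        /** Drains the ring in order; FLUSH_CACHE is replaced by the cached messages. */
        List<Object> consumeBatch() {
            List<Object> out = new ArrayList<>();
            while (!ring.isEmpty()) {
                Object o = ring.pollFirst();
                if (o == FLUSH_CACHE) {
                    out.addAll(cache);
                    cache.clear();
                } else {
                    out.add(o);
                }
            }
            return out;
        }

        static List<Object> replay() {
            DisorderReplay q = new DisorderReplay();
            q.publish("1");               // step 2: flag is false, "1" is cached
            q.consumerStartedFlag = true; // step 3: first half of consumerStarted()
            q.publish("2");               // step 5: "2" goes straight into the ring
            q.flushCache();               // steps 6-7: second half, the marker lands after "2"
            return q.consumeBatch();      // step 8
        }

        public static void main(String[] args) {
            System.out.println(replay()); // prints [2, 1]
        }
    }
    ```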

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clockfly/incubator-storm disruptor_message_loss_hang_or_disorder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-storm/pull/136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #136
    
----
commit 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Author: Sean Zhong <[email protected]>
Date:   2014-06-10T11:54:11Z

    STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

----


