GitHub user clockfly opened a pull request:
https://github.com/apache/incubator-storm/pull/136
STORM-342: Message loss, executor hang, or message disorder due to
contention in Disruptor queue under multi-thread mode.
STORM-342: Message loss, executor hang, or message disorder
-------------------------
The Disruptor helper class contains a potential race condition between the consumer
and the producers. It can cause the consumer queue to hang, messages to be lost, or
messages to arrive out of order.
```java
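// Disruptor.java (excerpt)
class Disruptor {
    // ...

    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if (consumerStartedFlag) {
            // Consumer already started: claim a slot and publish directly to the RingBuffer.
            final long id;
            if (block) {
                id = _buffer.next();        // waits until a slot is free
            } else {
                id = _buffer.tryNext(1);    // throws InsufficientCapacityException when full
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            // Consumer not started yet: park the message in the cache.
            _cache.add(obj);
            if (consumerStartedFlag) flushCache();
        }
    }

    public void consumerStarted() {
        if (!consumerStartedFlag) {
            consumerStartedFlag = true;
            // Race window: producers that already see the flag as true publish
            // straight to the RingBuffer before the cached messages are flushed here.
            flushCache();
        }
    }
}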
```
Consumer (executor thread):
```clojure
;; Executor thread
(disruptor/consumer-started! receive-queue)
(fn []
  (disruptor/consume-batch-when-available receive-queue event-handler))
```
How to reproduce: executor hang and message loss
------------------------
1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because "consumerStartedFlag" == false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. "consumerStartedFlag" is set to true, but flushCache() has not been called yet.
4. Because "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
5. [Producer B Thread] Publishes enough messages to fill the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode. Because the RingBuffer is now full, the consumer thread blocks.
8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever. Message "1" remains in the cache and is never delivered.
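To make the interleaving above concrete, here is a minimal Java sketch that replays it on a single thread so the outcome is deterministic. It is a model, not the Storm code: a bounded ArrayBlockingQueue (capacity 4, an arbitrary choice) stands in for the RingBuffer, a plain list stands in for the cache, and FLUSH_CACHE is a hypothetical sentinel object. Since the real flushCache() publishes in blocking mode, the sketch uses a timed offer() in its place so it can report the hang instead of actually hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class HangReplay {
    // Hypothetical sentinel standing in for the helper's FLUSH_CACHE object.
    static final Object FLUSH_CACHE = new Object();

    /** Replays the hang interleaving; returns true if the flush could never complete. */
    static boolean replay() {
        // Assumption: a bounded queue of capacity 4 models the RingBuffer.
        BlockingQueue<Object> ringBuffer = new ArrayBlockingQueue<>(4);
        List<Object> cache = new ArrayList<>();
        boolean consumerStartedFlag = false;

        // Step 2: producer A publishes "1" before the consumer starts, so it is cached.
        if (!consumerStartedFlag) cache.add("1");

        // Step 3: consumerStarted() sets the flag but has not called flushCache() yet.
        consumerStartedFlag = true;

        // Steps 4-5: producer B sees the flag and publishes directly until the buffer is full.
        int produced = 0;
        while (consumerStartedFlag && ringBuffer.offer("b" + produced)) {
            produced++;
        }

        // Steps 6-7: flushCache() publishes FLUSH_CACHE in blocking mode. A blocking
        // put() would never return here; a timed offer() lets the sketch observe that.
        boolean flushed;
        try {
            flushed = ringBuffer.offer(FLUSH_CACHE, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during replay", e);
        }

        // Step 8: the consumer never reaches its consume loop, so nothing drains
        // the buffer and message "1" stays in the cache.
        return !flushed && cache.contains("1");
    }

    public static void main(String[] args) {
        System.out.println(replay() ? "consumer blocked; \"1\" never delivered" : "no hang");
    }
}
```

Replacing the timed offer() with put() turns the sketch into a real hang, which matches step 8: the only thread that could drain the buffer is the one stuck publishing into it.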
How to reproduce: message disorder
-----------------------------------
1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because "consumerStartedFlag" == false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. "consumerStartedFlag" is set to true, but flushCache() has not been called yet.
4. Because "consumerStartedFlag" is now true, newly produced messages are published directly to the RingBuffer.
5. [Producer A Thread] Publishes a new message "2", which goes directly into the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer, so it is written after message "2".
8. [Consumer Thread] consumeBatch() is called. It first picks "2", then picks FLUSH_CACHE, which stands for the cached message "1".
9. Producer A Thread produced "1" then "2", but the consumer thread receives "2" then "1".
10. Message order is violated.
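The disorder case can be replayed the same way. This is again a single-threaded model under stated assumptions, not the Storm implementation: an unbounded ArrayDeque stands in for the RingBuffer (capacity does not matter here), another ArrayDeque stands in for the cache, and the hypothetical FLUSH_CACHE sentinel is expanded into the cache contents when the consumer reaches it, as described in step 8.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class DisorderReplay {
    // Hypothetical sentinel standing in for the helper's FLUSH_CACHE object.
    static final Object FLUSH_CACHE = new Object();

    /** Replays the interleaving; returns messages in the order the consumer sees them. */
    static List<Object> replay() {
        // Assumption: an unbounded queue models the RingBuffer for this scenario.
        Queue<Object> ringBuffer = new ArrayDeque<>();
        Queue<Object> cache = new ArrayDeque<>();
        boolean consumerStartedFlag = false;

        // Step 2: producer A publishes "1" before the consumer starts, so it is cached.
        if (!consumerStartedFlag) cache.add("1");

        // Step 3: consumerStarted() sets the flag; flushCache() has not run yet.
        consumerStartedFlag = true;

        // Step 5: producer A publishes "2"; the flag is true, so it goes straight to the buffer.
        if (consumerStartedFlag) ringBuffer.add("2");

        // Steps 6-7: flushCache() appends FLUSH_CACHE after "2".
        ringBuffer.add(FLUSH_CACHE);

        // Step 8: the consumer drains in buffer order; FLUSH_CACHE expands into the cache.
        List<Object> received = new ArrayList<>();
        Object o;
        while ((o = ringBuffer.poll()) != null) {
            if (o == FLUSH_CACHE) {
                Object cached;
                while ((cached = cache.poll()) != null) {
                    received.add(cached);
                }
            } else {
                received.add(o);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [2, 1] although producer A sent "1" then "2"
    }
}
```

The root cause in both replays is the same: the flag flips before the cache is emptied, so any message that lands in the RingBuffer during that window overtakes the cached ones.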
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/clockfly/incubator-storm disruptor_message_loss_hang_or_disorder
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-storm/pull/136.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #136
----
commit 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Author: Sean Zhong
Date: 2014-06-10T11:54:11Z
STORM-342: Message loss, executor hang, or message disorder due to
contention in Disruptor queue under multi-thread mode.
----