[
https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14026377#comment-14026377
]
ASF GitHub Bot commented on STORM-342:
--------------------------------------
GitHub user clockfly opened a pull request:
https://github.com/apache/incubator-storm/pull/136
STORM-342: Message loss, executor hang, or message disorder due to
contention in Disruptor queue under multi-thread mode.
STORM-342: Message loss, executor hang, or message disorder
-------------------------
The Disruptor helper class contains a potential contention bug (a race between
the consumer and the producers). It can cause the consumer queue to hang,
messages to be lost, or messages to be delivered out of order.
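```java
// Disruptor.java (excerpt)
class Disruptor {
    // ...

    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if (consumerStartedFlag) {
            // Consumer already started: claim a slot and publish straight to the RingBuffer.
            final long id;
            if (block) {
                id = _buffer.next();        // waits while the RingBuffer is full
            } else {
                id = _buffer.tryNext(1);    // throws InsufficientCapacityException when full
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            // Consumer not started yet: park the message in the cache.
            _cache.add(obj);
            if (consumerStartedFlag) flushCache();
        }
    }

    public void consumerStarted() {
        if (!consumerStartedFlag) {
            consumerStartedFlag = true;
            // Race window: producers that already see the flag as true publish
            // directly to the RingBuffer before flushCache() publishes the
            // FLUSH_CACHE marker that stands for the cached messages.
            flushCache();
        }
    }
}
```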
Consumer
```clojure
;; Executor thread
(disruptor/consumer-started! receive-queue)
(fn []
  (disruptor/consume-batch-when-available receive-queue event-handler))
```
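`consumer-started!` runs on the same executor thread that later drains `receive-queue`, so `flushCache()` publishes FLUSH_CACHE into a RingBuffer that only this thread empties. One way to close both the hang and the disorder window is to let the consumer take the cached messages itself instead of publishing a marker, and to flip the flag and take the cache under a single lock. The sketch below is a hypothetical illustration under stated assumptions, not necessarily the change made in this pull request. `SafeStartQueue` and `SafeStartDemo` are made-up names, and a bounded `java.util.concurrent.ArrayBlockingQueue` stands in for the LMAX RingBuffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the consumer drains the pre-start cache itself, so it
// never publishes into the ring it is responsible for emptying.
class SafeStartQueue<T> {
    private final ArrayBlockingQueue<T> ring;          // assumed stand-in for the RingBuffer
    private final List<T> cache = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private volatile boolean started = false;

    SafeStartQueue(int capacity) {
        ring = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: a message is cached only while holding the lock AND the flag is still false.
    void publish(T obj) throws InterruptedException {
        if (!started) {
            lock.lock();
            try {
                if (!started) {
                    cache.add(obj);
                    return;
                }
            } finally {
                lock.unlock();
            }
        }
        ring.put(obj);   // blocks while full; the consumer thread is free to drain it
    }

    // Consumer side: flip the flag and take everything cached so far in one step.
    // The caller must deliver these messages before anything it takes from the ring.
    List<T> consumerStarted() {
        lock.lock();
        try {
            started = true;
            List<T> pending = new ArrayList<>(cache);
            cache.clear();
            return pending;
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        return ring.take();
    }
}

class SafeStartDemo {
    // One producer thread and a consumer that starts late; returns what the consumer received.
    static List<Integer> run(int n, int capacity) throws InterruptedException {
        SafeStartQueue<Integer> q = new SafeStartQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.publish(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        Thread.sleep(1);                                   // let some messages land in the cache
        List<Integer> received = new ArrayList<>(q.consumerStarted());
        while (received.size() < n) received.add(q.take());
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> got = run(10_000, 8);
        boolean inOrder = true;
        for (int i = 0; i < got.size(); i++) inOrder &= got.get(i) == i;
        System.out.println("received " + got.size() + ", in order: " + inOrder);
    }
}
```

The lock is taken only on the producer's slow path, before the consumer has started. After that, publishing adds just one volatile read. Every message a producer cached was added before the flag flipped, and the consumer delivers that snapshot before it reads the ring, so per-producer order is preserved.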
Howto: Executor Hang, Message Loss
------------------------
1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because "consumerStartedFlag" ==
false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is
set to true, but flushCache() has not been called yet.
4. Because "consumerStartedFlag" is now true, newly produced messages are
published directly to the RingBuffer.
5. [Producer B Thread] Publishes enough messages to fill the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in
blocking mode. Since the RingBuffer is now full, the consumer thread blocks.
8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays
full and the consumer thread stays blocked forever. Message "1" never leaves
the cache, so it is lost.
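The hang can be replayed deterministically on a single thread. This is a minimal model under stated assumptions: a bounded `ArrayBlockingQueue` replaces the RingBuffer, and `HangReplay` and its four-slot capacity are hypothetical. It calls `offer()` instead of a blocking publish so that the demo reports the stall rather than actually hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical single-threaded replay of the "executor hang" interleaving.
class HangReplay {
    static final Object FLUSH_CACHE = new Object();   // marker the consumer publishes in flushCache()
    static final int CAPACITY = 4;                    // assumed, deliberately tiny ring size

    final ArrayBlockingQueue<Object> ring = new ArrayBlockingQueue<>(CAPACITY); // RingBuffer stand-in
    final List<Object> cache = new ArrayList<>();
    volatile boolean consumerStartedFlag = false;

    // Same branching as Disruptor.publish(); offer() returns false where next() would wait.
    boolean publish(Object obj) {
        if (consumerStartedFlag) {
            return ring.offer(obj);
        }
        cache.add(obj);
        return true;
    }

    static String replay() {
        HangReplay q = new HangReplay();
        q.publish("1");                       // steps 1-2: flag is false, so "1" is cached
        q.consumerStartedFlag = true;         // step 3: flag set, flushCache() not yet run
        for (int i = 0; i < CAPACITY; i++) {  // steps 4-5: producer B fills the ring
            q.publish("b" + i);
        }
        boolean flushed = q.ring.offer(FLUSH_CACHE); // steps 6-7: consumer publishes FLUSH_CACHE
        // Step 8: a blocking publish here would wait for a consumer that is this
        // same thread, so the ring is never drained and "1" stays in the cache.
        return flushed ? "flushed" : "stalled: ring full, cache=" + q.cache;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

Running `main` prints `stalled: ring full, cache=[1]`. The ring is full, and the only thread that could drain it is the one trying to publish FLUSH_CACHE.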
Howto: Message Disorder
-----------------------------------
1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because "consumerStartedFlag" ==
false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is
set to true, but flushCache() has not been called yet.
4. Because "consumerStartedFlag" is now true, newly produced messages are
published directly to the RingBuffer.
5. [Producer A Thread] Publishes a new message "2", which goes directly into
the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer,
after message "2".
8. [Consumer Thread] consumeBatch() is called. It first picks up "2", then
picks up FLUSH_CACHE, which represents "1".
9. Producer A Thread produced "1", "2" in that order, but the consumer thread
received "2", "1".
10. The message order is wrong.
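The disorder interleaving can be replayed the same way. In this sketch, the names are hypothetical and an unbounded `ArrayDeque` stands in for the RingBuffer. When the consumer reaches a FLUSH_CACHE slot, it expands that slot into the cached messages, mirroring step 8.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded replay of the "message disorder" interleaving.
class DisorderReplay {
    static final Object FLUSH_CACHE = new Object();

    final ArrayDeque<Object> ring = new ArrayDeque<>();   // assumed RingBuffer stand-in
    final List<Object> cache = new ArrayList<>();
    boolean consumerStartedFlag = false;

    // Same branching as Disruptor.publish(); the re-check after _cache.add() is
    // omitted because it is false at that moment in this interleaving.
    void publish(Object obj) {
        if (consumerStartedFlag) {
            ring.addLast(obj);
        } else {
            cache.add(obj);
        }
    }

    // Drains the ring in order; a FLUSH_CACHE slot stands for everything cached so far.
    List<Object> consumeBatch() {
        List<Object> received = new ArrayList<>();
        while (!ring.isEmpty()) {
            Object o = ring.pollFirst();
            if (o == FLUSH_CACHE) {
                received.addAll(cache);
                cache.clear();
            } else {
                received.add(o);
            }
        }
        return received;
    }

    static List<Object> replay() {
        DisorderReplay q = new DisorderReplay();
        q.publish("1");                 // step 2: flag is false, so "1" is cached
        q.consumerStartedFlag = true;   // step 3: flag set, flushCache() not yet run
        q.publish("2");                 // step 5: "2" goes straight into the ring
        q.ring.addLast(FLUSH_CACHE);    // steps 6-7: flushCache() publishes FLUSH_CACHE after "2"
        return q.consumeBatch();        // step 8: consumer sees "2", then FLUSH_CACHE -> "1"
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

`main` prints `[2, 1]`, even though producer A published "1" first.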
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/clockfly/incubator-storm disruptor_message_loss_hang_or_disorder
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-storm/pull/136.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #136
----
commit 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Author: Sean Zhong <[email protected]>
Date: 2014-06-10T11:54:11Z
STORM-342: Message loss, executor hang, or message disorder due to
contention in Disruptor queue under multi-thread mode.
----
> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
> Key: STORM-342
> URL: https://issues.apache.org/jira/browse/STORM-342
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Priority: Blocker
>
> Disruptor contains a potential contention bug between the consumer and the
> producer. It can cause the consumer queue to hang, message loss, or message
> disorder.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             flushCache();
>         }
>     }
> }
> {code}
> The following steps describe the scenario that causes message disorder:
> 1. The consumer has not started.
> 2. A producer in another thread publishes message "1". Since
> "consumerStartedFlag" == false, it is added to the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. The producer publishes message "2". Since "consumerStartedFlag" == true
> now, it is published directly to the RingBuffer.
> 5. flushCache() is called from consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer. It marks the
> position of message "1" in the RingBuffer.
> 7. consume() is called. It first fetches "2", then "1".
> 8. The message order is wrong!
> The following steps describe the scenario that causes message loss and a
> consumer thread that hangs forever:
> 1. The consumer has not started.
> 2. A producer in another thread publishes message "1". Since
> "consumerStartedFlag" == false, it is added to the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. The producer publishes multiple messages, starting with "2". Since
> "consumerStartedFlag" == true now, they are published directly to the
> RingBuffer, and the RingBuffer fills up.
> 5. flushCache() is called from consumerStarted().
> 6. The FLUSH_CACHE object is published to the RingBuffer. Since the
> RingBuffer is now full, the consumer thread blocks.
> 7. consume() is never called, so the consumer thread stays blocked forever.
> I found this while troubleshooting a tricky intermittent failure (1 in 100
> runs). It usually happens when the producer and the consumer are colocated
> in the same process. For example, the task send-queue thread acts as a
> producer and publishes messages to a local task's receive queue in the same
> worker.
--
This message was sent by Atlassian JIRA
(v6.2#6252)