[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14025212#comment-14025212 ]
Sean Zhong edited comment on STORM-342 at 6/9/14 2:18 PM:
----------------------------------------------------------
Why is a cache introduced in the Disruptor queue? The logic for
SingleThreadedClaimStrategy differs from that for MultiThreadedClaimStrategy.
For SingleThreadedClaimStrategy, the constructor sets consumerStartedFlag
directly, so the cache is never used.
In Disruptor:
{code:title=Disruptor.java|borderStyle=solid}
public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    LOG.info("========Disruptor queueName" + _queueName + ", " +
             claim.getClass().toString() + ", " + wait.getClass().toString());
    _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.setGatingSequences(_consumer);
    // SingleThreadedClaimStrategy: mark the consumer as started up front,
    // so publish() never takes the cache path.
    if (claim instanceof SingleThreadedClaimStrategy) {
        consumerStartedFlag = true;
    }
}
{code}
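A minimal model of why the single-threaded path never touches the cache, assuming nothing beyond what the constructor above shows. SingleThreadedClaimStrategy and MultiThreadedClaimStrategy below are empty hypothetical stand-ins for the Disruptor 2.x classes, a List replaces the RingBuffer, and publish() is simplified; only the instanceof check mirrors Storm's code.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimStrategyShortCircuit {
    // Hypothetical stand-ins for the Disruptor 2.x claim strategy types (not the real classes).
    interface ClaimStrategy {}
    static final class SingleThreadedClaimStrategy implements ClaimStrategy {}
    static final class MultiThreadedClaimStrategy implements ClaimStrategy {}

    final List<Object> ring = new ArrayList<>();   // stand-in for the RingBuffer
    final List<Object> cache = new ArrayList<>();  // stand-in for _cache
    boolean consumerStartedFlag = false;

    ClaimStrategyShortCircuit(ClaimStrategy claim) {
        // Same check as the quoted constructor: a single-threaded claim strategy
        // marks the consumer as started immediately.
        if (claim instanceof SingleThreadedClaimStrategy) {
            consumerStartedFlag = true;
        }
    }

    // Simplified publish(): direct path once the flag is set, cache otherwise.
    void publish(Object obj) {
        if (consumerStartedFlag) {
            ring.add(obj);
        } else {
            cache.add(obj);
        }
    }

    public static void main(String[] args) {
        ClaimStrategyShortCircuit single =
            new ClaimStrategyShortCircuit(new SingleThreadedClaimStrategy());
        ClaimStrategyShortCircuit multi =
            new ClaimStrategyShortCircuit(new MultiThreadedClaimStrategy());
        single.publish("a");
        multi.publish("a");
        System.out.println(single.ring + " " + single.cache); // [a] []
        System.out.println(multi.ring + " " + multi.cache);   // [] [a]
    }
}
```

With the multi-threaded strategy, messages stay in the cache until consumerStarted() runs, which is the path the issue is about.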
was (Author: clockfly):
Why is a cache introduced in the Disruptor queue? The logic for
SingleThreadedClaimStrategy differs from that for MultiThreadedClaimStrategy.
For SingleThreadedClaimStrategy, it sets consumerStartedFlag directly.
> Contention in Disruptor Queue which may cause message out of order
> ------------------------------------------------------------------
>
> Key: STORM-342
> URL: https://issues.apache.org/jira/browse/STORM-342
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Reporter: Sean Zhong
> Priority: Critical
>
> Disruptor contains a potential contention bug between the consumer and the producer:
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
>     ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if (consumerStartedFlag) {
>             final long id;
>             if (block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if (consumerStartedFlag) flushCache();
>         }
>     }
>
>     public void consumerStarted() {
>         if (!consumerStartedFlag) {
>             consumerStartedFlag = true;
>             // A producer calling publish() at this point takes the direct path
>             // above, ahead of the messages still sitting in _cache.
>             flushCache();
>         }
>     }
> }
> {code}
> The following sequence reproduces this bug:
> 1. The consumer has not started.
> 2. A producer in another thread publishes message "1". Because
> consumerStartedFlag == false, the message goes into the cache.
> 3. consumerStarted() is called. consumerStartedFlag is set to true, but
> flushCache() has not been called yet.
> 4. A producer in another thread publishes message "2". Because
> consumerStartedFlag == true now, the message is published directly into the RingBuffer.
> 5. flushCache() is called from consumerStarted().
> 6. A FLUSH_CACHE object is published to the RingBuffer; it marks the position
> of message "1" in the RingBuffer.
> 7. consume() is called; it fetches "2" first, then "1".
> 8. The messages are delivered out of order!
> I found this after troubleshooting a tricky intermittent failure (about 1 in 100 runs).
> It usually happens when the producer and consumer are colocated in the same process,
> for example when the task send queue thread, acting as producer, sends messages to a
> local task receive queue in the same worker.
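The eight-step interleaving described in the issue can be replayed deterministically on a single thread. The sketch below is a hypothetical model, not Storm's DisruptorQueue or the LMAX Disruptor API: a java.util.List stands in for the RingBuffer, and, following step 6, FLUSH_CACHE is a marker whose position in the ring decides where the cached messages reach the consumer. Splitting consumerStarted() into its two statements reproduces the window between steps 3 and 5.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the quoted publish()/consumerStarted() logic.
// A List stands in for the RingBuffer; this is not Storm's or LMAX's API.
public class Storm342Replay {
    static final Object FLUSH_CACHE = new Object(); // marker published by flushCache()

    final List<Object> ring = new ArrayList<>();    // stand-in for the RingBuffer
    final List<Object> cache = new ArrayList<>();   // stand-in for _cache
    boolean consumerStartedFlag = false;

    void publish(Object obj) {
        if (consumerStartedFlag) {
            ring.add(obj);                          // direct path into the ring
        } else {
            cache.add(obj);                         // cached path
            if (consumerStartedFlag) flushCache();
        }
    }

    void consumerStarted() {
        if (!consumerStartedFlag) {
            consumerStartedFlag = true;
            flushCache();
        }
    }

    void flushCache() {
        ring.add(FLUSH_CACHE);                      // marks where cached messages are delivered
    }

    // Drain the ring in order; at each FLUSH_CACHE marker, deliver the cached messages.
    List<Object> consume() {
        List<Object> out = new ArrayList<>();
        for (Object o : ring) {
            if (o == FLUSH_CACHE) {
                out.addAll(cache);
                cache.clear();
            } else {
                out.add(o);
            }
        }
        ring.clear();
        return out;
    }

    public static void main(String[] args) {
        // Interleaving from the issue: consumerStarted() is split at the race window.
        Storm342Replay racy = new Storm342Replay();
        racy.publish("1");                  // steps 1-2: "1" goes to the cache
        racy.consumerStartedFlag = true;    // step 3: flag set, flushCache() not yet called
        racy.publish("2");                  // step 4: "2" goes straight into the ring
        racy.flushCache();                  // steps 5-6: marker lands after "2"
        System.out.println(racy.consume()); // step 7: [2, 1]

        // Without a producer in the window, the order is preserved.
        Storm342Replay ordered = new Storm342Replay();
        ordered.publish("1");
        ordered.consumerStarted();
        ordered.publish("2");
        System.out.println(ordered.consume()); // [1, 2]
    }
}
```

The second run shows that the cache itself preserves order; only a direct publish landing between setting consumerStartedFlag and calling flushCache() puts "2" ahead of the FLUSH_CACHE marker.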