Author: kwall Date: Wed Aug 1 13:44:34 2012 New Revision: 1367990 URL: http://svn.apache.org/viewvc?rev=1367990&view=rev Log: QPID-4171: Fix enqueue ordering for persistent messages
Applied patch from Philip Harvey <p...@philharveyonline.com> and Oleksandr Rudyy <oru...@gmail.com> Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1367990&r1=1367989&r2=1367990&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Aug 1 13:44:34 2012 @@ -138,8 +138,6 @@ public class AMQChannel implements Sessi private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); - private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; - private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); // Set of messages being acknowledged in the current transaction @@ -1637,23 +1635,6 @@ public class AMQChannel implements Sessi _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } - public void completeAsyncCommands() - { - AsyncCommand cmd; - while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) - { - cmd.complete(); - _unfinishedCommandsQueue.poll(); - } - while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) - { - cmd = _unfinishedCommandsQueue.poll(); - cmd.awaitReadyForCompletion(); - cmd.complete(); - } - } - - public void sync() { 
AsyncCommand cmd; Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1367990&r1=1367989&r2=1367990&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed Aug 1 13:44:34 2012 @@ -44,11 +44,16 @@ import java.util.List; */ public class AsyncAutoCommitTransaction implements ServerTransaction { + static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode"; + protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class); private final MessageStore _messageStore; private final FutureRecorder _futureRecorder; + //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence + private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE); + public static interface FutureRecorder { public void recordFuture(StoreFuture future, Action action); @@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction } } + private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + { + if(action != null) + { + // For persistent messages, do not synchronously invoke postCommit even if the future is completed. + // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order. 
+ if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode) + { + action.postCommit(); + } + else + { + _futureRecorder.recordFuture(future, action); + } + } + } + public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { Transaction txn = null; @@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; } catch (AMQException e) @@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction { future = StoreFuture.IMMEDIATE_FUTURE; } - addFuture(future, postTransactionAction); + addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1367990&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (added) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Wed Aug 1 13:44:34 2012 @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.txn; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; + +import java.util.Collections; + +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction.FutureRecorder; +import org.apache.qpid.server.txn.ServerTransaction.Action; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AsyncAutoCommitTransactionTest extends QpidTestCase +{ + private static final String STRICT_ORDER_SYSTEM_PROPERTY = AsyncAutoCommitTransaction.QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE; + + private FutureRecorder _futureRecorder = mock(FutureRecorder.class); + private EnqueableMessage _message = mock(EnqueableMessage.class); + private BaseQueue _queue = mock(BaseQueue.class); + private MessageStore _messageStore = mock(MessageStore.class); + private Transaction _storeTransaction = mock(Transaction.class); + private Action _postTransactionAction = mock(Action.class); + private StoreFuture _future = mock(StoreFuture.class); + + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + when(_messageStore.newTransaction()).thenReturn(_storeTransaction); + when(_storeTransaction.commitTranAsync()).thenReturn(_future); + when(_queue.isDurable()).thenReturn(true); + } + + public void 
testEnqueuePersistentMessagePostCommitNotCalledWhenFutureAlreadyComplete() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(true); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueuePersistentMessageOnMultiplQueuesPostCommitNotCalled() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(true); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis()); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueuePersistentMessagePostCommitNotCalledWhenFutureNotYetComplete() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(true); + when(_future.isComplete()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verify(_storeTransaction).enqueueMessage(_queue, _message); + verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + 
verifyZeroInteractions(_postTransactionAction); + } + + public void testEnqueueTransientMessagePostCommitIsCalledWhenNotBehavingStrictly() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "false"); + + when(_message.isPersistent()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verifyZeroInteractions(_storeTransaction); + verify(_postTransactionAction).postCommit(); + verifyZeroInteractions(_futureRecorder); + } + + public void testEnqueueTransientMessagePostCommitIsCalledWhenBehavingStrictly() throws Exception + { + setTestSystemProperty(STRICT_ORDER_SYSTEM_PROPERTY, "true"); + + when(_message.isPersistent()).thenReturn(false); + + AsyncAutoCommitTransaction asyncAutoCommitTransaction = + new AsyncAutoCommitTransaction(_messageStore, _futureRecorder); + + asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); + + verifyZeroInteractions(_storeTransaction); + verify(_futureRecorder).recordFuture(StoreFuture.IMMEDIATE_FUTURE, _postTransactionAction); + verifyZeroInteractions(_postTransactionAction); + } +} Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1367990&r1=1367989&r2=1367990&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Wed Aug 1 13:44:34 2012 @@ -254,7 +254,7 @@ public class InternalBrokerBaseCase exte channel.publishContentHeader(_headerBody); } - + channel.sync(); } public void 
acknowledge(AMQChannel channel, long deliveryTag) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org