Author: rgodfrey
Date: Sat Jan 21 00:06:40 2012
New Revision: 1234215

URL: http://svn.apache.org/viewvc?rev=1234215&view=rev
Log:
QPID-3774 : allow out of order completion of persistent enqueues / dequeues 
(0-9 codepath)

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Sat Jan 21 00:06:40 2012
@@ -73,27 +73,20 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AMQChannel implements SessionConfig, AMQSessionModel
+public class AMQChannel implements SessionConfig, AMQSessionModel, 
AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -132,6 +125,10 @@ public class AMQChannel implements Sessi
 
     private final MessageStore _messageStore;
 
+    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new 
LinkedList<AsyncCommand>();
+
+    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
+
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new 
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
     // Set of messages being acknoweledged in the current transaction
@@ -184,7 +181,7 @@ public class AMQChannel implements Sessi
         _messageStore = messageStore;
 
         // by default the session is non-transactional
-        _transaction = new AutoCommitTransaction(_messageStore);
+        _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
          _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
     }
@@ -203,14 +200,12 @@ public class AMQChannel implements Sessi
 
     public boolean isTransactional()
     {
-        // this does not look great but there should only be one 
"non-transactional"
-        // transactional context, while there could be several transactional 
ones in
-        // theory
-        return !(_transaction instanceof AutoCommitTransaction);
+        return _transaction.isTransactional();
     }
 
     public void receivedComplete()
     {
+        sync();
     }
 
 
@@ -1562,4 +1557,69 @@ public class AMQChannel implements Sessi
 
         }
     }
+
+    public void recordFuture(final MessageStore.StoreFuture future, final 
ServerTransaction.Action action)
+    {
+        _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+    }
+
+    public void completeAsyncCommands()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.peek()) != null && 
cmd.isReadyForCompletion())
+        {
+            cmd.complete();
+            _unfinishedCommandsQueue.poll();
+        }
+        while(_unfinishedCommandsQueue.size() > 
UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+        {
+            cmd = _unfinishedCommandsQueue.poll();
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+
+    public void sync()
+    {
+        AsyncCommand cmd;
+        while((cmd = _unfinishedCommandsQueue.poll()) != null)
+        {
+            cmd.awaitReadyForCompletion();
+            cmd.complete();
+        }
+    }
+
+    private static class AsyncCommand
+    {
+        private final MessageStore.StoreFuture _future;
+        private ServerTransaction.Action _action;
+
+        public AsyncCommand(final MessageStore.StoreFuture future, final 
ServerTransaction.Action action)
+        {
+            _future = future;
+            _action = action;
+        }
+
+        void awaitReadyForCompletion()
+        {
+            _future.waitForCompletion();
+        }
+
+        void complete()
+        {
+            if(!_future.isComplete())
+            {
+                _future.waitForCompletion();
+            }
+            _action.postCommit();
+            _action = null;
+        }
+
+        boolean isReadyForCompletion()
+        {
+            return _future.isComplete();
+        }
+    }
+
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
 Sat Jan 21 00:06:40 2012
@@ -24,6 +24,7 @@ package org.apache.qpid.server.handler;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implem
     public void methodReceived(AMQStateManager stateManager, AccessRequestBody 
body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
 
         MethodRegistry methodRegistry = session.getMethodRegistry();
 
@@ -71,7 +77,7 @@ public class AccessRequestHandler implem
             throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest 
not present in AMQP versions other than 0-8, 0-9");
         }
 
-
+        channel.sync();
         session.writeFrame(response.generateFrame(channelId));
     }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
 Sat Jan 21 00:06:40 2012
@@ -68,6 +68,7 @@ public class BasicCancelMethodHandler im
         {
             MethodRegistry methodRegistry = session.getMethodRegistry();
             BasicCancelOkBody cancelOkBody = 
methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+            channel.sync();
             session.writeFrame(cancelOkBody.generateFrame(channelId));
         }
     }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Sat Jan 21 00:06:40 2012
@@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler i
         }
         else
         {
+            channel.sync();
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("BasicConsume: from '" + body.getQueue() +

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
 Sat Jan 21 00:06:40 2012
@@ -75,6 +75,7 @@ public class BasicGetMethodHandler imple
         }
         else
         {
+            channel.sync();
             AMQQueue queue = body.getQueue() == null ? 
channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
             if (queue == null)
             {

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
 Sat Jan 21 00:06:40 2012
@@ -46,7 +46,7 @@ public class BasicQosHandler implements 
         {
             throw body.getChannelNotFoundException(channelId);
         }
-
+        channel.sync();
         channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
 
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
 Sat Jan 21 00:06:40 2012
@@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler i
         {
             MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) 
session.getMethodRegistry();
             AMQMethodBody recoverOk = 
methodRegistry.createBasicRecoverOkBody();
+            channel.sync();
             session.writeFrame(recoverOk.generateFrame(channelId));
 
         }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
 Sat Jan 21 00:06:40 2012
@@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandl
         {
             throw body.getChannelNotFoundException(channelId);
         }
-
+        channel.sync();
         channel.resend(body.getRequeue());
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
 Sat Jan 21 00:06:40 2012
@@ -65,6 +65,7 @@ public class ChannelCloseHandler impleme
         {
             throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, 
"Trying to close unknown channel");
         }
+        channel.sync();
         session.closeChannel(channelId);
         // Client requested closure so we don't wait for ok we send it
         stateManager.getProtocolSession().closeChannelOk(channelId);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
 Sat Jan 21 00:06:40 2012
@@ -55,6 +55,7 @@ public class ChannelFlowHandler implemen
         {
             throw body.getChannelNotFoundException(channelId);
         }
+        channel.sync();
         channel.setSuspended(!body.getActive());
         _logger.debug("Channel.Flow for channel " + channelId + ", active=" + 
body.getActive());
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
 Sat Jan 21 00:06:40 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -69,7 +70,12 @@ public class ExchangeBoundHandler implem
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MethodRegistry methodRegistry = session.getMethodRegistry();
 
-        
+        final AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+        channel.sync();
 
 
         AMQShortString exchangeName = body.getExchange();

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 Sat Jan 21 00:06:40 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -55,6 +56,11 @@ public class ExchangeDeclareHandler impl
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+        final AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
 
         if (_logger.isDebugEnabled())
         {
@@ -102,6 +108,7 @@ public class ExchangeDeclareHandler impl
         {
             MethodRegistry methodRegistry = session.getMethodRegistry();
             AMQMethodBody responseBody = 
methodRegistry.createExchangeDeclareOkBody();
+            channel.sync();
             session.writeFrame(responseBody.generateFrame(channelId));
         }
     }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
 Sat Jan 21 00:06:40 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ExchangeDeleteBody;
 import org.apache.qpid.framing.ExchangeDeleteOkBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.ExchangeInUseException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,7 +50,12 @@ public class ExchangeDeleteHandler imple
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
-
+        final AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+        channel.sync();
         try
         {
             if(exchangeRegistry.getExchange(body.getExchange()) == null)

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
 Sat Jan 21 00:06:40 2012
@@ -64,18 +64,18 @@ public class QueueBindHandler implements
         VirtualHost virtualHost = protocolConnection.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        AMQChannel channel = protocolConnection.getChannel(channelId);
+
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
 
         final AMQQueue queue;
         final AMQShortString routingKey;
 
         if (body.getQueue() == null)
         {
-            AMQChannel channel = protocolConnection.getChannel(channelId);
-
-            if (channel == null)
-            {
-                throw body.getChannelNotFoundException(channelId);
-            }
 
             queue = channel.getDefaultQueue();
 
@@ -150,6 +150,7 @@ public class QueueBindHandler implements
         }
         if (!body.getNowait())
         {
+            channel.sync();
             MethodRegistry methodRegistry = 
protocolConnection.getMethodRegistry();
             AMQMethodBody responseBody = 
methodRegistry.createQueueBindOkBody();
             
protocolConnection.writeFrame(responseBody.generateFrame(channelId));

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
 Sat Jan 21 00:06:40 2012
@@ -197,6 +197,7 @@ public class QueueDeclareHandler impleme
 
         if (!body.getNowait())
         {
+            channel.sync();
             MethodRegistry methodRegistry = 
protocolConnection.getMethodRegistry();
             QueueDeclareOkBody responseBody =
                     methodRegistry.createQueueDeclareOkBody(queueName,

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
 Sat Jan 21 00:06:40 2012
@@ -71,7 +71,7 @@ public class QueueDeleteHandler implemen
         {
             throw body.getChannelNotFoundException(channelId);
         }
-
+        channel.sync();
         AMQQueue queue;
         if (body.getQueue() == null)
         {

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
 Sat Jan 21 00:06:40 2012
@@ -109,7 +109,7 @@ public class QueuePurgeHandler implement
 
                 if(!body.getNowait())
                 {
-
+                    channel.sync();
                     MethodRegistry methodRegistry = 
protocolConnection.getMethodRegistry();
                     AMQMethodBody responseBody = 
methodRegistry.createQueuePurgeOkBody(purged);
                     
protocolConnection.writeFrame(responseBody.generateFrame(channelId));

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
 Sat Jan 21 00:06:40 2012
@@ -132,6 +132,7 @@ public class QueueUnbindHandler implemen
             // 0-8 does not support QueueUnbind
             throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind 
not present in AMQP version: " + session.getProtocolVersion(), null);
         }
+        channel.sync();
         session.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
 Sat Jan 21 00:06:40 2012
@@ -114,7 +114,17 @@ public class AsyncAutoCommitTransaction 
 
     private void addFuture(final MessageStore.StoreFuture future, final Action 
action)
     {
-        _futureRecorder.recordFuture(future, action);
+        if(action != null)
+        {
+            if(future.isComplete())
+            {
+                action.postCommit();
+            }
+            else
+            {
+                _futureRecorder.recordFuture(future, action);
+            }
+        }
     }
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action 
postTransactionAction)
@@ -265,17 +275,20 @@ public class AsyncAutoCommitTransaction 
 
     public void commit(final Runnable immediatePostTransactionAction)
     {
-        addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+        if(immediatePostTransactionAction != null)
         {
-            public void postCommit()
+            addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
             {
-                immediatePostTransactionAction.run();
-            }
+                public void postCommit()
+                {
+                    immediatePostTransactionAction.run();
+                }
 
-            public void onRollback()
-            {
-            }
-        });
+                public void onRollback()
+                {
+                }
+            });
+        }
     }    
     
     public void commit()

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
 Sat Jan 21 00:06:40 2012
@@ -224,13 +224,13 @@ public class ManagedConnectionMBeanTest 
         mBean.rollbackTransactions(channelId.intValue());
 
         Message m = consumer.receive(1000l);
-        assertNull("Unexpected message received", m);
+        assertNull("Unexpected message received: " + String.valueOf(m), m);
 
         producerSession.commit();
 
         _connection.start();
         m = consumer.receive(1000l);
-        assertNull("Unexpected message received", m);
+        assertNull("Unexpected message received after commit " + 
String.valueOf(m), m);
     }
 
     public void testAuthorisedId() throws Exception

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
 Sat Jan 21 00:06:40 2012
@@ -31,6 +31,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
@@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends
 
             ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new 
AMQShortString(_name), false, true);
 
-            AMQFrame exchangeDeclare = body.generateFrame(0);
+            AMQFrame exchangeDeclare = 
body.generateFrame(((AMQSession)_session).getChannelId());
 
             ((AMQConnection) 
_connection).getProtocolHandler().syncWrite(exchangeDeclare, 
ExchangeDeleteOkBody.class);
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to