Author: rgodfrey
Date: Sat Jan 21 00:06:40 2012
New Revision: 1234215
URL: http://svn.apache.org/viewvc?rev=1234215&view=rev
Log:
QPID-3774: allow out-of-order completion of persistent enqueues/dequeues
(AMQP 0-9 code path)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Sat Jan 21 00:06:40 2012
@@ -73,27 +73,20 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class AMQChannel implements SessionConfig, AMQSessionModel
+public class AMQChannel implements SessionConfig, AMQSessionModel,
AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -132,6 +125,10 @@ public class AMQChannel implements Sessi
private final MessageStore _messageStore;
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new
LinkedList<AsyncCommand>();
+
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
// Set of messages being acknoweledged in the current transaction
@@ -184,7 +181,7 @@ public class AMQChannel implements Sessi
_messageStore = messageStore;
// by default the session is non-transactional
- _transaction = new AutoCommitTransaction(_messageStore);
+ _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
}
@@ -203,14 +200,12 @@ public class AMQChannel implements Sessi
public boolean isTransactional()
{
- // this does not look great but there should only be one
"non-transactional"
- // transactional context, while there could be several transactional
ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public void receivedComplete()
{
+ sync();
}
@@ -1562,4 +1557,69 @@ public class AMQChannel implements Sessi
}
}
+
+ public void recordFuture(final MessageStore.StoreFuture future, final
ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null &&
cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() >
UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void sync()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final
ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
+
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
Sat Jan 21 00:06:40 2012
@@ -24,6 +24,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -52,6 +53,11 @@ public class AccessRequestHandler implem
public void methodReceived(AMQStateManager stateManager, AccessRequestBody
body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -71,7 +77,7 @@ public class AccessRequestHandler implem
throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest
not present in AMQP versions other than 0-8, 0-9");
}
-
+ channel.sync();
session.writeFrame(response.generateFrame(channelId));
}
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
Sat Jan 21 00:06:40 2012
@@ -68,6 +68,7 @@ public class BasicCancelMethodHandler im
{
MethodRegistry methodRegistry = session.getMethodRegistry();
BasicCancelOkBody cancelOkBody =
methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
session.writeFrame(cancelOkBody.generateFrame(channelId));
}
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Sat Jan 21 00:06:40 2012
@@ -60,6 +60,7 @@ public class BasicConsumeMethodHandler i
}
else
{
+ channel.sync();
if (_logger.isDebugEnabled())
{
_logger.debug("BasicConsume: from '" + body.getQueue() +
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
Sat Jan 21 00:06:40 2012
@@ -75,6 +75,7 @@ public class BasicGetMethodHandler imple
}
else
{
+ channel.sync();
AMQQueue queue = body.getQueue() == null ?
channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
if (queue == null)
{
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
Sat Jan 21 00:06:40 2012
@@ -46,7 +46,7 @@ public class BasicQosHandler implements
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
Sat Jan 21 00:06:40 2012
@@ -65,6 +65,7 @@ public class BasicRecoverMethodHandler i
{
MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0)
session.getMethodRegistry();
AMQMethodBody recoverOk =
methodRegistry.createBasicRecoverOkBody();
+ channel.sync();
session.writeFrame(recoverOk.generateFrame(channelId));
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
Sat Jan 21 00:06:40 2012
@@ -59,7 +59,7 @@ public class BasicRecoverSyncMethodHandl
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
channel.resend(body.getRequeue());
// Qpid 0-8 hacks a synchronous -ok onto recover.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
Sat Jan 21 00:06:40 2012
@@ -65,6 +65,7 @@ public class ChannelCloseHandler impleme
{
throw body.getConnectionException(AMQConstant.CHANNEL_ERROR,
"Trying to close unknown channel");
}
+ channel.sync();
session.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
Sat Jan 21 00:06:40 2012
@@ -55,6 +55,7 @@ public class ChannelFlowHandler implemen
{
throw body.getChannelNotFoundException(channelId);
}
+ channel.sync();
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" +
body.getActive());
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
Sat Jan 21 00:06:40 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -69,7 +70,12 @@ public class ExchangeBoundHandler implem
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
AMQShortString exchangeName = body.getExchange();
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
Sat Jan 21 00:06:40 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -55,6 +56,11 @@ public class ExchangeDeclareHandler impl
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
if (_logger.isDebugEnabled())
{
@@ -102,6 +108,7 @@ public class ExchangeDeclareHandler impl
{
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody =
methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
Sat Jan 21 00:06:40 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,7 +50,12 @@ public class ExchangeDeleteHandler imple
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
-
+ final AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+ channel.sync();
try
{
if(exchangeRegistry.getExchange(body.getExchange()) == null)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
Sat Jan 21 00:06:40 2012
@@ -64,18 +64,18 @@ public class QueueBindHandler implements
VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ AMQChannel channel = protocolConnection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
final AMQQueue queue;
final AMQShortString routingKey;
if (body.getQueue() == null)
{
- AMQChannel channel = protocolConnection.getChannel(channelId);
-
- if (channel == null)
- {
- throw body.getChannelNotFoundException(channelId);
- }
queue = channel.getDefaultQueue();
@@ -150,6 +150,7 @@ public class QueueBindHandler implements
}
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry =
protocolConnection.getMethodRegistry();
AMQMethodBody responseBody =
methodRegistry.createQueueBindOkBody();
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
Sat Jan 21 00:06:40 2012
@@ -197,6 +197,7 @@ public class QueueDeclareHandler impleme
if (!body.getNowait())
{
+ channel.sync();
MethodRegistry methodRegistry =
protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
Sat Jan 21 00:06:40 2012
@@ -71,7 +71,7 @@ public class QueueDeleteHandler implemen
{
throw body.getChannelNotFoundException(channelId);
}
-
+ channel.sync();
AMQQueue queue;
if (body.getQueue() == null)
{
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
Sat Jan 21 00:06:40 2012
@@ -109,7 +109,7 @@ public class QueuePurgeHandler implement
if(!body.getNowait())
{
-
+ channel.sync();
MethodRegistry methodRegistry =
protocolConnection.getMethodRegistry();
AMQMethodBody responseBody =
methodRegistry.createQueuePurgeOkBody(purged);
protocolConnection.writeFrame(responseBody.generateFrame(channelId));
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
Sat Jan 21 00:06:40 2012
@@ -132,6 +132,7 @@ public class QueueUnbindHandler implemen
// 0-8 does not support QueueUnbind
throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind
not present in AMQP version: " + session.getProtocolVersion(), null);
}
+ channel.sync();
session.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Sat Jan 21 00:06:40 2012
@@ -114,7 +114,17 @@ public class AsyncAutoCommitTransaction
private void addFuture(final MessageStore.StoreFuture future, final Action
action)
{
- _futureRecorder.recordFuture(future, action);
+ if(action != null)
+ {
+ if(future.isComplete())
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
}
public void dequeue(Collection<QueueEntry> queueEntries, Action
postTransactionAction)
@@ -265,17 +275,20 @@ public class AsyncAutoCommitTransaction
public void commit(final Runnable immediatePostTransactionAction)
{
- addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ if(immediatePostTransactionAction != null)
{
- public void postCommit()
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
{
- immediatePostTransactionAction.run();
- }
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
- public void onRollback()
- {
- }
- });
+ public void onRollback()
+ {
+ }
+ });
+ }
}
public void commit()
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
Sat Jan 21 00:06:40 2012
@@ -224,13 +224,13 @@ public class ManagedConnectionMBeanTest
mBean.rollbackTransactions(channelId.intValue());
Message m = consumer.receive(1000l);
- assertNull("Unexpected message received", m);
+ assertNull("Unexpected message received: " + String.valueOf(m), m);
producerSession.commit();
_connection.start();
m = consumer.receive(1000l);
- assertNull("Unexpected message received", m);
+ assertNull("Unexpected message received after commit " +
String.valueOf(m), m);
}
public void testAuthorisedId() throws Exception
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1234215&r1=1234214&r2=1234215&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
Sat Jan 21 00:06:40 2012
@@ -31,6 +31,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
@@ -195,7 +196,7 @@ public class ExchangeLoggingTest extends
ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new
AMQShortString(_name), false, true);
- AMQFrame exchangeDeclare = body.generateFrame(0);
+ AMQFrame exchangeDeclare =
body.generateFrame(((AMQSession)_session).getChannelId());
((AMQConnection)
_connection).getProtocolHandler().syncWrite(exchangeDeclare,
ExchangeDeleteOkBody.class);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]