Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Sun Sep 28 15:22:03 2014 @@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -56,15 +55,16 @@ public class QueueBindHandler implements { } - public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueBindBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + AMQChannel channel = connection.getChannel(channelId); if (channel == 
null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } final AMQQueue queue; @@ -79,7 +79,8 @@ public class QueueBindHandler implements if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", + connection.getMethodRegistry()); } if (body.getRoutingKey() == null) @@ -99,12 +100,14 @@ public class QueueBindHandler implements if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", + connection.getMethodRegistry()); } if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot bind the queue " + queueName + " to the default exchange", + connection.getMethodRegistry()); } final String exchangeName = body.getExchange().toString(); @@ -112,7 +115,8 @@ public class QueueBindHandler implements final ExchangeImpl exch = virtualHost.getExchange(exchangeName); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", + connection.getMethodRegistry()); } @@ -133,7 +137,7 @@ public class QueueBindHandler implements } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } if (_log.isInfoEnabled()) @@ -143,9 +147,9 @@ 
public class QueueBindHandler implements if (!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } }
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Sun Sep 28 15:22:03 2014 @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queu import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; @@ -57,11 +56,12 @@ public class QueueDeclareHandler impleme return _instance; } - public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueDeclareBody body, + int channelId) throws AMQException { - final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - final AMQSessionModel session = protocolConnection.getChannel(channelId); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); + final AMQSessionModel session = connection.getChannel(channelId); + 
VirtualHostImpl virtualHost = connection.getVirtualHost(); final AMQShortString queueName; @@ -79,11 +79,11 @@ public class QueueDeclareHandler impleme //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if(body.getPassive()) @@ -92,14 +92,15 @@ public class QueueDeclareHandler impleme if (queue == null) { String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); } else { if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } //set this as the default queue on the channel: @@ -112,7 +113,7 @@ public class QueueDeclareHandler impleme try { - queue = createQueue(channel, queueName, body, virtualHost, protocolConnection); + queue = createQueue(channel, queueName, body, virtualHost, connection); } catch(QueueExistsException qe) @@ -123,33 +124,37 @@ public class QueueDeclareHandler impleme if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } else if(queue.isExclusive() != body.getExclusive()) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot 
re-declare queue '" + queue.getName() + "' with different exclusivity (was: " - + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + + queue.isExclusive() + " requested " + body.getExclusive() + ")", + connection.getMethodRegistry()); } else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: " - + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")"); + + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")", + connection.getMethodRegistry()); } else if(queue.isDurable() != body.getDurable()) { throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot re-declare queue '" + queue.getName() + "' with different durability (was: " - + queue.isDurable() + " requested " + body.getDurable() + ")"); + + queue.isDurable() + " requested " + body.getDurable() + ")", + connection.getMethodRegistry()); } } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } //set this as the default queue on the channel: @@ -159,12 +164,12 @@ public class QueueDeclareHandler impleme if (!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getQueueDepthMessages(), queue.getConsumerCount()); - 
protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); _logger.info("Queue " + queueName + " declared successfully"); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java Sun Sep 28 15:22:03 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeleteBody; @@ -27,14 +29,10 @@ import org.apache.qpid.framing.QueueDele import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueueDeleteHandler 
implements StateAwareMethodListener<QueueDeleteBody> { private static final QueueDeleteHandler _instance = new QueueDeleteHandler(); @@ -57,18 +55,17 @@ public class QueueDeleteHandler implemen } - public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueDeleteBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - + VirtualHostImpl virtualHost = connection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); AMQQueue queue; @@ -87,26 +84,30 @@ public class QueueDeleteHandler implemen { if (_failIfNotFound) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } } else { if (body.getIfEmpty() && !queue.isEmpty()) { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty."); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", + connection.getMethodRegistry()); } else if (body.getIfUnused() && !queue.isUnused()) { // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used."); + throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", + connection.getMethodRegistry()); } else { 
if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + "Queue " + queue.getName() + " is exclusive, but not created on this Connection.", + connection.getMethodRegistry()); } int purged = 0; @@ -116,12 +117,12 @@ public class QueueDeleteHandler implemen } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java Sun Sep 28 15:22:03 2014 @@ -21,6 +21,8 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import 
org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MethodRegistry; @@ -28,13 +30,10 @@ import org.apache.qpid.framing.QueuePurg import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody> { private static final QueuePurgeHandler _instance = new QueuePurgeHandler(); @@ -56,15 +55,16 @@ public class QueuePurgeHandler implement _failIfNotFound = failIfNotFound; } - public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueuePurgeBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = protocolConnection.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } AMQQueue queue; if(body.getQueue() == null) @@ -77,7 +77,7 @@ public class QueuePurgeHandler implement { if(_failIfNotFound) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified."); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED,"No queue specified.", 
connection.getMethodRegistry()); } } } @@ -90,7 +90,8 @@ public class QueuePurgeHandler implement { if(_failIfNotFound) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } } else @@ -98,7 +99,7 @@ public class QueuePurgeHandler implement if (!queue.verifySessionAccess(channel)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection."); + "Queue is exclusive, but not created on this Connection.", connection.getMethodRegistry()); } long purged = 0; @@ -108,16 +109,16 @@ public class QueuePurgeHandler implement } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } if(!body.getNowait()) { channel.sync(); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java Sun Sep 28 15:22:03 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -33,13 +35,10 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody> { private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); @@ -55,19 +54,20 @@ public class QueueUnbindHandler implemen { } - public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + QueueUnbindBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); final AMQQueue queue; final AMQShortString routingKey; - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - 
throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if (body.getQueue() == null) @@ -77,7 +77,8 @@ public class QueueUnbindHandler implemen if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null", + connection.getMethodRegistry()); } routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(false); @@ -91,23 +92,28 @@ public class QueueUnbindHandler implemen if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", + connection.getMethodRegistry()); } if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue " + queue.getName() + " from the default exchange"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Cannot unbind the queue " + + queue.getName() + + " from the default exchange", connection.getMethodRegistry()); } final ExchangeImpl exch = virtualHost.getExchange(body.getExchange() == null ? 
null : body.getExchange().toString()); if (exch == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.", + connection.getMethodRegistry()); } if(!exch.hasBinding(String.valueOf(routingKey), queue)) { - throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding"); + throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding", connection.getMethodRegistry()); } else { @@ -117,7 +123,7 @@ public class QueueUnbindHandler implemen } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } @@ -127,7 +133,7 @@ public class QueueUnbindHandler implemen _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); } - final MethodRegistry registry = session.getMethodRegistry(); + final MethodRegistry registry = connection.getMethodRegistry(); final AMQMethodBody responseBody; if (registry instanceof MethodRegistry_0_9) { @@ -140,10 +146,10 @@ public class QueueUnbindHandler implemen else { // 0-8 does not support QueueUnbind - throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + session.getProtocolVersion(), null); + throw new AMQException(AMQConstant.COMMAND_INVALID, "QueueUnbind not present in AMQP version: " + connection.getProtocolVersion(), null); } channel.sync(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } protected boolean isDefaultExchange(final AMQShortString exchangeName) Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java Sun Sep 28 15:22:03 2014 @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; - import java.util.HashMap; import java.util.Map; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; + public class ServerMethodDispatcherImpl implements MethodDispatcher { - private final AMQStateManager _stateManager; + private final AMQProtocolSession<?> _connection; private static interface DispatcherFactory { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager); + public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection); } private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = @@ -45,26 +45,26 @@ public class ServerMethodDispatcherImpl _dispatcherFactories.put(ProtocolVersion.v8_0, new DispatcherFactory() { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + public 
MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) { - return new ServerMethodDispatcherImpl_8_0(stateManager); + return new ServerMethodDispatcherImpl_8_0(connection); } }); _dispatcherFactories.put(ProtocolVersion.v0_9, new DispatcherFactory() { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) { - return new ServerMethodDispatcherImpl_0_9(stateManager); + return new ServerMethodDispatcherImpl_0_9(connection); } }); _dispatcherFactories.put(ProtocolVersion.v0_91, new DispatcherFactory() { - public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager) + public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) { - return new ServerMethodDispatcherImpl_0_91(stateManager); + return new ServerMethodDispatcherImpl_0_91(connection); } }); @@ -103,82 +103,80 @@ public class ServerMethodDispatcherImpl - public static MethodDispatcher createMethodDispatcher(AMQStateManager stateManager, ProtocolVersion protocolVersion) + public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) { - return _dispatcherFactories.get(protocolVersion).createMethodDispatcher(stateManager); + return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection); } - public ServerMethodDispatcherImpl(AMQStateManager stateManager) + public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection) { - _stateManager = stateManager; + _connection = connection; } - protected AMQStateManager getStateManager() + protected final AMQProtocolSession<?> getConnection() { - return _stateManager; + return _connection; } - - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException { - _accessRequestHandler.methodReceived(_stateManager, body, channelId); + _accessRequestHandler.methodReceived(getConnection(), body, channelId); return true; 
} public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException { - _basicAckMethodHandler.methodReceived(_stateManager, body, channelId); + _basicAckMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException { - _basicCancelMethodHandler.methodReceived(_stateManager, body, channelId); + _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException { - _basicConsumeMethodHandler.methodReceived(_stateManager, body, channelId); + _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException { - _basicGetMethodHandler.methodReceived(_stateManager, body, channelId); + _basicGetMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException { - _basicPublishMethodHandler.methodReceived(_stateManager, body, channelId); + _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException { - _basicQosHandler.methodReceived(_stateManager, body, channelId); + _basicQosHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException { - _basicRecoverMethodHandler.methodReceived(_stateManager, body, channelId); + _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException { - _basicRejectMethodHandler.methodReceived(_stateManager, body, channelId); + 
_basicRejectMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException { - _channelOpenHandler.methodReceived(_stateManager, body, channelId); + _channelOpenHandler.methodReceived(getConnection(), body, channelId); return true; } @@ -225,21 +223,21 @@ public class ServerMethodDispatcherImpl public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - _channelCloseHandler.methodReceived(_stateManager, body, channelId); + _channelCloseHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException { - _channelCloseOkHandler.methodReceived(_stateManager, body, channelId); + _channelCloseOkHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - _channelFlowHandler.methodReceived(_stateManager, body, channelId); + _channelFlowHandler.methodReceived(getConnection(), body, channelId); return true; } @@ -256,21 +254,23 @@ public class ServerMethodDispatcherImpl public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException { - _connectionOpenMethodHandler.methodReceived(_stateManager, body, channelId); + _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException { - _connectionCloseMethodHandler.methodReceived(_stateManager, body, channelId); + _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException { - _connectionCloseOkMethodHandler.methodReceived(_stateManager, body, channelId); + 
_connectionCloseOkMethodHandler.methodReceived( + getConnection(), + body, channelId); return true; } @@ -299,15 +299,6 @@ public class ServerMethodDispatcherImpl throw new UnexpectedMethodException(body); } - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { @@ -324,46 +315,6 @@ public class ServerMethodDispatcherImpl throw new UnexpectedMethodException(body); } - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException { throw new 
UnexpectedMethodException(body); @@ -384,31 +335,6 @@ public class ServerMethodDispatcherImpl throw new UnexpectedMethodException(body); } - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException { throw new UnexpectedMethodException(body); @@ -427,144 +353,84 @@ public class ServerMethodDispatcherImpl public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException { - _connectionSecureOkMethodHandler.methodReceived(_stateManager, body, channelId); + _connectionSecureOkMethodHandler.methodReceived( + getConnection(), + body, channelId); return true; } public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException { - _connectionStartOkMethodHandler.methodReceived(_stateManager, body, channelId); + _connectionStartOkMethodHandler.methodReceived( + getConnection(), + body, channelId); return true; } public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException { - _connectionTuneOkMethodHandler.methodReceived(_stateManager, body, channelId); + _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId); return true; } - public boolean 
dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException { - _exchangeBoundHandler.methodReceived(_stateManager, body, channelId); + _exchangeBoundHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException { - _exchangeDeclareHandler.methodReceived(_stateManager, body, channelId); + _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException { - _exchangeDeleteHandler.methodReceived(_stateManager, body, channelId); + _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId); return true; } - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException { - _queueBindHandler.methodReceived(_stateManager, body, channelId); + _queueBindHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchQueueDeclare(QueueDeclareBody 
body, int channelId) throws AMQException { - _queueDeclareHandler.methodReceived(_stateManager, body, channelId); + _queueDeclareHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException { - _queueDeleteHandler.methodReceived(_stateManager, body, channelId); + _queueDeleteHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException { - _queuePurgeHandler.methodReceived(_stateManager, body, channelId); + _queuePurgeHandler.methodReceived(getConnection(), body, channelId); return true; } - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - return false; - } public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException { - _txCommitHandler.methodReceived(_stateManager, body, channelId); + _txCommitHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException { - _txRollbackHandler.methodReceived(_stateManager, body, channelId); + _txRollbackHandler.methodReceived(getConnection(), body, channelId); return true; } public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException { - _txSelectHandler.methodReceived(_stateManager, body, channelId); + _txSelectHandler.methodReceived(getConnection(), 
body, channelId); return true; } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_9.java Sun Sep 28 15:22:03 2014 @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol. 
import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -40,14 +40,14 @@ public class ServerMethodDispatcherImpl_ QueueUnbindHandler.getInstance(); - public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager) + public ServerMethodDispatcherImpl_0_9(AMQProtocolSession<?> connection) { - super(stateManager); + super(connection); } public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { - _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId); + _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId); return true; } @@ -56,101 +56,6 @@ public class ServerMethodDispatcherImpl_ throw new UnexpectedMethodException(body); } - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int 
channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { throw new UnexpectedMethodException(body); @@ -158,7 +63,7 @@ public class ServerMethodDispatcherImpl_ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { - _queueUnbindHandler.methodReceived(getStateManager(),body,channelId); + _queueUnbindHandler.methodReceived(getConnection(), body,channelId); return true; } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_0_91.java Sun Sep 28 15:22:03 2014 @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol. import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; public class ServerMethodDispatcherImpl_0_91 @@ -39,14 +39,14 @@ public class ServerMethodDispatcherImpl_ QueueUnbindHandler.getInstance(); - public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager) + public ServerMethodDispatcherImpl_0_91(AMQProtocolSession<?> connection) { - super(stateManager); + super(connection); } public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException { - _basicRecoverSyncMethodHandler.methodReceived(getStateManager(), body, channelId); + _basicRecoverSyncMethodHandler.methodReceived(getConnection(), body, channelId); return true; } @@ -55,106 +55,6 @@ public class ServerMethodDispatcherImpl_ throw new UnexpectedMethodException(body); } - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - 
return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public 
boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { throw new UnexpectedMethodException(body); @@ -162,7 +62,7 @@ public class ServerMethodDispatcherImpl_ public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { - _queueUnbindHandler.methodReceived(getStateManager(),body,channelId); + _queueUnbindHandler.methodReceived(getConnection(), body,channelId); return true; } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl_8_0.java Sun Sep 28 15:22:03 2014 @@ -23,24 +23,16 @@ package org.apache.qpid.server.protocol. 
import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRecoverOkBody; import org.apache.qpid.framing.ChannelAlertBody; -import org.apache.qpid.framing.TestContentBody; -import org.apache.qpid.framing.TestContentOkBody; -import org.apache.qpid.framing.TestIntegerBody; -import org.apache.qpid.framing.TestIntegerOkBody; -import org.apache.qpid.framing.TestStringBody; -import org.apache.qpid.framing.TestStringOkBody; -import org.apache.qpid.framing.TestTableBody; -import org.apache.qpid.framing.TestTableOkBody; import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; public class ServerMethodDispatcherImpl_8_0 extends ServerMethodDispatcherImpl implements MethodDispatcher_8_0 { - public ServerMethodDispatcherImpl_8_0(AMQStateManager stateManager) + public ServerMethodDispatcherImpl_8_0(AMQProtocolSession<?> connection) { - super(stateManager); + super(connection); } public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException @@ -53,43 +45,4 @@ public class ServerMethodDispatcherImpl_ throw new UnexpectedMethodException(body); } - public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean 
dispatchTestTable(TestTableBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException - { - return false; - } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java Sun Sep 28 15:22:03 2014 @@ -28,7 +28,6 @@ import org.apache.qpid.framing.MethodReg import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> @@ -46,21 +45,21 @@ public class TxCommitHandler implements { } - public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxCommitBody body, + final int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); - try { if (_log.isDebugEnabled()) { _log.debug("Commit received on channel " + 
channelId); } - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.commit(new Runnable() { @@ -68,9 +67,9 @@ public class TxCommitHandler implements @Override public void run() { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } }, true); @@ -79,7 +78,8 @@ public class TxCommitHandler implements } catch (AMQException e) { - throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); + throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(), + connection.getMethodRegistry()); } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java Sun Sep 28 15:22:03 2014 @@ -26,7 +26,6 @@ import org.apache.qpid.framing.MethodReg import 
org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody> @@ -42,22 +41,22 @@ public class TxRollbackHandler implement { } - public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxRollbackBody body, + final int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); - try { - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } - final MethodRegistry methodRegistry = session.getMethodRegistry(); + final MethodRegistry methodRegistry = connection.getMethodRegistry(); final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody(); Runnable task = new Runnable() @@ -65,7 +64,7 @@ public class TxRollbackHandler implement public void run() { - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } }; @@ -79,7 +78,8 @@ public class TxRollbackHandler implement } catch (AMQException e) { - throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); + throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage(), + connection.getMethodRegistry()); } } } Modified: 
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java Sun Sep 28 15:22:03 2014 @@ -26,7 +26,6 @@ import org.apache.qpid.framing.TxSelectB import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody> @@ -42,21 +41,21 @@ public class TxSelectHandler implements { } - public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + TxSelectBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.setLocalTransactional(); - MethodRegistry 
methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/StateAwareMethodListener.java Sun Sep 28 15:22:03 2014 @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol. 
import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; /** * A frame listener that is informed of the protocol state when invoked and has @@ -30,5 +31,5 @@ import org.apache.qpid.framing.AMQMethod */ public interface StateAwareMethodListener<B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, B evt, int channelId) throws AMQException; + void methodReceived(final AMQProtocolSession<?> connection, B evt, int channelId) throws AMQException; } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Sun Sep 28 15:22:03 2014 @@ -244,16 +244,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -324,16 +314,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - 
throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -349,36 +329,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); @@ -399,30 +349,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean 
dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException { @@ -439,16 +365,6 @@ public class ClientMethodDispatcherImpl throw new AMQMethodNotImplementedException(body); } - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); @@ -465,46 +381,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - 
public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException { return false; @@ -515,21 +391,6 @@ public class ClientMethodDispatcherImpl return false; } - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException { return false; Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Sun Sep 28 15:22:03 2014 @@ -43,101 +43,6 @@ 
public class ClientMethodDispatcherImpl_ throw new AMQMethodNotImplementedException(body); } - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody 
body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException { throw new AMQMethodNotImplementedException(body); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
