Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -36,7 +36,6 @@ import org.apache.qpid.server.filter.AMQ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -56,16 +55,16 @@ public class BasicConsumeMethodHandler i { } - public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicConsumeBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - - AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost(); + AMQChannel channel = connection.getChannel(channelId); + VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost(); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } else { @@ -119,12 +118,12 @@ public class BasicConsumeMethodHandler i if (queueName != null) { String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry()); } else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry()); } } else @@ -153,9 +152,9 @@ public class BasicConsumeMethodHandler i body.getNoLocal()); if (!body.getNowait()) { - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } @@ -163,12 +162,12 @@ public class BasicConsumeMethodHandler i { AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg, // replytext body.getClazz(), body.getMethod()); - protocolConnection.writeFrame(responseBody.generateFrame(0)); + connection.writeFrame(responseBody.generateFrame(0)); } } @@ -176,12 +175,12 @@ public class BasicConsumeMethodHandler i { _logger.debug("Closing connection due to invalid selector"); - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), AMQShortString.validValueOf(ise.getMessage()), body.getClazz(), body.getMethod()); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } @@ -190,28 +189,28 @@ public class BasicConsumeMethodHandler i throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " as it already has an existing exclusive consumer"); + + " as it already has an existing exclusive consumer", connection.getMethodRegistry()); } catch (AMQQueue.ExistingConsumerPreventsExclusive e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " exclusively as it already has a consumer"); + + " exclusively as it already has a consumer", connection.getMethodRegistry()); } catch (AccessControlException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " permission denied"); + + " permission denied", connection.getMethodRegistry()); } catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() - + " as it already has an incompatible exclusivity policy"); + + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry()); } }
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.v import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -65,17 +64,17 @@ public class BasicGetMethodHandler imple { } - public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicGetBody body, + int channelId) throws AMQException { - AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHostImpl vHost = connection.getVirtualHost(); - VirtualHostImpl vHost = protocolConnection.getVirtualHost(); - - AMQChannel channel = protocolConnection.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } else { @@ -87,12 +86,12 @@ public class BasicGetMethodHandler imple if(body.getQueue()!=null) { throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue()+ "'"); + "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry()); } else { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined."); + "No queue name provided, no default queue defined.", connection.getMethodRegistry()); } } else @@ -100,36 +99,37 @@ public class BasicGetMethodHandler imple try { - if (!performGet(queue,protocolConnection, channel, !body.getNoAck())) + if (!performGet(queue,connection, channel, !body.getNoAck())) { - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); // TODO - set clusterId BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } catch (AccessControlException e) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage()); + e.getMessage(), connection.getMethodRegistry()); } catch (MessageSource.ExistingExclusiveConsumer e) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer"); + "Queue has an exclusive consumer", connection.getMethodRegistry()); } catch (MessageSource.ExistingConsumerPreventsExclusive e) { throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker"); + "this is likely due to a programming error in the Qpid broker", + connection.getMethodRegistry()); } catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy"); + "Queue has an incompatible exclusivit policy", connection.getMethodRegistry()); } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -30,12 +32,9 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> { private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class); @@ -52,16 +51,17 @@ public class BasicPublishMethodHandler i { } - public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicPublishBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isDebugEnabled()) { _logger.debug("Publish received on channel " + channelId); } AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = session.getVirtualHost(); + VirtualHostImpl vHost = connection.getVirtualHost(); // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? @@ -79,21 +79,22 @@ public class BasicPublishMethodHandler i // if the exchange does not exist we raise a channel exception if (destination == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name"); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", + connection.getMethodRegistry()); } else { // The partially populated BasicDeliver frame plus the received route body // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } - MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); + MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body); info.setExchange(exchangeName); try { @@ -101,7 +102,7 @@ public class BasicPublishMethodHandler i } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java Sun Sep 28 15:22:03 2014 @@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicQosB import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> @@ -38,21 +37,22 @@ public class BasicQosHandler implements return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicQosBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -29,7 +29,6 @@ import org.apache.qpid.framing.ProtocolV import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody> @@ -43,29 +42,29 @@ public class BasicRecoverMethodHandler i return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicRecoverBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); - AMQChannel channel = session.getChannel(channelId); + _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.resend(); // Qpid 0-8 hacks a synchronous -ok onto recover. // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(session.getProtocolVersion().equals(ProtocolVersion.v8_0)) + if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) { - MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry(); + MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody(); channel.sync(); - session.writeFrame(recoverOk.generateFrame(channelId)); + connection.writeFrame(recoverOk.generateFrame(channelId)); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -31,7 +31,6 @@ import org.apache.qpid.framing.amqp_0_9. import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody> @@ -45,35 +44,36 @@ public class BasicRecoverSyncMethodHandl return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicRecoverSyncBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - _logger.debug("Recover received on protocol session " + session + " and channel " + channelId); - AMQChannel channel = session.getChannel(channelId); + _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); channel.resend(); // Qpid 0-8 hacks a synchronous -ok onto recover. // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(session.getProtocolVersion().equals(ProtocolVersion.v0_9)) + if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9)) { - MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); + MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - session.writeFrame(recoverOk.generateFrame(channelId)); + connection.writeFrame(recoverOk.generateFrame(channelId)); } - else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91)) + else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91)) { - MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry(); + MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry(); AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - session.writeFrame(recoverOk.generateFrame(channelId)); + connection.writeFrame(recoverOk.generateFrame(channelId)); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -27,8 +27,6 @@ import org.apache.qpid.framing.BasicReje import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody> @@ -46,15 +44,16 @@ public class BasicRejectMethodHandler im { } - public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + BasicRejectBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } if (_logger.isDebugEnabled()) Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java Sun Sep 28 15:22:03 2014 @@ -29,7 +29,6 @@ import org.apache.qpid.framing.MethodReg import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> @@ -47,9 +46,10 @@ public class ChannelCloseHandler impleme { } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelCloseBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isInfoEnabled()) { @@ -58,19 +58,19 @@ public class ChannelCloseHandler impleme } - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry()); } channel.sync(); - session.closeChannel(channelId); + connection.closeChannel(channelId); // Client requested closure so we don't wait for ok we send it - stateManager.getProtocolSession().closeChannelOk(channelId); + connection.closeChannelOk(channelId); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java Sun Sep 28 15:22:03 2014 @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody> @@ -42,12 +42,14 @@ public class ChannelCloseOkHandler imple { } - public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelCloseOkBody body, + int channelId) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + channelId); // Let the Protocol Session know the channel is now closed. - stateManager.getProtocolSession().closeChannelOk(channelId); + connection.closeChannelOk(channelId); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java Sun Sep 28 15:22:03 2014 @@ -28,7 +28,6 @@ import org.apache.qpid.framing.ChannelFl import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody> @@ -46,23 +45,24 @@ public class ChannelFlowHandler implemen { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelFlowBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); channel.setSuspended(!body.getActive()); _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java Sun Sep 28 15:22:03 2014 @@ -20,6 +20,11 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -33,16 +38,10 @@ import org.apache.qpid.framing.amqp_8_0. import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.UUID; - public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody> { private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class); @@ -58,10 +57,11 @@ public class ChannelOpenHandler implemen { } - public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ChannelOpenBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); // Protect the broker against out of order frame request. if (virtualHost == null) @@ -70,13 +70,13 @@ public class ChannelOpenHandler implemen } _logger.info("Connecting to: " + virtualHost.getName()); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()); + final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore()); - session.addChannel(channel); + connection.addChannel(channel); ChannelOpenOkBody response; - ProtocolVersion pv = session.getProtocolVersion(); + ProtocolVersion pv = connection.getProtocolVersion(); if(pv.equals(ProtocolVersion.v8_0)) { @@ -138,6 +138,6 @@ public class ChannelOpenHandler implemen } - session.writeFrame(response.generateFrame(channelId)); + connection.writeFrame(response.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -27,7 +27,6 @@ import org.apache.qpid.framing.Connectio import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> @@ -45,28 +44,29 @@ public class ConnectionCloseMethodHandle { } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionCloseBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isInfoEnabled()) { _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + session); + body.getReplyText() + " for " + connection); } try { - session.closeSession(); + connection.closeSession(); } catch (Exception e) { _logger.error("Error closing protocol session: " + e, e); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); - session.closeProtocolSession(); + connection.closeProtocolSession(); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -26,7 +26,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody> @@ -44,16 +43,17 @@ public class ConnectionCloseOkMethodHand { } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionCloseOkBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); try { - stateManager.changeState(AMQState.CONNECTION_CLOSED); - session.closeSession(); + connection.changeState(AMQState.CONNECTION_CLOSED); + connection.closeSession(); } catch (Exception e) { Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -34,7 +34,6 @@ import org.apache.qpid.server.model.Stat import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -58,9 +57,10 @@ public class ConnectionOpenMethodHandler return new AMQShortString(Long.toString(System.currentTimeMillis())); } - public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionOpenBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); //ignore leading '/' String virtualHostName; @@ -73,42 +73,44 @@ public class ConnectionOpenMethodHandler virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); } - VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName); + VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName); if (virtualHost == null) { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'"); + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", + connection.getMethodRegistry()); } else { // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active"); + throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active", + connection.getMethodRegistry()); } - session.setVirtualHost(virtualHost); + connection.setVirtualHost(virtualHost); try { - virtualHost.getSecurityManager().authoriseCreateConnection(session); + virtualHost.getSecurityManager().authoriseCreateConnection(connection); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (session.getContextKey() == null) + if (connection.getContextKey() == null) { - session.setContextKey(generateClientID()); + connection.setContextKey(generateClientID()); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - stateManager.changeState(AMQState.CONNECTION_OPEN); + connection.changeState(AMQState.CONNECTION_OPEN); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; @@ -56,19 +55,20 @@ public class ConnectionSecureOkMethodHan { } - public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionSecureOkBody body, + int channelId) throws AMQException { - Broker<?> broker = stateManager.getBroker(); - AMQProtocolSession session = stateManager.getProtocolSession(); + Broker<?> broker = connection.getBroker(); - SubjectCreator subjectCreator = stateManager.getSubjectCreator(); + SubjectCreator subjectCreator = connection.getSubjectCreator(); - SaslServer ss = session.getSaslServer(); + SaslServer ss = connection.getSaslServer(); if (ss == null) { throw new AMQException("No SASL context set up in session"); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); switch (authResult.getStatus()) { @@ -78,7 +78,7 @@ public class ConnectionSecureOkMethodHan _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); // This should be abstracted - stateManager.changeState(AMQState.CONNECTION_CLOSING); + connection.changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody connectionCloseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), @@ -86,15 +86,15 @@ public class ConnectionSecureOkMethodHan body.getClazz(), body.getMethod()); - session.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(session); + connection.writeFrame(connectionCloseBody.generateFrame(0)); + disposeSaslServer(connection); break; case SUCCESS: if (_logger.isInfoEnabled()) { _logger.info("Connected as: " + authResult.getSubject()); } - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + connection.changeState(AMQState.CONNECTION_NOT_TUNED); int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); @@ -107,15 +107,15 @@ public class ConnectionSecureOkMethodHan methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), frameMax, broker.getConnection_heartBeatDelay()); - session.writeFrame(tuneBody.generateFrame(0)); - session.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(session); + connection.writeFrame(tuneBody.generateFrame(0)); + connection.setAuthorizedSubject(authResult.getSubject()); + disposeSaslServer(connection); break; case CONTINUE: - stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); + connection.changeState(AMQState.CONNECTION_NOT_AUTH); ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - session.writeFrame(secureBody.generateFrame(0)); + connection.writeFrame(secureBody.generateFrame(0)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -35,7 +35,6 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; @@ -56,32 +55,36 @@ public class ConnectionStartOkMethodHand { } - public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionStartOkBody body, + int channelId) throws AMQException { - Broker<?> broker = stateManager.getBroker(); - AMQProtocolSession session = stateManager.getProtocolSession(); + Broker<?> broker = connection.getBroker(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); _logger.info("Locale selected: " + body.getLocale()); - SubjectCreator subjectCreator = stateManager.getSubjectCreator(); + SubjectCreator subjectCreator = connection.getSubjectCreator(); SaslServer ss = null; try { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN(), session.getPeerPrincipal()); + ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), + connection.getLocalFQDN(), + connection.getPeerPrincipal()); if (ss == null) { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()); + throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(), + connection.getMethodRegistry()); } - session.setSaslServer(ss); + connection.setSaslServer(ss); final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); //save clientProperties - session.setClientProperties(body.getClientProperties()); + connection.setClientProperties(body.getClientProperties()); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); switch (authResult.getStatus()) { @@ -90,7 +93,7 @@ public class ConnectionStartOkMethodHand _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - stateManager.changeState(AMQState.CONNECTION_CLOSING); + connection.changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode @@ -98,8 +101,8 @@ public class ConnectionStartOkMethodHand body.getClazz(), body.getMethod()); - session.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(session); + connection.writeFrame(closeBody.generateFrame(0)); + disposeSaslServer(connection); break; case SUCCESS: @@ -107,9 +110,9 @@ public class ConnectionStartOkMethodHand { _logger.info("Connected as: " + authResult.getSubject()); } - session.setAuthorizedSubject(authResult.getSubject()); + connection.setAuthorizedSubject(authResult.getSubject()); - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + connection.changeState(AMQState.CONNECTION_NOT_TUNED); int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); if(frameMax <= 0) @@ -120,18 +123,18 @@ public class ConnectionStartOkMethodHand ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), frameMax, broker.getConnection_heartBeatDelay()); - session.writeFrame(tuneBody.generateFrame(0)); + connection.writeFrame(tuneBody.generateFrame(0)); break; case CONTINUE: - stateManager.changeState(AMQState.CONNECTION_NOT_AUTH); + connection.changeState(AMQState.CONNECTION_NOT_AUTH); ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - session.writeFrame(secureBody.generateFrame(0)); + connection.writeFrame(secureBody.generateFrame(0)); } } catch (SaslException e) { - disposeSaslServer(session); + disposeSaslServer(connection); throw new AMQException("SASL error: " + e, e); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java Sun Sep 28 15:22:03 2014 @@ -29,7 +29,6 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody> @@ -43,19 +42,20 @@ public class ConnectionTuneOkMethodHandl return _instance; } - public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ConnectionTuneOkBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); if (_logger.isDebugEnabled()) { _logger.debug(body); } - stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + connection.changeState(AMQState.CONNECTION_NOT_OPENED); - session.initHeartbeats(body.getHeartbeat()); + connection.initHeartbeats(body.getHeartbeat()); - int brokerFrameMax = stateManager.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE); + int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE); if(brokerFrameMax <= 0) { brokerFrameMax = Integer.MAX_VALUE; @@ -68,7 +68,7 @@ public class ConnectionTuneOkMethodHandl + "greater than the broker will allow: " + brokerFrameMax, body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) { @@ -77,13 +77,13 @@ public class ConnectionTuneOkMethodHandl + "which is smaller than the specification definined minimum: " + AMQConstant.FRAME_MIN_SIZE.getCode(), body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - session.setMaxFrameSize(frameMax); + connection.setMaxFrameSize(frameMax); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); + connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java Sun Sep 28 15:22:03 2014 @@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.E import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -65,16 +64,17 @@ public class ExchangeBoundHandler implem { } - public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeBoundBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - MethodRegistry methodRegistry = session.getMethodRegistry(); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); - final AMQChannel channel = session.getChannel(channelId); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); @@ -227,7 +227,7 @@ public class ExchangeBoundHandler implem } } } - session.writeFrame(response.generateFrame(channelId)); + connection.writeFrame(response.generateFrame(channelId)); } protected boolean isDefaultExchange(final AMQShortString exchangeName) Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Sun Sep 28 15:22:03 2014 @@ -41,7 +41,6 @@ import org.apache.qpid.server.model.NoFa import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; @@ -62,14 +61,15 @@ public class ExchangeDeclareHandler impl { } - public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeDeclareBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - final AMQChannel channel = session.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } final AMQShortString exchangeName = body.getExchange(); @@ -89,7 +89,7 @@ public class ExchangeDeclareHandler impl + ExchangeDefaults.DIRECT_EXCHANGE_CLASS + " to " + body.getType() +".", body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + connection.getMethodRegistry(),null); } } else @@ -99,14 +99,15 @@ public class ExchangeDeclareHandler impl exchange = virtualHost.getExchange(exchangeName.toString()); if(exchange == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, + connection.getMethodRegistry()); } else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) { throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null); + + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null); } } @@ -139,7 +140,7 @@ public class ExchangeDeclareHandler impl { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix."); + " which begins with reserved prefix.", connection.getMethodRegistry()); } catch(ExchangeExistsException e) @@ -147,40 +148,44 @@ public class ExchangeDeclareHandler impl exchange = e.getExistingExchange(); if(!new AMQShortString(exchange.getType()).equals(body.getType())) { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - body.getMajor(), body.getMinor(),null); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + + exchange.getType() + + " to " + body.getType() + ".", + connection.getMethodRegistry()); } } catch(NoFactoryForTypeException e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry()); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } catch (UnknownConfiguredObjectException e) { // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); + throw body.getConnectionException(AMQConstant.NOT_FOUND, + "Unknown alternate exchange " + + (e.getName() != null + ? "name: \"" + e.getName() + "\"" + : "id: " + e.getId()), + connection.getMethodRegistry()); } catch (IllegalArgumentException e) { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry()); } } } if(!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = connection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); channel.sync(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java Sun Sep 28 15:22:03 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeleteBody; @@ -28,14 +30,11 @@ import org.apache.qpid.protocol.AMQConst import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; - public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody> { private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler(); @@ -49,14 +48,15 @@ public class ExchangeDeleteHandler imple { } - public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException + public void methodReceived(final AMQProtocolSession<?> connection, + ExchangeDeleteBody body, + int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHostImpl virtualHost = session.getVirtualHost(); - final AMQChannel channel = session.getChannel(channelId); + VirtualHostImpl virtualHost = connection.getVirtualHost(); + final AMQChannel channel = connection.getChannel(channelId); if (channel == null) { - throw body.getChannelNotFoundException(channelId); + throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); } channel.sync(); try @@ -64,7 +64,7 @@ public class ExchangeDeleteHandler imple if(isDefaultExchange(body.getExchange())) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted"); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry()); } final String exchangeName = body.getExchange().toString(); @@ -72,28 +72,31 @@ public class ExchangeDeleteHandler imple final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); if(exchange == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange()); + throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), + connection.getMethodRegistry()); } virtualHost.removeExchange(exchange, !body.getIfUnused()); - ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody(); + ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + connection.writeFrame(responseBody.generateFrame(channelId)); } catch (ExchangeIsAlternateException e) { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange"); + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", + connection.getMethodRegistry()); } catch (RequiredExchangeException e) { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted"); + throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted", + connection.getMethodRegistry()); } catch (AccessControlException e) { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
