Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Mon Oct 13 00:58:45 2014 @@ -23,7 +23,9 @@ package org.apache.qpid.framing; import java.util.ArrayList; import java.util.List; -public class FrameCreatingMethodProcessor implements MethodProcessor +public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>, + ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor> { private ProtocolVersion _protocolVersion; @@ -61,42 +63,6 @@ public class FrameCreatingMethodProcesso } @Override - public void receiveTxSelect(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE)); - } - - @Override - public void receiveTxSelectOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE)); - } - - @Override - public void receiveTxCommit(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE)); - } - - @Override - public void receiveTxCommitOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE)); - } - - @Override - public void receiveTxRollback(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE)); - } - - @Override - public void receiveTxRollbackOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE)); - } - - @Override public void receiveConnectionSecure(final byte[] challenge) { _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge))); @@ -163,382 +129,483 @@ public class FrameCreatingMethodProcesso _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody())); } - @Override - public void receiveChannelOpenOk(final int channelId) + private void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) { - _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) - ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); } @Override - public void receiveChannelFlow(final int channelId, final boolean active) + public void receiveHeartbeat() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active))); + _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); } @Override - public void receiveChannelFlowOk(final int channelId, final boolean active) + public ProtocolVersion getProtocolVersion() { - _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active))); + return _protocolVersion; } @Override - public void receiveChannelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) + public ClientAndServerChannelMethodProcessor getChannelMethodProcessor(final int channelId) { - _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details))); + return new FrameCreatingChannelMethodProcessor(channelId); } - @Override - public void receiveChannelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void setProtocolVersion(final ProtocolVersion protocolVersion) { - _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + _protocolVersion = protocolVersion; } @Override - public void receiveChannelCloseOk(final int channelId) + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { - _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE)); + _processedMethods.add(protocolInitiation); } @Override - public void receiveAccessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) + public void setCurrentMethod(final int classId, final int methodId) { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read))); + _classId = classId; + _methodId = methodId; } @Override - public void receiveAccessRequestOk(final int channelId, final int ticket) + public boolean ignoreAllButCloseOk() { - _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket))); + return false; } - @Override - public void receiveExchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) + public int getClassId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments))); + return _classId; } - @Override - public void receiveExchangeDeclareOk(final int channelId) + public int getMethodId() { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody())); + return _methodId; } - @Override - public void receiveExchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) + public static interface ClientAndServerChannelMethodProcessor extends ServerChannelMethodProcessor, ClientChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); - } - @Override - public void receiveExchangeDeleteOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody())); } - @Override - public void receiveExchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) + private class FrameCreatingChannelMethodProcessor implements ClientAndServerChannelMethodProcessor { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue))); - } + private final int _channelId; - @Override - public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) - { - _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); - } + private FrameCreatingChannelMethodProcessor(final int channelId) + { + _channelId = channelId; + } - @Override - public void receiveQueueBindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody())); - } - @Override - public void receiveQueueUnbindOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody())); - } + @Override + public void receiveChannelOpenOk() + { + _processedMethods.add(new AMQFrame(_channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + ? ChannelOpenOkBody.INSTANCE_0_8 + : ChannelOpenOkBody.INSTANCE_0_9)); + } - @Override - public void receiveQueueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments))); - } + @Override + public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelAlertBody(replyCode, replyText, details))); + } - @Override - public void receiveQueueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); - } + @Override + public void receiveAccessRequestOk(final int ticket) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestOkBody(ticket))); + } - @Override - public void receiveQueueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments))); - } + @Override + public void receiveExchangeDeclareOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareOkBody())); + } - @Override - public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait))); - } + @Override + public void receiveExchangeDeleteOk() + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteOkBody())); + } - @Override - public void receiveQueuePurgeOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount))); - } + @Override + public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText) + { + FrameCreatingMethodProcessor.this.receiveExchangeBoundOk(_channelId, replyCode, replyText); + } - @Override - public void receiveQueueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); - } + @Override + public void receiveQueueBindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindOkBody())); + } - @Override - public void receiveQueueDeleteOk(final int channelId, final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount))); - } + @Override + public void receiveQueueUnbindOk() + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindOkBody())); + } - @Override - public void receiveQueueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments))); - } + @Override + public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); + } - @Override - public void receiveBasicRecoverSyncOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); - } + @Override + public void receiveQueuePurgeOk(final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeOkBody(messageCount))); + } - @Override - public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) - { - if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + @Override + public void receiveQueueDeleteOk(final long messageCount) { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue))); + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteOkBody(messageCount))); } - else + + @Override + public void receiveBasicRecoverSyncOk() { - _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); } - } - @Override - public void receiveBasicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); - } + @Override + public void receiveBasicQosOk() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosOkBody())); + } - @Override - public void receiveBasicQosOk(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody())); - } + @Override + public void receiveBasicConsumeOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeOkBody(consumerTag))); + } - @Override - public void receiveBasicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments))); - } + @Override + public void receiveBasicCancelOk(final AMQShortString consumerTag) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelOkBody(consumerTag))); + } - @Override - public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); - } + @Override + public void receiveBasicReturn(final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicReturnBody(replyCode, + replyText, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait))); - } + @Override + public void receiveBasicDeliver(final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicDeliverBody(consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey))); + } - @Override - public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) - { - _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag))); - } + @Override + public void receiveBasicGetOk(final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetOkBody(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount))); + } - @Override - public void receiveBasicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) - { - _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate))); - } + @Override + public void receiveBasicGetEmpty() + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetEmptyBody((AMQShortString)null))); + } - @Override - public void receiveBasicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey))); - } + @Override + public void receiveTxSelectOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectOkBody.INSTANCE)); + } - @Override - public void receiveBasicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) - { - _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); - } + @Override + public void receiveTxCommitOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitOkBody.INSTANCE)); + } - @Override - public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck))); - } + @Override + public void receiveTxRollbackOk() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackOkBody.INSTANCE)); + } - @Override - public void receiveBasicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); - } + @Override + public void receiveAccessRequest(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + _processedMethods.add(new AMQFrame(_channelId, new AccessRequestBody(realm, + exclusive, + passive, + active, + write, + read))); + } - @Override - public void receiveBasicGetEmpty(final int channelId) - { - _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null))); - } + @Override + public void receiveExchangeDeclare(final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeclareBody(0, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments))); + } - @Override - public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) - { - _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple))); - } + @Override + public void receiveExchangeDelete(final AMQShortString exchange, final boolean ifUnused, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); + } - @Override - public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) - { - _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue))); - } + @Override + public void receiveExchangeBound(final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) + { + _processedMethods.add(new AMQFrame(_channelId, new ExchangeBoundBody(exchange, routingKey, queue))); + } - @Override - public void receiveHeartbeat() - { - _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); - } + @Override + public void receiveQueueDeclare(final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeclareBody(0, + queue, + passive, + durable, + exclusive, + autoDelete, + nowait, + arguments))); + } - @Override - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } + @Override + public void receiveQueueBind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueBindBody(0, + queue, + exchange, + bindingKey, + nowait, + arguments))); + } - public void setProtocolVersion(final ProtocolVersion protocolVersion) - { - _protocolVersion = protocolVersion; - } + @Override + public void receiveQueuePurge(final AMQShortString queue, final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueuePurgeBody(0, queue, nowait))); + } - @Override - public void receiveMessageContent(final int channelId, final byte[] data) - { - _processedMethods.add(new AMQFrame(channelId, new ContentBody(data))); - } + @Override + public void receiveQueueDelete(final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); + } - @Override - public void receiveMessageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) - { - _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize))); - } + @Override + public void receiveQueueUnbind(final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new QueueUnbindBody(0, + queue, + exchange, + bindingKey, + arguments))); + } - @Override - public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) - { - _processedMethods.add(protocolInitiation); - } + @Override + public void receiveBasicRecover(final boolean requeue, final boolean sync) + { + if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverBody(requeue))); + } + else + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); + } + } - @Override - public void setCurrentMethod(final int classId, final int methodId) - { - _classId = classId; - _methodId = methodId; - } + @Override + public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); + } - public int getClassId() - { - return _classId; - } + @Override + public void receiveBasicConsume(final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicConsumeBody(0, + queue, + consumerTag, + noLocal, + noAck, + exclusive, + nowait, + arguments))); + } - public int getMethodId() - { - return _methodId; + @Override + public void receiveBasicCancel(final AMQShortString consumerTag, final boolean noWait) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicCancelBody(consumerTag, noWait))); + } + + @Override + public void receiveBasicPublish(final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicPublishBody(0, + exchange, + routingKey, + mandatory, + immediate))); + } + + @Override + public void receiveBasicGet(final AMQShortString queue, final boolean noAck) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicGetBody(0, queue, noAck))); + } + + @Override + public void receiveBasicAck(final long deliveryTag, final boolean multiple) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicAckBody(deliveryTag, multiple))); + } + + @Override + public void receiveBasicReject(final long deliveryTag, final boolean requeue) + { + _processedMethods.add(new AMQFrame(_channelId, new BasicRejectBody(deliveryTag, requeue))); + } + + @Override + public void receiveTxSelect() + { + _processedMethods.add(new AMQFrame(_channelId, TxSelectBody.INSTANCE)); + } + + @Override + public void receiveTxCommit() + { + _processedMethods.add(new AMQFrame(_channelId, TxCommitBody.INSTANCE)); + } + + @Override + public void receiveTxRollback() + { + _processedMethods.add(new AMQFrame(_channelId, TxRollbackBody.INSTANCE)); + } + + @Override + public void receiveChannelFlow(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowBody(active))); + } + + @Override + public void receiveChannelFlowOk(final boolean active) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelFlowOkBody(active))); + } + + @Override + public void receiveChannelClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + _processedMethods.add(new AMQFrame(_channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); + } + + @Override + public void receiveChannelCloseOk() + { + _processedMethods.add(new AMQFrame(_channelId, ChannelCloseOkBody.INSTANCE)); + } + + @Override + public void receiveMessageContent(final byte[] data) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentBody(data))); + } + + @Override + public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) + { + _processedMethods.add(new AMQFrame(_channelId, new ContentHeaderBody(properties, bodySize))); + } + + @Override + public boolean ignoreAllButCloseOk() + { + return false; + } } }
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Mon Oct 13 00:58:45 2014 @@ -20,184 +20,21 @@ */ package org.apache.qpid.framing; -public interface MethodProcessor +public interface MethodProcessor<T extends ChannelMethodProcessor> { ProtocolVersion getProtocolVersion(); - void receiveConnectionStart(short versionMajor, - short versionMinor, - FieldTable serverProperties, - byte[] mechanisms, - byte[] locales); - - void receiveConnectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); - - void receiveTxSelect(int channelId); - - void receiveTxSelectOk(int channelId); - - void receiveTxCommit(int channelId); - - void receiveTxCommitOk(int channelId); - - void receiveTxRollback(int channelId); - - void receiveTxRollbackOk(int channelId); - - void receiveConnectionSecure(byte[] challenge); - - void receiveConnectionSecureOk(byte[] response); - - void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); - - void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - - void receiveConnectionOpenOk(AMQShortString knownHosts); - - void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); + T getChannelMethodProcessor(int channelId); void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); void receiveConnectionCloseOk(); - void receiveChannelOpen(int channelId); - - void receiveChannelOpenOk(int channelId); - - void receiveChannelFlow(int channelId, boolean active); - - void receiveChannelFlowOk(int channelId, boolean active); - - void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); - - void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); - - void receiveChannelCloseOk(int channelId); - - void receiveAccessRequest(int channelId, - AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, boolean read); - - void receiveAccessRequestOk(int channelId, int ticket); - - void receiveExchangeDeclare(int channelId, - AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); - - void receiveExchangeDeclareOk(int channelId); - - void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); - - void receiveExchangeDeleteOk(int channelId); - - void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - - void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); - - void receiveQueueBindOk(int channelId); - - void receiveQueueUnbindOk(final int channelId); - - void receiveQueueDeclare(int channelId, - AMQShortString queue, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, boolean nowait, FieldTable arguments); - - void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); - - void receiveQueueBind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - boolean nowait, FieldTable arguments); - - void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait); - - void receiveQueuePurgeOk(int channelId, long messageCount); - - void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - - void receiveQueueDeleteOk(int channelId, long messageCount); - - void receiveQueueUnbind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - FieldTable arguments); - - void receiveBasicRecoverSyncOk(int channelId); - - void receiveBasicRecover(int channelId, final boolean requeue, boolean sync); - - void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); - - void receiveBasicQosOk(int channelId); - - void receiveBasicConsume(int channelId, - AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, boolean nowait, FieldTable arguments); - - void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag); - - void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait); - - void receiveBasicCancelOk(int channelId, AMQShortString consumerTag); - - void receiveBasicPublish(int channelId, - AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); - - void receiveBasicReturn(final int channelId, - int replyCode, - AMQShortString replyText, - AMQShortString exchange, - AMQShortString routingKey); - - void receiveBasicDeliver(int channelId, - AMQShortString consumerTag, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, AMQShortString routingKey); - - void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck); - - void receiveBasicGetOk(int channelId, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, long messageCount); - - void receiveBasicGetEmpty(int channelId); - - void receiveBasicAck(int channelId, long deliveryTag, boolean multiple); - - void receiveBasicReject(int channelId, long deliveryTag, boolean requeue); - void receiveHeartbeat(); - void receiveMessageContent(int channelId, byte[] data); - - void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); - void receiveProtocolHeader(ProtocolInitiation protocolInitiation); void setCurrentMethod(int classId, int methodId); + + boolean ignoreAllButCloseOk(); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java Mon Oct 13 00:58:45 2014 @@ -165,9 +165,8 @@ public class QueueBindBody extends AMQMe return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -176,6 +175,9 @@ public class QueueBindBody extends AMQMe AMQShortString bindingKey = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueBind(queue, exchange, bindingKey, nowait, arguments); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java Mon Oct 13 00:58:45 2014 @@ -191,9 +191,8 @@ public class QueueDeclareBody extends AM return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -206,6 +205,9 @@ public class QueueDeclareBody extends AM boolean autoDelete = (bitfield & 0x08 ) == 0x08; boolean nowait = (bitfield & 0x010 ) == 0x010; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java Mon Oct 13 00:58:45 2014 @@ -120,13 +120,15 @@ public class QueueDeclareOkBody extends return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { AMQShortString queue = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); long consumerCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java Mon Oct 13 00:58:45 2014 @@ -151,9 +151,8 @@ public class QueueDeleteBody extends AMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +162,9 @@ public class QueueDeleteBody extends AMQ boolean ifUnused = (bitfield & 0x01) == 0x01; boolean ifEmpty = (bitfield & 0x02) == 0x02; boolean nowait = (bitfield & 0x04) == 0x04; - dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDelete(queue, ifUnused, ifEmpty, nowait); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java Mon Oct 13 00:58:45 2014 @@ -95,11 +95,13 @@ public class QueueDeleteOkBody extends A return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueueDeleteOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueDeleteOk(messageCount); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java Mon Oct 13 00:58:45 2014 @@ -125,14 +125,16 @@ public class QueuePurgeBody extends AMQM return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; - dispatcher.receiveQueuePurge(channelId, queue, nowait); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurge(queue, nowait); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java Mon Oct 13 00:58:45 2014 @@ -95,11 +95,13 @@ public class QueuePurgeOkBody extends AM return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, + final ClientChannelMethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - dispatcher.receiveQueuePurgeOk(channelId, messageCount); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueuePurgeOk(messageCount); + } } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java Mon Oct 13 00:58:45 2014 @@ -147,9 +147,8 @@ public class QueueUnbindBody extends AMQ return buf.toString(); } - public static void process(final int channelId, - final MarkableDataInput buffer, - final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException + public static void process(final MarkableDataInput buffer, + final ServerChannelMethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -157,6 +156,9 @@ public class QueueUnbindBody extends AMQ AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); FieldTable arguments = EncodingUtils.readFieldTable(buffer); - dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments); + if(!dispatcher.ignoreAllButCloseOk()) + { + dispatcher.receiveQueueUnbind(queue, exchange, routingKey, arguments); + } } } Added: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java?rev=1631275&view=auto ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java (added) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java Mon Oct 13 00:58:45 2014 @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ServerChannelMethodProcessor extends ChannelMethodProcessor +{ + void receiveAccessRequest(AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, boolean read); + + void receiveExchangeDeclare(AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); + + void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait); + + void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); + + void receiveQueueDeclare(AMQShortString queue, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments); + + void receiveQueueBind(AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); + + void receiveQueuePurge(AMQShortString queue, boolean nowait); + + void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + + void receiveQueueUnbind(AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + FieldTable arguments); + + void receiveBasicRecover(final boolean requeue, boolean sync); + + void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global); + + void receiveBasicConsume(AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); + + void receiveBasicCancel(AMQShortString consumerTag, boolean noWait); + + void receiveBasicPublish(AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); + + void receiveBasicGet(AMQShortString queue, boolean noAck); + + void receiveBasicAck(long deliveryTag, boolean multiple); + + void receiveBasicReject(long deliveryTag, boolean requeue); + + + + void receiveTxSelect(); + + void receiveTxCommit(); + + void receiveTxRollback(); + +} Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java?rev=1631275&view=auto ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java (added) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java Mon Oct 13 00:58:45 2014 @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface ServerMethodProcessor<T extends ServerChannelMethodProcessor> extends MethodProcessor<T> +{ + void receiveConnectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); + + void receiveConnectionSecureOk(byte[] response); + + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); + + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + + void receiveChannelOpen(int channelId); + + +} Propchange: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ServerMethodProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java Mon Oct 13 00:58:45 2014 @@ -20,10 +20,10 @@ */ package org.apache.qpid.transport.util; -import java.nio.ByteBuffer; - import static java.lang.Math.min; +import java.nio.ByteBuffer; + /** * Functions @@ -33,6 +33,9 @@ import static java.lang.Math.min; public final class Functions { + private static final char[] HEX_CHARACTERS = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private Functions() { } @@ -102,4 +105,21 @@ public final class Functions return str(ByteBuffer.wrap(bytes), limit); } + public static String hex(byte[] bytes, int limit) + { + limit = Math.min(limit, bytes == null ? 0 : bytes.length); + StringBuilder sb = new StringBuilder(3 + limit*2); + for(int i = 0; i < limit; i++) + { + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0xf0)>>4]); + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0x0f)]); + + } + if(bytes != null && bytes.length>limit) + { + sb.append("..."); + } + return sb.toString(); + } + } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Mon Oct 13 00:58:45 2014 @@ -47,7 +47,7 @@ public class AMQDecoderTest extends Test public void setUp() { _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - _decoder = new AMQDecoder(false, _methodProcessor); + _decoder = new ClientDecoder(_methodProcessor); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1631275&r1=1631274&r2=1631275&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Mon Oct 13 00:58:45 2014 @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; @@ -40,13 +41,13 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; @@ -234,14 +235,9 @@ public class MaxFrameSizeTest extends Qp } byte[] serverData = baos.toByteArray(); - ByteArrayDataInput badi = new ByteArrayDataInput(serverData); - AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - - while (datablockDecoder.decodable(badi)) - { - datablockDecoder.processInput(methodProcessor, badi); - } + AMQDecoder decoder = new ClientDecoder(methodProcessor); + decoder.decodeBuffer(ByteBuffer.wrap(serverData)); evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
