Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Oct 17 14:23:19 2014 @@ -20,54 +20,45 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; + public static final int CLASS_ID = 60; - private int classId; - - private int weight; - - private long bodySize; + private long _bodySize; /** must never be null */ - private BasicContentHeaderProperties properties; - - public ContentHeaderBody() - { - } + private BasicContentHeaderProperties _properties; public ContentHeaderBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { - classId = buffer.readUnsignedShort(); - weight = buffer.readUnsignedShort(); - bodySize = buffer.readLong(); + buffer.readUnsignedShort(); + buffer.readUnsignedShort(); + _bodySize = buffer.readLong(); int propertyFlags = buffer.readUnsignedShort(); ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); - properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); + _properties = factory.createContentHeaderProperties(CLASS_ID, 
propertyFlags, buffer, (int)size - 14); } - - public ContentHeaderBody(BasicContentHeaderProperties props, int classId) + public ContentHeaderBody(BasicContentHeaderProperties props) { - properties = props; - this.classId = classId; + _properties = props; } - public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize) + public ContentHeaderBody(BasicContentHeaderProperties props, long bodySize) { - this(props, classId); - this.weight = weight; - this.bodySize = bodySize; + _properties = props; + _bodySize = bodySize; } public byte getFrameType() @@ -95,16 +86,16 @@ public class ContentHeaderBody implement public int getSize() { - return 2 + 2 + 8 + 2 + properties.getPropertyListSize(); + return 2 + 2 + 8 + 2 + _properties.getPropertyListSize(); } public void writePayload(DataOutput buffer) throws IOException { - EncodingUtils.writeUnsignedShort(buffer, classId); - EncodingUtils.writeUnsignedShort(buffer, weight); - buffer.writeLong(bodySize); - EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags()); - properties.writePropertyListPayload(buffer); + EncodingUtils.writeUnsignedShort(buffer, CLASS_ID); + EncodingUtils.writeUnsignedShort(buffer, 0); + buffer.writeLong(_bodySize); + EncodingUtils.writeUnsignedShort(buffer, _properties.getPropertyFlags()); + _properties.writePropertyListPayload(buffer); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -113,46 +104,42 @@ public class ContentHeaderBody implement session.contentHeaderReceived(channelId, this); } - public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, + public static AMQFrame createAMQFrame(int channelId, + BasicContentHeaderProperties properties, long bodySize) { - return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); - } - - public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) - { - 
return new AMQFrame(channelId, body); + return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); } public BasicContentHeaderProperties getProperties() { - return properties; + return _properties; } public void setProperties(BasicContentHeaderProperties props) { - properties = props; + _properties = props; } @Override public String toString() { return "ContentHeaderBody{" + - "classId=" + classId + - ", weight=" + weight + - ", bodySize=" + bodySize + - ", properties=" + properties + + "classId=" + CLASS_ID + + ", weight=" + 0 + + ", bodySize=" + _bodySize + + ", properties=" + _properties + '}'; } public int getClassId() { - return classId; + return CLASS_ID; } public int getWeight() { - return weight; + return 0; } /** unsigned long but java can't handle that anyway when allocating byte array @@ -160,11 +147,33 @@ public class ContentHeaderBody implement * @return the body size */ public long getBodySize() { - return bodySize; + return _bodySize; } public void setBodySize(long bodySize) { - this.bodySize = bodySize; + _bodySize = bodySize; + } + + public static void process(final MarkableDataInput buffer, + final ChannelMethodProcessor methodProcessor, final long size) + throws IOException, AMQFrameDecodingException + { + + int classId = buffer.readUnsignedShort(); + buffer.readUnsignedShort(); + long bodySize = buffer.readLong(); + int propertyFlags = buffer.readUnsignedShort(); + + BasicContentHeaderProperties properties; + + if (classId != CLASS_ID) + { + throw new AMQFrameDecodingException(null, "Unsupported content header class id: " + classId, null); + } + properties = new BasicContentHeaderProperties(); + properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); + + methodProcessor.receiveMessageHeader(properties, bodySize); } }
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java Fri Oct 17 14:23:19 2014 @@ -20,8 +20,6 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; - import java.io.DataInput; import java.io.IOException; @@ -46,7 +44,7 @@ public class ContentHeaderPropertiesFact // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. 
- if (classId == BasicConsumeBodyImpl.CLASS_ID) + if (classId == BasicConsumeBody.CLASS_ID) { properties = new BasicContentHeaderProperties(); } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Fri Oct 17 14:23:19 2014 @@ -20,15 +20,15 @@ */ package org.apache.qpid.framing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class EncodingUtils { private static final Logger _logger = LoggerFactory.getLogger(EncodingUtils.class); @@ -218,12 +218,6 @@ public class EncodingUtils } } - public static int encodedContentLength(Content table) - { - // TODO: New Content class required for AMQP 0-9. - return 0; - } - public static void writeShortStringBytes(DataOutput buffer, String s) throws IOException { if (s != null) @@ -374,11 +368,6 @@ public class EncodingUtils } } - public static void writeContentBytes(DataOutput buffer, Content content) - { - // TODO: New Content class required for AMQP 0-9. - } - public static void writeBooleans(DataOutput buffer, boolean[] values) throws IOException { byte packedValue = 0; @@ -656,12 +645,6 @@ public class EncodingUtils } } - public static Content readContent(DataInput buffer) throws AMQFrameDecodingException - { - // TODO: New Content class required for AMQP 0-9. 
- return null; - } - public static AMQShortString readAMQShortString(DataInput buffer) throws IOException { return AMQShortString.readFromBuffer(buffer); @@ -955,7 +938,6 @@ public class EncodingUtils } else { - // really writing out unsigned byte writeUnsignedInteger(buffer, 0L); } } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 17 14:23:19 2014 @@ -20,13 +20,14 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; @@ -79,4 +80,17 @@ public class HeartbeatBody implements AM { return new AMQFrame(0, this); } + + public static void process(final int channel, + final MarkableDataInput in, + final MethodProcessor processor, + final long bodySize) throws IOException + { + + if(bodySize > 0) + { + in.skip(bodySize); + } + processor.receiveHeartbeat(); + } } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java?rev=1632583&r1=1632582&r2=1632583&view=diff 
============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java Fri Oct 17 14:23:19 2014 @@ -29,330 +29,529 @@ package org.apache.qpid.framing; -import java.io.IOException; +public final class MethodRegistry +{ + private ProtocolVersion _protocolVersion; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; -import org.apache.qpid.codec.MarkableDataInput; -import java.util.Map; -import java.util.HashMap; + public MethodRegistry(ProtocolVersion pv) + { + _protocolVersion = pv; + } + public void setProtocolVersion(final ProtocolVersion protocolVersion) + { + _protocolVersion = protocolVersion; + } -public abstract class MethodRegistry -{ - private static final Map<ProtocolVersion, MethodRegistry> _registries = - new HashMap<ProtocolVersion, MethodRegistry>(); + public final AccessRequestBody createAccessRequestBody(final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) + { + return new AccessRequestBody(realm, + exclusive, + passive, + active, + write, + read); + } + public final AccessRequestOkBody createAccessRequestOkBody(final int ticket) + { + return new AccessRequestOkBody(ticket); + } - public static final MethodRegistry registry_0_9 = - new org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9(); - public static final MethodRegistry registry_0_91 = - new org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91(); - - public static final MethodRegistry registry_8_0 = - new org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0(); - - public abstract AMQMethodBody convertToBody(MarkableDataInput in, long size) - throws AMQFrameDecodingException, IOException; - - public abstract int getMaxClassId(); - - public abstract int getMaxMethodId(int classId); - 
- protected MethodRegistry(ProtocolVersion pv) - { - _registries.put(pv, this); - } - - public static MethodRegistry getMethodRegistry(ProtocolVersion pv) - { - return _registries.get(pv); - } - - - - - public abstract BasicAckBody createBasicAckBody( - final long deliveryTag, - final boolean multiple - ); - - public abstract BasicCancelBody createBasicCancelBody( - final AMQShortString consumerTag, - final boolean nowait - ); - - public abstract BasicCancelOkBody createBasicCancelOkBody( - final AMQShortString consumerTag - ); - - public abstract BasicConsumeBody createBasicConsumeBody( - final int ticket, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments - ); - - public abstract BasicConsumeOkBody createBasicConsumeOkBody( - final AMQShortString consumerTag - ); - - public abstract BasicDeliverBody createBasicDeliverBody( - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey - ); - - public abstract BasicGetBody createBasicGetBody( - final int ticket, - final AMQShortString queue, - final boolean noAck - ); - - public abstract BasicGetEmptyBody createBasicGetEmptyBody( - final AMQShortString clusterId - ); - - public abstract BasicGetOkBody createBasicGetOkBody( - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount - ); - - public abstract BasicPublishBody createBasicPublishBody( - final int ticket, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate - ); - - public abstract BasicQosBody createBasicQosBody( - final long prefetchSize, - final int prefetchCount, - final boolean global - ); - - public abstract BasicQosOkBody createBasicQosOkBody( - ); - - public 
abstract BasicRecoverBody createBasicRecoverBody( - final boolean requeue - ); - - public abstract BasicRejectBody createBasicRejectBody( - final long deliveryTag, - final boolean requeue - ); - - public abstract BasicReturnBody createBasicReturnBody( - final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey - ); - - - public abstract ChannelCloseBody createChannelCloseBody( - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId - ); - - public abstract ChannelCloseOkBody createChannelCloseOkBody( - ); - - public abstract ChannelFlowBody createChannelFlowBody( - final boolean active - ); - - public abstract ChannelFlowOkBody createChannelFlowOkBody( - final boolean active - ); - - public abstract ChannelOpenBody createChannelOpenBody( - final AMQShortString outOfBand - ); - - - public abstract ConnectionCloseBody createConnectionCloseBody( - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId - ); - - public abstract ConnectionCloseOkBody createConnectionCloseOkBody( - ); - - public abstract ConnectionOpenBody createConnectionOpenBody( - final AMQShortString virtualHost, - final AMQShortString capabilities, - final boolean insist - ); - - public abstract ConnectionOpenOkBody createConnectionOpenOkBody( - final AMQShortString knownHosts - ); - - public abstract ConnectionSecureBody createConnectionSecureBody( - final byte[] challenge - ); - - public abstract ConnectionSecureOkBody createConnectionSecureOkBody( - final byte[] response - ); - - public abstract ConnectionStartBody createConnectionStartBody( - final short versionMajor, - final short versionMinor, - final FieldTable serverProperties, - final byte[] mechanisms, - final byte[] locales - ); - - public abstract ConnectionStartOkBody createConnectionStartOkBody( - final FieldTable clientProperties, - final AMQShortString mechanism, - final byte[] response, 
- final AMQShortString locale - ); - - public abstract ConnectionTuneBody createConnectionTuneBody( - final int channelMax, - final long frameMax, - final int heartbeat - ); - - public abstract ConnectionTuneOkBody createConnectionTuneOkBody( - final int channelMax, - final long frameMax, - final int heartbeat - ); - - - - public abstract ExchangeBoundBody createExchangeBoundBody( - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue - ); - - public abstract ExchangeBoundOkBody createExchangeBoundOkBody( - final int replyCode, - final AMQShortString replyText - ); - - public abstract ExchangeDeclareBody createExchangeDeclareBody( - final int ticket, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, - final FieldTable arguments - ); - - public abstract ExchangeDeclareOkBody createExchangeDeclareOkBody( - ); - - public abstract ExchangeDeleteBody createExchangeDeleteBody( - final int ticket, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait - ); - - public abstract ExchangeDeleteOkBody createExchangeDeleteOkBody( - ); - - - - - public abstract QueueBindBody createQueueBindBody( - final int ticket, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean nowait, - final FieldTable arguments - ); - - public abstract QueueBindOkBody createQueueBindOkBody( - ); - - public abstract QueueDeclareBody createQueueDeclareBody( - final int ticket, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments - ); - - public abstract QueueDeclareOkBody createQueueDeclareOkBody( - final AMQShortString queue, - final long messageCount, - final long consumerCount - ); - - public abstract 
QueueDeleteBody createQueueDeleteBody( - final int ticket, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait - ); - - public abstract QueueDeleteOkBody createQueueDeleteOkBody( - final long messageCount - ); - - public abstract QueuePurgeBody createQueuePurgeBody( - final int ticket, - final AMQShortString queue, - final boolean nowait - ); - - public abstract QueuePurgeOkBody createQueuePurgeOkBody( - final long messageCount - ); - - - - - - public abstract TxCommitBody createTxCommitBody( - ); - - public abstract TxCommitOkBody createTxCommitOkBody( - ); - - public abstract TxRollbackBody createTxRollbackBody( - ); - - public abstract TxRollbackOkBody createTxRollbackOkBody( - ); + public final BasicQosBody createBasicQosBody(final long prefetchSize, + final int prefetchCount, + final boolean global) + { + return new BasicQosBody(prefetchSize, + prefetchCount, + global); + } - public abstract TxSelectBody createTxSelectBody( - ); + public final BasicQosOkBody createBasicQosOkBody() + { + return new BasicQosOkBody(); + } + + public final BasicConsumeBody createBasicConsumeBody(final int ticket, + final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) + { + return new BasicConsumeBody(ticket, + queue, + consumerTag, + noLocal, + noAck, + exclusive, + nowait, + arguments); + } + + public final BasicConsumeOkBody createBasicConsumeOkBody(final AMQShortString consumerTag) + { + return new BasicConsumeOkBody(consumerTag); + } + + public final BasicCancelBody createBasicCancelBody(final AMQShortString consumerTag, + final boolean nowait) + { + return new BasicCancelBody(consumerTag, + nowait); + } + + public final BasicCancelOkBody createBasicCancelOkBody(final AMQShortString consumerTag) + { + return new BasicCancelOkBody(consumerTag); + } + + public final BasicPublishBody 
createBasicPublishBody(final int ticket, + final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) + { + return new BasicPublishBody(ticket, + exchange, + routingKey, + mandatory, + immediate); + } + + public final BasicReturnBody createBasicReturnBody(final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) + { + return new BasicReturnBody(replyCode, + replyText, + exchange, + routingKey); + } + + public final BasicDeliverBody createBasicDeliverBody(final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) + { + return new BasicDeliverBody(consumerTag, + deliveryTag, + redelivered, + exchange, + routingKey); + } + + public final BasicGetBody createBasicGetBody(final int ticket, + final AMQShortString queue, + final boolean noAck) + { + return new BasicGetBody(ticket, + queue, + noAck); + } + + public final BasicGetOkBody createBasicGetOkBody(final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + return new BasicGetOkBody(deliveryTag, + redelivered, + exchange, + routingKey, + messageCount); + } + + public final BasicGetEmptyBody createBasicGetEmptyBody(final AMQShortString clusterId) + { + return new BasicGetEmptyBody(clusterId); + } + + public final BasicAckBody createBasicAckBody(final long deliveryTag, + final boolean multiple) + { + return new BasicAckBody(deliveryTag, + multiple); + } + + public final BasicRejectBody createBasicRejectBody(final long deliveryTag, + final boolean requeue) + { + return new BasicRejectBody(deliveryTag, + requeue); + } + + public final BasicRecoverBody createBasicRecoverBody(final boolean requeue) + { + return new BasicRecoverBody(requeue); + } + + + public final BasicRecoverSyncOkBody 
createBasicRecoverSyncOkBody() + { + return new BasicRecoverSyncOkBody(_protocolVersion); + } + + + public final BasicRecoverSyncBody createBasicRecoverSyncBody(final boolean requeue) + { + return new BasicRecoverSyncBody(_protocolVersion, requeue); + } + + public final ChannelAlertBody createChannelAlertBody(final int replyCode, + final AMQShortString replyText, + final FieldTable details) + { + return new ChannelAlertBody(replyCode, + replyText, + details); + } + + public final ChannelOpenBody createChannelOpenBody(final AMQShortString outOfBand) + { + return new ChannelOpenBody(); + } + + public final ChannelOpenOkBody createChannelOpenOkBody(byte[] channelId) + { + return createChannelOpenOkBody(); + } + + public final ChannelOpenOkBody createChannelOpenOkBody() + { + return _protocolVersion.equals(ProtocolVersion.v8_0) + ? ChannelOpenOkBody.INSTANCE_0_8 + : ChannelOpenOkBody.INSTANCE_0_9; + } + + public final ChannelFlowBody createChannelFlowBody(final boolean active) + { + return new ChannelFlowBody(active); + } + + public final ChannelFlowOkBody createChannelFlowOkBody(final boolean active) + { + return new ChannelFlowOkBody(active); + } + + public final ChannelCloseBody createChannelCloseBody(final int replyCode, final AMQShortString replyText, + final int classId, + final int methodId + ) + { + return new ChannelCloseBody(replyCode, + replyText, + classId, + methodId); + } + + public final ChannelCloseOkBody createChannelCloseOkBody() + { + return ChannelCloseOkBody.INSTANCE; + } + + + + + public final ConnectionStartBody createConnectionStartBody(final short versionMajor, + final short versionMinor, + final FieldTable serverProperties, + final byte[] mechanisms, + final byte[] locales) + { + return new ConnectionStartBody(versionMajor, + versionMinor, + serverProperties, + mechanisms, + locales); + } + + public final ConnectionStartOkBody createConnectionStartOkBody(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] 
response, + final AMQShortString locale) + { + return new ConnectionStartOkBody(clientProperties, + mechanism, + response, + locale); + } + + public final ConnectionSecureBody createConnectionSecureBody(final byte[] challenge) + { + return new ConnectionSecureBody(challenge); + } + + public final ConnectionSecureOkBody createConnectionSecureOkBody(final byte[] response) + { + return new ConnectionSecureOkBody(response); + } + + public final ConnectionTuneBody createConnectionTuneBody(final int channelMax, + final long frameMax, + final int heartbeat) + { + return new ConnectionTuneBody(channelMax, + frameMax, + heartbeat); + } + + public final ConnectionTuneOkBody createConnectionTuneOkBody(final int channelMax, + final long frameMax, + final int heartbeat) + { + return new ConnectionTuneOkBody(channelMax, + frameMax, + heartbeat); + } + + public final ConnectionOpenBody createConnectionOpenBody(final AMQShortString virtualHost, + final AMQShortString capabilities, + final boolean insist) + { + return new ConnectionOpenBody(virtualHost, + capabilities, + insist); + } + + public final ConnectionOpenOkBody createConnectionOpenOkBody(final AMQShortString knownHosts) + { + return new ConnectionOpenOkBody(knownHosts); + } + + public final ConnectionRedirectBody createConnectionRedirectBody(final AMQShortString host, + final AMQShortString knownHosts) + { + return new ConnectionRedirectBody(_protocolVersion, + host, + knownHosts); + } + + public final ConnectionCloseBody createConnectionCloseBody(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) + { + return new ConnectionCloseBody(_protocolVersion, + replyCode, + replyText, + classId, + methodId); + } + + public final ConnectionCloseOkBody createConnectionCloseOkBody() + { + return ProtocolVersion.v8_0 == _protocolVersion + ? 
ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 + : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9; + } + + + public final ExchangeDeclareBody createExchangeDeclareBody(final int ticket, + final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, + final FieldTable arguments) + { + return new ExchangeDeclareBody(ticket, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments); + } + + public final ExchangeDeclareOkBody createExchangeDeclareOkBody() + { + return new ExchangeDeclareOkBody(); + } + + public final ExchangeDeleteBody createExchangeDeleteBody(final int ticket, + final AMQShortString exchange, + final boolean ifUnused, + final boolean nowait) + { + return new ExchangeDeleteBody(ticket, + exchange, + ifUnused, + nowait + ); + } + + public final ExchangeDeleteOkBody createExchangeDeleteOkBody() + { + return new ExchangeDeleteOkBody(); + } + + public final ExchangeBoundBody createExchangeBoundBody(final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) + { + return new ExchangeBoundBody(exchange, + routingKey, + queue); + } + + public final ExchangeBoundOkBody createExchangeBoundOkBody(final int replyCode, + final AMQShortString replyText) + { + return new ExchangeBoundOkBody(replyCode, + replyText); + } + + + public final QueueDeclareBody createQueueDeclareBody(final int ticket, + final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) + { + return new QueueDeclareBody(ticket, + queue, + passive, + durable, + exclusive, + autoDelete, + nowait, + arguments); + } + + public final QueueDeclareOkBody createQueueDeclareOkBody(final AMQShortString queue, + final long messageCount, + final long consumerCount) + { + return new 
QueueDeclareOkBody(queue, + messageCount, + consumerCount); + } + + public final QueueBindBody createQueueBindBody(final int ticket, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString routingKey, + final boolean nowait, + final FieldTable arguments) + { + return new QueueBindBody(ticket, + queue, + exchange, + routingKey, + nowait, + arguments); + } + + public final QueueBindOkBody createQueueBindOkBody() + { + return new QueueBindOkBody(); + } + + public final QueuePurgeBody createQueuePurgeBody(final int ticket, + final AMQShortString queue, + final boolean nowait) + { + return new QueuePurgeBody(ticket, + queue, + nowait); + } + + public final QueuePurgeOkBody createQueuePurgeOkBody(final long messageCount) + { + return new QueuePurgeOkBody(messageCount); + } + + public final QueueDeleteBody createQueueDeleteBody(final int ticket, + final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) + { + return new QueueDeleteBody(ticket, + queue, + ifUnused, + ifEmpty, + nowait); + } + + public final QueueDeleteOkBody createQueueDeleteOkBody(final long messageCount) + { + return new QueueDeleteOkBody(messageCount); + } + + public final QueueUnbindBody createQueueUnbindBody(final int ticket, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString routingKey, + final FieldTable arguments) + { + return new QueueUnbindBody(ticket, + queue, + exchange, + routingKey, + arguments); + } + + public final QueueUnbindOkBody createQueueUnbindOkBody() + { + return new QueueUnbindOkBody(); + } + + + public final TxSelectBody createTxSelectBody() + { + return TxSelectBody.INSTANCE; + } + + public final TxSelectOkBody createTxSelectOkBody() + { + return TxSelectOkBody.INSTANCE; + } + + public final TxCommitBody createTxCommitBody() + { + return TxCommitBody.INSTANCE; + } + + public final TxCommitOkBody createTxCommitOkBody() + { + return TxCommitOkBody.INSTANCE; + } + + public 
final TxRollbackBody createTxRollbackBody() + { + return TxRollbackBody.INSTANCE; + } + + public final TxRollbackOkBody createTxRollbackOkBody() + { + return TxRollbackOkBody.INSTANCE; + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } - public abstract TxSelectOkBody createTxSelectOkBody( - ); - public abstract ProtocolVersionMethodConverter getProtocolVersionMethodConverter(); } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Fri Oct 17 14:23:19 2014 @@ -20,14 +20,15 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.MarkableDataInput; - import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -227,7 +228,7 @@ public class ProtocolInitiation extends public String toString() { - StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); + StringBuffer buffer = new StringBuffer(new String(_protocolHeader, StandardCharsets.US_ASCII)); buffer.append(Integer.toHexString(_protocolClass)); buffer.append(Integer.toHexString(_protocolInstance)); buffer.append(Integer.toHexString(_protocolMajor)); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java 
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java Fri Oct 17 14:23:19 2014 @@ -33,6 +33,7 @@ import org.apache.qpid.AMQException; public interface ServerMethodDispatcher { + boolean dispatchAccessRequest(AccessRequestBody accessRequestBody, int channelId) throws AMQException; public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException; public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException; @@ -64,4 +65,7 @@ public interface ServerMethodDispatcher public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException; public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException; -} \ No newline at end of file + boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int channelId) throws AMQException; + + boolean dispatchBasicRecoverSync(BasicRecoverSyncBody basicRecoverSyncBody, int channelId) throws AMQException; +} Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java Fri Oct 17 14:23:19 2014 @@ -20,10 +20,10 @@ */ package org.apache.qpid.transport.util; -import 
java.nio.ByteBuffer; - import static java.lang.Math.min; +import java.nio.ByteBuffer; + /** * Functions @@ -33,6 +33,9 @@ import static java.lang.Math.min; public final class Functions { + private static final char[] HEX_CHARACTERS = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + private Functions() { } @@ -102,4 +105,21 @@ public final class Functions return str(ByteBuffer.wrap(bytes), limit); } + public static String hex(byte[] bytes, int limit) + { + limit = Math.min(limit, bytes == null ? 0 : bytes.length); + StringBuilder sb = new StringBuilder(3 + limit*2); + for(int i = 0; i < limit; i++) + { + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0xf0)>>4]); + sb.append(HEX_CHARACTERS[(((int)bytes[i]) & 0x0f)]); + + } + if(bytes != null && bytes.length>limit) + { + sb.append("..."); + } + return sb.toString(); + } + } Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Oct 17 14:23:19 2014 @@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import junit.framework.TestCase; @@ -33,17 +33,21 @@ import org.apache.qpid.framing.AMQDataBl import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; +import 
org.apache.qpid.framing.ProtocolVersion; public class AMQDecoderTest extends TestCase { private AMQDecoder _decoder; + private FrameCreatingMethodProcessor _methodProcessor; public void setUp() { - _decoder = new AMQDecoder(false, null); + _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); + _decoder = new ClientDecoder(_methodProcessor); } @@ -57,7 +61,8 @@ public class AMQDecoderTest extends Test public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { ByteBuffer msg = getHeartbeatBodyBuffer(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); if (frames.get(0) instanceof AMQFrame) { assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); @@ -77,9 +82,12 @@ public class AMQDecoderTest extends Test msgA.limit(msgaLimit); msg.position(msgbPos); ByteBuffer msgB = msg.slice(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA); + + _decoder.decodeBuffer(msgA); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); assertEquals(0, frames.size()); - frames = _decoder.decodeBuffer(msgB); + + _decoder.decodeBuffer(msgB); assertEquals(1, frames.size()); if (frames.get(0) instanceof AMQFrame) { @@ -99,7 +107,8 @@ public class AMQDecoderTest extends Test msg.put(msgA); msg.put(msgB); msg.flip(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); assertEquals(2, frames.size()); for (AMQDataBlock frame : frames) { @@ -136,12 +145,15 @@ public class AMQDecoderTest extends Test sliceB.put(msgC); sliceB.flip(); msgC.limit(limit); - - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA); + + _decoder.decodeBuffer(sliceA); + List<AMQDataBlock> frames = 
_methodProcessor.getProcessedMethods(); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(sliceB); + frames.clear(); + _decoder.decodeBuffer(sliceB); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(msgC); + frames.clear(); + _decoder.decodeBuffer(msgC); assertEquals(1, frames.size()); for (AMQDataBlock frame : frames) { Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImplTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImplTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImplTest.java (original) +++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImplTest.java Fri Oct 17 14:23:19 2014 @@ -23,16 +23,17 @@ package org.apache.qpid.framing.abstract import junit.framework.TestCase; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MessagePublishInfo; public class MessagePublishInfoImplTest extends TestCase { - private MessagePublishInfoImpl _mpi; + private MessagePublishInfo _mpi; private final AMQShortString _exchange = new AMQShortString("exchange"); private final AMQShortString _routingKey = new AMQShortString("routingKey"); public void setUp() { - _mpi = new MessagePublishInfoImpl(_exchange, true, true, _routingKey); + _mpi = new MessagePublishInfo(_exchange, true, true, _routingKey); } /** Test that we can update the exchange value. 
*/ @@ -55,7 +56,7 @@ public class MessagePublishInfoImplTest //Check that the set value is correct assertTrue("Set value for immediate not as expected", _mpi.isImmediate()); - MessagePublishInfoImpl mpi = new MessagePublishInfoImpl(); + MessagePublishInfo mpi = new MessagePublishInfo(); assertFalse("Default value for immediate should be false", mpi.isImmediate()); @@ -72,7 +73,7 @@ public class MessagePublishInfoImplTest { assertTrue("Set value for mandatory not as expected", _mpi.isMandatory()); - MessagePublishInfoImpl mpi = new MessagePublishInfoImpl(); + MessagePublishInfo mpi = new MessagePublishInfo(); assertFalse("Default value for mandatory should be false", mpi.isMandatory()); Propchange: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java ------------------------------------------------------------------------------ Merged /qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java:r1628068-1632579 Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original) +++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Fri Oct 17 14:23:19 2014 @@ -20,14 +20,8 @@ */ package org.apache.qpid.server.logging; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ExchangeDeleteBody; -import 
org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; +import java.io.IOException; +import java.util.List; import javax.jms.Connection; import javax.jms.JMSException; @@ -35,8 +29,16 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import java.io.IOException; -import java.util.List; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; /** * Exchange @@ -191,7 +193,7 @@ public class ExchangeLoggingTest extends } else { - MethodRegistry_8_0 registry = new MethodRegistry_8_0(); + MethodRegistry registry = new MethodRegistry(ProtocolVersion.v8_0); ExchangeDeleteBody body = registry.createExchangeDeleteBody(0, new AMQShortString(_name), false, true); Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original) +++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Fri Oct 17 14:23:19 2014 @@ -41,8 +41,7 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import 
org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; import org.apache.qpid.server.connection.SessionPrincipal; @@ -597,9 +596,9 @@ public class VirtualHostMessageStoreTest headers.setString("Test", "MST"); properties.setHeaders(headers); - MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); + MessagePublishInfo messageInfo = new MessagePublishInfo(new AMQShortString(exchange.getName()), false, false, new AMQShortString(routingKey)); - ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); + ContentHeaderBody headerBody = new ContentHeaderBody(properties,0l); MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); @@ -824,52 +823,4 @@ public class VirtualHostMessageStoreTest assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getQueueDepthMessages()); } - - private class TestMessagePublishInfo implements MessagePublishInfo - { - - ExchangeImpl<?> _exchange; - boolean _immediate; - boolean _mandatory; - String _routingKey; - - TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - @Override - public AMQShortString getExchange() - { - return new AMQShortString(_exchange.getName()); - } - - @Override - public void setExchange(AMQShortString exchange) - { - //no-op - } - - @Override - public boolean isImmediate() - { - return _immediate; - } - - @Override - public boolean isMandatory() - { - return _mandatory; - } - - @Override - public AMQShortString 
getRoutingKey() - { - return new AMQShortString(_routingKey); - } - } - } Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/close/JavaServerCloseRaceConditionTest.java Fri Oct 17 14:23:19 2014 @@ -86,8 +86,16 @@ public class JavaServerCloseRaceConditio // Set no wait true so that we block the connection // Also set a different exchange class string so the attempt to declare // the exchange causes an exchange. - ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), new AMQShortString(EXCHANGE_NAME), null, - true, false, false, false, true, null); + ExchangeDeclareBody body = session.getMethodRegistry().createExchangeDeclareBody(session.getTicket(), + new AMQShortString( + EXCHANGE_NAME), + null, + true, + false, + false, + false, + true, + null); AMQFrame exchangeDeclare = body.generateFrame(session.getChannelId()); Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original) +++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Oct 17 14:23:19 
2014 @@ -26,8 +26,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,22 +41,19 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.ClientDecoder; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BodyFactory; -import org.apache.qpid.framing.ByteArrayDataInput; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl; -import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl; -import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; @@ -114,11 +111,11 @@ public class MaxFrameSizeTest extends Qp { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) 
frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -163,11 +160,11 @@ public class MaxFrameSizeTest extends Qp { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -177,7 +174,7 @@ public class MaxFrameSizeTest extends Qp private static interface ResultEvaluator { - void evaluate(Socket socket, List<AMQFrame> frames); + void evaluate(Socket socket, List<AMQDataBlock> frames); } private void doAMQP08test(int frameSize, ResultEvaluator evaluator) @@ -220,12 +217,12 @@ public class MaxFrameSizeTest extends Qp response[i++] = b; } - ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US")); + ConnectionStartOkBody startOK = new ConnectionStartOkBody(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US")); DataOutputStream dos = new DataOutputStream(os); new AMQFrame(0, startOK).writePayload(dos); dos.flush(); - ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0); + ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBody(256, frameSize, 0); new AMQFrame(0, tuneOk).writePayload(dos); dos.flush(); socket.setSoTimeout(5000); @@ -238,26 +235,11 @@ public class MaxFrameSizeTest extends Qp } byte[] serverData = baos.toByteArray(); - ByteArrayDataInput badi = new ByteArrayDataInput(serverData); - AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); - 
final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91(); - BodyFactory methodBodyFactory = new BodyFactory() - { - @Override - public AMQBody createBody(final MarkableDataInput in, final long bodySize) - throws AMQFrameDecodingException, IOException - { - return methodRegistry_0_91.convertToBody(in, bodySize); - } - }; - - List<AMQFrame> frames = new ArrayList<>(); - while (datablockDecoder.decodable(badi)) - { - frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi)); - } + final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); + AMQDecoder decoder = new ClientDecoder(methodProcessor); + decoder.decodeBuffer(ByteBuffer.wrap(serverData)); - evaluator.evaluate(socket, frames); + evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } private static class TestClientDelegate extends ClientDelegate --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
