http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_v0_8_to_InternalTest.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_v0_8_to_InternalTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_v0_8_to_InternalTest.java index cc9a2ed..4c9cf37 100644 --- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_v0_8_to_InternalTest.java +++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/PropertyConverter_v0_8_to_InternalTest.java @@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol.v0_8; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -313,8 +312,12 @@ public class PropertyConverter_v0_8_to_InternalTest extends QpidTestCase if (content != null) { when(storedMessage.getContentSize()).thenReturn(content.length); - when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap( - content))); + when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content)); + } + else + { + when(storedMessage.getContentSize()).thenReturn(0); + when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer()); } return new AMQMessage(storedMessage);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java index a1576cd..660d995 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.Iterator; -import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.logging.EventLoggerProvider; @@ -71,7 +70,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ AMQPDescribedTypeRegistry getDescribedTypeRegistry(); - int sendFrame(int channel, FrameBody body, List<QpidByteBuffer> payload); + int sendFrame(int channel, FrameBody body, QpidByteBuffer payload); void sendFrame(int channel, FrameBody body); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 8419b0b..bbdc148 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -58,7 +58,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; @@ -444,37 +443,60 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio @Override public void receive(final List<ChannelFrameBody> channelFrameBodies) { - PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator()); - - while(itr.hasNext()) + if (!channelFrameBodies.isEmpty()) { - final ChannelFrameBody channelFrameBody = itr.next(); - final int frameChannel = channelFrameBody.getChannel(); - - Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length ? null : _receivingSessions[frameChannel]; - if (session != null) + PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator()); + boolean cleanExit = false; + try { - final AccessControlContext context = session.getAccessControllerContext(); - AccessController.doPrivileged((PrivilegedAction<Void>) () -> + while (itr.hasNext()) { - ChannelFrameBody channelFrame = channelFrameBody; - boolean nextIsSameChannel; - do + final ChannelFrameBody channelFrameBody = itr.next(); + final int frameChannel = channelFrameBody.getChannel(); + + Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length + ? null + : _receivingSessions[frameChannel]; + if (session != null) { - received(frameChannel, channelFrame.getFrameBody()); - nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel(); - if (nextIsSameChannel) + final AccessControlContext context = session.getAccessControllerContext(); + AccessController.doPrivileged((PrivilegedAction<Void>) () -> { - channelFrame = itr.next(); - } + ChannelFrameBody channelFrame = channelFrameBody; + boolean nextIsSameChannel; + do + { + received(frameChannel, channelFrame.getFrameBody()); + nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel(); + if (nextIsSameChannel) + { + channelFrame = itr.next(); + } + } + while (nextIsSameChannel); + return null; + }, context); } - while (nextIsSameChannel); - return null; - }, context); + else + { + received(frameChannel, channelFrameBody.getFrameBody()); + } + } + cleanExit = true; } - else + finally { - received(frameChannel, channelFrameBody.getFrameBody()); + if (!cleanExit) + { + while (itr.hasNext()) + { + final Object frameBody = itr.next().getFrameBody(); + if (frameBody instanceof Transfer) + { + ((Transfer) frameBody).dispose(); + } + } + } } } } @@ -1178,7 +1200,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } @Override - public int sendFrame(final int channel, final FrameBody body, final List<QpidByteBuffer> payload) + public int sendFrame(final int channel, final FrameBody body, final QpidByteBuffer payload) { if (!_closedForOutput) { @@ -1192,8 +1214,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio { int size = writer.getEncodedSize(); int maxPayloadSize = _maxFrameSize - (size + 9); - long payloadLength = QpidByteBufferUtils.remaining(payload); - if(payloadLength <= maxPayloadSize) + long payloadLength = (long) payload.remaining(); + if (payloadLength <= maxPayloadSize) { send(AMQFrame.createAMQFrame(channel, body, payload)); return (int)payloadLength; @@ -1206,32 +1228,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio size = writer.getEncodedSize(); maxPayloadSize = _maxFrameSize - (size + 9); - List<QpidByteBuffer> payloadDup = new ArrayList<>(payload.size()); - int payloadSize = 0; - for(QpidByteBuffer buf : payload) + try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize)) { - if (buf.hasRemaining()) - { - if (payloadSize + buf.remaining() < maxPayloadSize) - { - payloadSize += buf.remaining(); - payloadDup.add(buf.duplicate()); - } - else - { - QpidByteBuffer dup = buf.slice(); - dup.limit(maxPayloadSize - payloadSize); - payloadDup.add(dup); - break; - } - } - } - - QpidByteBufferUtils.skip(payload, maxPayloadSize); - send(AMQFrame.createAMQFrame(channel, body, payloadDup)); - for(QpidByteBuffer buf : payloadDup) - { - buf.dispose(); + payload.position(payload.position() + maxPayloadSize); + send(AMQFrame.createAMQFrame(channel, body, payloadDup)); } return maxPayloadSize; @@ -1361,8 +1361,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established"); } - getSender().send(QpidByteBuffer.wrap(SASL_HEADER)); - + try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(SASL_HEADER)) + { + getSender().send(protocolHeader); + } SaslMechanisms mechanisms = new SaslMechanisms(); ArrayList<Symbol> mechanismsList = new ArrayList<>(); for (String name : authenticationProvider.getAvailableMechanisms(getTransport().isSecure())) @@ -1397,7 +1399,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } } - getSender().send(QpidByteBuffer.wrap(AMQP_HEADER)); + try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(AMQP_HEADER)) + { + getSender().send(protocolHeader); + } _connectionState = ConnectionState.AWAIT_OPEN; _frameHandler = getFrameHandler(false); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java index 5cc329d..1579573 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java @@ -83,6 +83,12 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend Error error = validateTransfer(transfer); if (error != null) { + transfer.dispose(); + if (_currentDelivery != null) + { + _currentDelivery.discard(); + _currentDelivery = null; + } close(error); return; } @@ -92,6 +98,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend error = validateNewTransfer(transfer); if (error != null) { + transfer.dispose(); close(error); return; } @@ -109,6 +116,9 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend error = validateSubsequentTransfer(transfer); if (error != null) { + transfer.dispose(); + _currentDelivery.discard(); + _currentDelivery = null; close(error); return; } @@ -121,6 +131,8 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend String.format("delivery '%s' exceeds max-message-size %d", _currentDelivery.getDeliveryTag(), getSession().getConnection().getMaxMessageSize())); + _currentDelivery.discard(); + _currentDelivery = null; close(error); return; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 92237a0..5cfb7c4 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import org.slf4j.Logger; @@ -152,7 +151,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> Transfer transfer = new Transfer(); try { - Collection<QpidByteBuffer> bodyContent = message.getContent(0, (int) message.getSize()); + QpidByteBuffer bodyContent = message.getContent(); HeaderSection headerSection = message.getHeaderSection(); UnsignedInteger ttl = headerSection == null ? null : headerSection.getValue().getTtl(); @@ -184,50 +183,49 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> List<QpidByteBuffer> payload = new ArrayList<>(); if(headerSection != null) { - payload.addAll(headerSection.getEncodedForm()); + payload.add(headerSection.getEncodedForm()); headerSection.dispose(); } EncodingRetainingSection<?> section; if((section = message.getDeliveryAnnotationsSection()) != null) { - payload.addAll(section.getEncodedForm()); + payload.add(section.getEncodedForm()); section.dispose(); } if((section = message.getMessageAnnotationsSection()) != null) { - payload.addAll(section.getEncodedForm()); + payload.add(section.getEncodedForm()); section.dispose(); } if((section = message.getPropertiesSection()) != null) { - payload.addAll(section.getEncodedForm()); + payload.add(section.getEncodedForm()); section.dispose(); } if((section = message.getApplicationPropertiesSection()) != null) { - payload.addAll(section.getEncodedForm()); + payload.add(section.getEncodedForm()); section.dispose(); } - payload.addAll(bodyContent); + payload.add(bodyContent); if((section = message.getFooterSection()) != null) { - payload.addAll(section.getEncodedForm()); + payload.add(section.getEncodedForm()); section.dispose(); } - - transfer.setPayload(payload); - - for(QpidByteBuffer buf : payload) + try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload)) { - buf.dispose(); + transfer.setPayload(combined); } + payload.forEach(QpidByteBuffer::dispose); + byte[] data = new byte[8]; ByteBuffer.wrap(data).putLong(_deliveryTag++); final Binary tag = new Binary(data); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java index 877e934..0f8e893 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.BaseSource; import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; import org.apache.qpid.server.protocol.v1_0.type.Binary; @@ -171,11 +170,12 @@ public class Delivery } } - final List<QpidByteBuffer> payload = transfer.getPayload(); - if (payload != null) + try (QpidByteBuffer payload = transfer.getPayload()) { - _totalPayloadSize += QpidByteBufferUtils.remaining(payload); - QpidByteBufferUtils.dispose(payload); + if (payload != null) + { + _totalPayloadSize += (long) payload.remaining(); + } } } @@ -184,17 +184,18 @@ public class Delivery return _linkEndpoint; } - - public List<QpidByteBuffer> getPayload() + public QpidByteBuffer getPayload() { - List<QpidByteBuffer> fragments = new ArrayList<>(_transfers.size()); + List<QpidByteBuffer> transferBuffers = new ArrayList<>(_transfers.size()); for (Transfer t : _transfers) { - fragments.addAll(t.getPayload()); + transferBuffers.add(t.getPayload()); t.dispose(); } _transfers.clear(); - return fragments; + final QpidByteBuffer combined = QpidByteBuffer.concatenate(transferBuffers); + transferBuffers.forEach(QpidByteBuffer::dispose); + return combined; } public void discard() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index a0158ad..b744d25 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; @@ -57,6 +56,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.codec.EncodingRetaining; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -78,30 +78,26 @@ public class MessageConverter_from_1_0 static Object convertBodyToObject(final Message_1_0 serverMessage) { - final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize()); SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry()); Object bodyObject = null; + List<EncodingRetainingSection<?>> sections = null; try { - List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(allData)); - for(QpidByteBuffer buf : allData) + try (QpidByteBuffer allData = serverMessage.getContent()) { - buf.dispose(); + sections = sectionDecoder.parseAll(allData); } + List<EncodingRetainingSection<?>> bodySections = new ArrayList<>(sections.size()); ListIterator<EncodingRetainingSection<?>> iterator = sections.listIterator(); EncodingRetainingSection<?> previousSection = null; while(iterator.hasNext()) { EncodingRetainingSection<?> section = iterator.next(); - if(!(section instanceof AmqpValueSection || section instanceof DataSection || section instanceof AmqpSequenceSection)) + if (section instanceof AmqpValueSection || section instanceof DataSection || section instanceof AmqpSequenceSection) { - iterator.remove(); - } - else - { - if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValueSection)) + if (previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValueSection)) { throw new MessageConversionException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); } @@ -109,13 +105,14 @@ public class MessageConverter_from_1_0 { previousSection = section; } + bodySections.add(section); } } // In 1.0 of the spec, it is illegal to have message with no body but AMQP-127 asks to have that restriction lifted - if(!sections.isEmpty()) + if (!bodySections.isEmpty()) { - EncodingRetainingSection<?> firstBodySection = sections.get(0); + EncodingRetainingSection<?> firstBodySection = bodySections.get(0); if(firstBodySection instanceof AmqpValueSection) { bodyObject = convertValue(firstBodySection.getValue()); @@ -123,13 +120,13 @@ public class MessageConverter_from_1_0 else if(firstBodySection instanceof DataSection) { int totalSize = 0; - for(EncodingRetainingSection<?> section : sections) + for(EncodingRetainingSection<?> section : bodySections) { totalSize += ((DataSection)section).getValue().getArray().length; } byte[] bodyData = new byte[totalSize]; ByteBuffer buf = ByteBuffer.wrap(bodyData); - for(EncodingRetainingSection<?> section : sections) + for(EncodingRetainingSection<?> section : bodySections) { buf.put(((DataSection) section).getValue().asByteBuffer()); } @@ -138,7 +135,7 @@ public class MessageConverter_from_1_0 else { ArrayList<Object> totalSequence = new ArrayList<>(); - for(EncodingRetainingSection<?> section : sections) + for(EncodingRetainingSection<?> section : bodySections) { totalSequence.addAll(((AmqpSequenceSection)section).getValue()); } @@ -151,6 +148,13 @@ public class MessageConverter_from_1_0 { throw new ConnectionScopedRuntimeException(e); } + finally + { + if (sections != null) + { + sections.forEach(EncodingRetaining::dispose); + } + } return bodyObject; } @@ -315,6 +319,7 @@ public class MessageConverter_from_1_0 if (section != null) { Map<Symbol, Object> annotations = section.getValue(); + section.dispose(); if (annotations != null && annotations.containsKey(JmsMessageTypeAnnotation.ANNOTATION_KEY)) { Object object = annotations.get(JmsMessageTypeAnnotation.ANNOTATION_KEY); @@ -337,9 +342,11 @@ public class MessageConverter_from_1_0 public static Symbol getContentType(final Message_1_0 serverMsg) { final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { return properties.getContentType(); @@ -354,6 +361,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { return properties.getGroupSequence(); @@ -368,6 +376,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { return properties.getGroupId(); @@ -382,6 +391,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { return properties.getCreationTime(); @@ -396,6 +406,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { return properties.getAbsoluteExpiryTime(); @@ -410,6 +421,7 @@ public class MessageConverter_from_1_0 if (headerSection != null) { Header header = headerSection.getValue(); + headerSection.dispose(); if (header != null) { UnsignedInteger ttl = header.getTtl(); @@ -429,6 +441,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { userId = properties.getUserId(); @@ -444,6 +457,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { replyTo = properties.getReplyTo(); @@ -459,6 +473,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { contentEncoding = properties.getContentEncoding(); @@ -474,6 +489,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { correlationIdObject = properties.getCorrelationId(); @@ -490,6 +506,7 @@ public class MessageConverter_from_1_0 if (propertiesSection != null) { final Properties properties = propertiesSection.getValue(); + propertiesSection.dispose(); if (properties != null) { messageId = properties.getMessageId(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index fa85d84..71c3b06 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -38,7 +38,6 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -357,16 +356,13 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement { final String mimeType = serverMessage.getMessageHeader().getMimeType(); byte[] data = new byte[(int) serverMessage.getSize()]; - int total = 0; - for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize())) + + try (QpidByteBuffer content = serverMessage.getContent()) { - int len = b.remaining(); - b.get(data, total, len); - b.dispose(); - total += len; + content.get(data); } - byte[] uncompressed; + byte[] uncompressed; if(GZIPUtils.GZIP_CONTENT_ENCODING.equals(serverMessage.getMessageHeader().getEncoding()) && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data))) != null) { @@ -420,35 +416,12 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public Collection<QpidByteBuffer> getContent(int offset, int length) + public QpidByteBuffer getContent(int offset, int length) { - int position = 0; - List<QpidByteBuffer> content = new ArrayList<>(); - for(QpidByteBuffer buf : _section.getEncodedForm()) + try (QpidByteBuffer content = _section.getEncodedForm()) { - if(position < offset) - { - if(offset - position < buf.remaining()) - { - QpidByteBuffer view = buf.view(offset - position, Math.min(length, buf.remaining() - (offset-position))); - content.add(view); - position += view.remaining(); - } - else - { - position += buf.remaining(); - } - } - else if(position <= offset+length) - { - QpidByteBuffer view = buf.view(0, Math.min(length - (position-offset), buf.remaining())); - content.add(view); - position += view.remaining(); - } - - buf.dispose(); + return content.view(offset, length); } - return content; } @Override @@ -490,6 +463,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement private void dispose() { _section.dispose(); + _metaData.dispose(); } } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java index 6da3327..bf9731d 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageFormat_1_0.java @@ -83,13 +83,13 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0> } @Override - public List<QpidByteBuffer> convertToMessageFormat(final Message_1_0 message) + public QpidByteBuffer convertToMessageFormat(final Message_1_0 message) { throw new UnsupportedOperationException("not implemented"); } @Override - public Message_1_0 createMessage(final List<QpidByteBuffer> buf, + public Message_1_0 createMessage(final QpidByteBuffer payload, final MessageStore store, final Object connectionReference) { @@ -98,7 +98,7 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0> List<EncodingRetainingSection<?>> allSections; try { - allSections = getSectionDecoder().parseAll(buf); + allSections = getSectionDecoder().parseAll(payload); } catch (AmqpErrorException e) { @@ -110,10 +110,9 @@ public class MessageFormat_1_0 implements MessageFormat<Message_1_0> for (EncodingRetainingSection<?> dataSection : dataSections) { - for (QpidByteBuffer buffer : dataSection.getEncodedForm()) + try (QpidByteBuffer encodedForm = dataSection.getEncodedForm()) { - handle.addContent(buffer); - buffer.dispose(); + handle.addContent(encodedForm); } dataSection.dispose(); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java index 7db8bcd..97fff01 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v1_0; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; @@ -42,9 +40,9 @@ public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaD } @Override - public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs) + public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf) { - return MessageMetaData_1_0.FACTORY.createMetaData(bufs); + return MessageMetaData_1_0.FACTORY.createMetaData(buf); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 6bd9249..e3f9a0e 100755 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; @@ -415,37 +414,29 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } @Override - public MessageMetaData_1_0 createMetaData(List<QpidByteBuffer> bufs) + public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf) { try { - if (!QpidByteBufferUtils.hasRemaining(bufs)) + if (!buf.hasRemaining()) { throw new ConnectionScopedRuntimeException("No metadata found"); } - byte versionByte = 0; - for (final QpidByteBuffer buf : bufs) - { - if (buf.hasRemaining()) - { - versionByte = buf.get(buf.position()); - break; - } - } + byte versionByte = buf.get(buf.position()); long arrivalTime; long contentSize = 0; if (versionByte == 1) { - if (!QpidByteBufferUtils.hasRemaining(bufs, 17)) + if (!buf.hasRemaining(17)) { throw new ConnectionScopedRuntimeException("Cannot decode stored message meta data."); } // we can discard the first byte - QpidByteBufferUtils.get(bufs); - arrivalTime = QpidByteBufferUtils.getLong(bufs); - contentSize = QpidByteBufferUtils.getLong(bufs); + buf.get(); + arrivalTime = buf.getLong(); + contentSize = buf.getLong(); } else if (versionByte == 0) { @@ -460,7 +451,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry()); - List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(bufs); + List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(buf); if (versionByte == 0) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index 50fd0f3..e02f24a 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -22,14 +22,11 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; @@ -144,12 +141,6 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM return resource instanceof Queue && ((Queue<?>)resource).isHoldOnPublishEnabled(); } - - public Collection<QpidByteBuffer> getFragments() - { - return getContent(0, (int) getSize()); - } - public HeaderSection getHeaderSection() { return getMessageMetaData().getHeaderSection(); @@ -181,7 +172,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM } @Override - public Collection<QpidByteBuffer> getContent(final int offset, final int length) + public QpidByteBuffer getContent(final int offset, final int length) { if(getMessageMetaData().getVersion() == 0) { @@ -189,15 +180,12 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM try { - final Collection<QpidByteBuffer> allSectionsContent = super.getContent(0, Integer.MAX_VALUE); - - List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(allSectionsContent)); - - List<QpidByteBuffer> bodySectionContent = new ArrayList<>(); - for(QpidByteBuffer buf : allSectionsContent) + List<EncodingRetainingSection<?>> sections; + try (QpidByteBuffer allSectionsContent = super.getContent()) { - buf.dispose(); + sections = sectionDecoder.parseAll(allSectionsContent); } + List<QpidByteBuffer> bodySectionContent = new ArrayList<>(); for (final EncodingRetainingSection<?> section : sections) { @@ -205,44 +193,14 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM || section instanceof AmqpValueSection || section instanceof AmqpSequenceSection) { - bodySectionContent.addAll(section.getEncodedForm()); + bodySectionContent.add(section.getEncodedForm()); } section.dispose(); } - if(offset == 0 && length >= QpidByteBufferUtils.remaining(bodySectionContent)) - { - return bodySectionContent; - } - else + try (QpidByteBuffer bodyContent = QpidByteBuffer.concatenate(bodySectionContent)) { - final Collection<QpidByteBuffer> contentView = new ArrayList<>(); - int position = 0; - for(QpidByteBuffer buf :bodySectionContent) - { - if (position < offset) - { - if (offset - position < buf.remaining()) - { - QpidByteBuffer view = buf.view(offset - position, - Math.min(length, buf.remaining() - (offset - position))); - contentView.add(view); - position += view.remaining(); - } - else - { - position += buf.remaining(); - } - } - else if (position <= offset + length) - { - QpidByteBuffer view = buf.view(0, Math.min(length - (position - offset), buf.remaining())); - contentView.add(view); - position += view.remaining(); - } - - buf.dispose(); - } - return contentView; + bodySectionContent.forEach(QpidByteBuffer::dispose); + return bodyContent.view(offset, length); } } catch (AmqpErrorException e) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 6156a57..753fcec 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -50,7 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -286,10 +285,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget } _remoteIncomingWindow--; - List<QpidByteBuffer> payload = xfr.getPayload(); - try + try (QpidByteBuffer payload = xfr.getPayload()) { - long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload); + long remaining = payload == null ? 0 : (long) payload.remaining(); int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload); if(payload != null) { @@ -305,7 +303,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget _nextOutgoingId.incr(); _remoteIncomingWindow--; - remaining = QpidByteBufferUtils.remaining(payload); + remaining = (long) payload.remaining(); payloadSent = _connection.sendFrame(_sendingChannel, continuationTransfer, payload); continuationTransfer.dispose(); @@ -316,16 +314,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget { throw new ConnectionScopedRuntimeException(e); } - finally - { - if(payload != null) - { - for (QpidByteBuffer buf : payload) - { - buf.dispose(); - } - } - } } public boolean isActive() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 99ce73a..f8f12aa 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -156,10 +156,9 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue()); if(format != null) { - List<QpidByteBuffer> fragments = delivery.getPayload(); - try + try (QpidByteBuffer payload = delivery.getPayload()) { - serverMessage = format.createMessage(fragments, + serverMessage = format.createMessage(payload, getAddressSpace().getMessageStore(), getSession().getConnection().getReference()); } @@ -167,14 +166,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint { return e.getCause().getError(); } - finally - { - for(QpidByteBuffer fragment: fragments) - { - fragment.dispose(); - } - fragments = null; - } } else { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java index afab510..0f98e0b 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java @@ -67,74 +67,77 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn @Override protected Error receiveDelivery(Delivery delivery) { - List<QpidByteBuffer> payload = delivery.getPayload(); - - - // Only interested in the amqp-value section that holds the message to the coordinator - try + try (QpidByteBuffer payload = delivery.getPayload()) { List<EncodingRetainingSection<?>> sections = getSectionDecoder().parseAll(payload); boolean amqpValueSectionFound = false; for(EncodingRetainingSection section : sections) { - if(section instanceof AmqpValueSection) + try { - if (amqpValueSectionFound) - { - throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections"); - } - amqpValueSectionFound = true; - Object command = section.getValue(); - - Session_1_0 session = getSession(); - if(command instanceof Declare) + if (section instanceof AmqpValueSection) { - final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction(); - _createdTransactions.put(txn.getId(), txn.getServerTransaction()); + if (amqpValueSectionFound) + { + throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections"); + } + amqpValueSectionFound = true; + Object command = section.getValue(); - Declared state = new Declared(); + Session_1_0 session = getSession(); + if (command instanceof Declare) + { + final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction(); + _createdTransactions.put(txn.getId(), txn.getServerTransaction()); - session.incrementStartedTransactions(); + Declared state = new Declared(); - state.setTxnId(Session_1_0.integerToTransactionId(txn.getId())); - updateDisposition(delivery.getDeliveryTag(), state, true); + session.incrementStartedTransactions(); - } - else if(command instanceof Discharge) - { - Discharge discharge = (Discharge) command; - - Error error = discharge(discharge.getTxnId(), Boolean.TRUE.equals(discharge.getFail())); - final DeliveryState outcome; - if (error == null) - { - outcome = new Accepted(); + state.setTxnId(Session_1_0.integerToTransactionId(txn.getId())); + updateDisposition(delivery.getDeliveryTag(), state, true); } - else if (Arrays.asList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL)) + else if (command instanceof Discharge) { - final Rejected rejected = new Rejected(); - rejected.setError(error); - outcome = rejected; - error = null; + Discharge discharge = (Discharge) command; + + Error error = discharge(discharge.getTxnId(), Boolean.TRUE.equals(discharge.getFail())); + final DeliveryState outcome; + if (error == null) + { + outcome = new Accepted(); + } + else if (Arrays.asList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL)) + { + final Rejected rejected = new Rejected(); + rejected.setError(error); + outcome = rejected; + error = null; + } + else + { + outcome = null; + } + + if (error == null) + { + updateDisposition(delivery.getDeliveryTag(), outcome, true); + } + return error; } else { - outcome = null; + throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'", + command.getClass() + .getSimpleName())); } - - if (error == null) - { - updateDisposition(delivery.getDeliveryTag(), outcome, true); - } - return error; - } - else - { - throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'", - command.getClass().getSimpleName())); } } + finally + { + section.dispose(); + } } if (!amqpValueSectionFound) { @@ -145,13 +148,6 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn { return e.getError(); } - finally - { - for(QpidByteBuffer buf : payload) - { - buf.dispose(); - } - } return null; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java index 00a1252..1b159f6 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java @@ -22,11 +22,9 @@ package org.apache.qpid.server.protocol.v1_0.codec; import java.lang.reflect.Array; -import java.util.List; import java.util.Map; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; @@ -35,8 +33,8 @@ public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTy { @Override public TypeConstructor<T> construct(final Object descriptor, - final List<QpidByteBuffer> in, - final int[] originalPositions, + final QpidByteBuffer in, + final int originalPosition, final ValueHandler valueHandler) throws AmqpErrorException { return new FieldValueReader(); @@ -48,12 +46,12 @@ public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTy protected class FieldValueReader implements TypeConstructor<T> { - private List<QpidByteBuffer> _in; + private QpidByteBuffer _in; private ValueHandler _valueHandler; private int _count; @Override - public T construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public T construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { _in = in; _valueHandler = handler; @@ -64,7 +62,7 @@ public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTy { int size; final TypeConstructor typeConstructor = _valueHandler.readConstructor(_in); - long remaining = QpidByteBufferUtils.remaining(_in); + long remaining = _in.remaining(); if (typeConstructor instanceof ListConstructor) { ListConstructor listConstructor = (ListConstructor) typeConstructor; @@ -80,13 +78,13 @@ public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTy if (listConstructor.getSize() == 1) { - size = QpidByteBufferUtils.get(_in) & 0xFF; - _count = QpidByteBufferUtils.get(_in) & 0xFF; + size = _in.getUnsignedByte(); + _count = _in.getUnsignedByte(); } else { - size = QpidByteBufferUtils.getInt(_in); - _count = QpidByteBufferUtils.getInt(_in); + size = _in.getInt(); + _count = _in.getInt(); } remaining -= listConstructor.getSize(); @@ -115,7 +113,7 @@ public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTy final T constructedObject = AbstractCompositeTypeConstructor.this.construct(this); long expectedRemaining = remaining - size; - long unconsumedBytes = QpidByteBufferUtils.remaining(_in) - expectedRemaining; + long unconsumedBytes = _in.remaining() - expectedRemaining; if(unconsumedBytes > 0) { final String msg = http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java index 7b1cda6..4dc50f1 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; @@ -29,8 +27,8 @@ public abstract class AbstractDescribedTypeConstructor<T> implements DescribedTy { @Override public TypeConstructor<T> construct(final Object descriptor, - final List<QpidByteBuffer> in, - final int[] originalPositions, final ValueHandler valueHandler) + final QpidByteBuffer in, + final int originalPosition, final ValueHandler valueHandler) throws AmqpErrorException { return new TypeConstructorFromUnderlying<>(this, valueHandler.readConstructor(in)); @@ -52,7 +50,7 @@ public abstract class AbstractDescribedTypeConstructor<T> implements DescribedTy } @Override - public S construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public S construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { return _describedTypeConstructor.construct(_describedConstructor.construct(in, handler)); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java index 3c75d96..f4ae161 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -32,10 +31,10 @@ public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]> @Override - public Object[] construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { int size = read(in); - long remaining = QpidByteBufferUtils.remaining(in); + long remaining = in.remaining(); if(remaining < (long) size) { throw new AmqpErrorException(AmqpError.DECODE_ERROR, @@ -53,7 +52,7 @@ public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]> } long expectedRemaining = remaining - size; - long unconsumedBytes = QpidByteBufferUtils.remaining(in) - expectedRemaining; + long unconsumedBytes = in.remaining() - expectedRemaining; if(unconsumedBytes > 0) { final String msg = String.format("Array incorrectly encoded, %d bytes remaining after decoding %d elements", @@ -81,7 +80,6 @@ public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]> abstract int read(QpidByteBuffer in) throws AmqpErrorException; - abstract int read(List<QpidByteBuffer> in) throws AmqpErrorException; private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor() @@ -95,16 +93,6 @@ public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]> } return ((int)in.get()) & 0xff; } - - @Override - int read(final List<QpidByteBuffer> in) throws AmqpErrorException - { - if(!QpidByteBufferUtils.hasRemaining(in)) - { - throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array"); - } - return ((int)QpidByteBufferUtils.get(in)) & 0xff; - } }; private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor() @@ -118,15 +106,6 @@ public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]> } return in.getInt(); } - @Override - int read(final List<QpidByteBuffer> in) throws AmqpErrorException - { - if(!QpidByteBufferUtils.hasRemaining(in,4)) - { - throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array"); - } - return QpidByteBufferUtils.getInt(in); - } }; public static ArrayTypeConstructor getOneByteSizeTypeConstructor() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java index 3ab576e..cda7f69 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -45,32 +42,32 @@ public class BinaryTypeConstructor extends VariableWidthTypeConstructor<Binary> } @Override - public Binary construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Binary construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { int size; - if (!QpidByteBufferUtils.hasRemaining(in, getSize())) + if (!in.hasRemaining(getSize())) { throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Cannot construct binary: insufficient input data"); } if (getSize() == 1) { - size = QpidByteBufferUtils.get(in) & 0xFF; + size = in.getUnsignedByte(); } else { - size = QpidByteBufferUtils.getInt(in); + size = in.getInt(); } - if (!QpidByteBufferUtils.hasRemaining(in, size)) + if (!in.hasRemaining(size)) { throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Cannot construct binary: insufficient input data"); } byte[] data = new byte[size]; - QpidByteBufferUtils.get(in, data); + in.get(data); return new Binary(data); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java index 38fda60..ca49996 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java @@ -19,12 +19,9 @@ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; -import org.apache.qpid.server.bytebuffer.QpidByteBuffer; public class BooleanConstructor { @@ -32,7 +29,7 @@ public class BooleanConstructor { @Override - public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { return Boolean.TRUE; } @@ -42,7 +39,7 @@ public class BooleanConstructor { @Override - public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { return Boolean.FALSE; @@ -52,11 +49,11 @@ public class BooleanConstructor { @Override - public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { - if(QpidByteBufferUtils.hasRemaining(in)) + if (in.hasRemaining()) { - byte b = QpidByteBufferUtils.get(in); + byte b = in.get(); return b != (byte) 0; } else http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java index 47213cd..78feca7 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -41,11 +38,11 @@ public class ByteTypeConstructor implements TypeConstructor<Byte> } @Override - public Byte construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Byte construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { - if(QpidByteBufferUtils.hasRemaining(in)) + if (in.hasRemaining()) { - return QpidByteBufferUtils.get(in); + return in.get(); } else { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java index 5f7a1ab..a6727a1 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java @@ -20,12 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; -import org.apache.qpid.server.protocol.v1_0.type.*; -import org.apache.qpid.server.protocol.v1_0.type.transport.*; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; public class CharTypeConstructor implements TypeConstructor<String> { @@ -42,11 +39,11 @@ public class CharTypeConstructor implements TypeConstructor<String> } @Override - public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public String construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { - if(QpidByteBufferUtils.hasRemaining(in, 4)) + if (in.hasRemaining(4)) { - int codePoint = QpidByteBufferUtils.getInt(in); + int codePoint = in.getInt(); // TODO look wrong AMQP 1.0 type is actually UTF-32BE not a code point char[] chars = Character.toChars(codePoint); return new String(chars); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java index f996de1..cf91f99 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java @@ -20,10 +20,8 @@ package org.apache.qpid.server.protocol.v1_0.codec; import java.math.BigDecimal; -import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -34,13 +32,13 @@ public abstract class DecimalConstructor implements TypeConstructor<BigDecimal> { @Override - public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { int val; - if(QpidByteBufferUtils.hasRemaining(in, 4)) + if (in.hasRemaining(4)) { - val = QpidByteBufferUtils.getInt(in); + val = in.getInt(); } else { @@ -57,13 +55,13 @@ public abstract class DecimalConstructor implements TypeConstructor<BigDecimal> { @Override - public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { long val; - if(QpidByteBufferUtils.hasRemaining(in, 8)) + if (in.hasRemaining(8)) { - val = QpidByteBufferUtils.getLong(in); + val = in.getLong(); } else { @@ -80,16 +78,16 @@ public abstract class DecimalConstructor implements TypeConstructor<BigDecimal> { @Override - public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { long high; long low; - if(QpidByteBufferUtils.hasRemaining(in, 16)) + if (in.hasRemaining(16)) { - high = QpidByteBufferUtils.getLong(in); - low = QpidByteBufferUtils.getLong(in); + high = in.getLong(); + low = in.getLong(); } else { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java index 99bba13..788dcb0 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; public interface DescribedTypeConstructor<T> { TypeConstructor<T> construct(Object descriptor, - List<QpidByteBuffer> in, - final int[] originalPositions, + QpidByteBuffer in, + final int originalPosition, ValueHandler valueHandler) throws AmqpErrorException; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java index aaaef75..d874cff 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -42,11 +39,11 @@ public class DoubleTypeConstructor implements TypeConstructor<Double> } @Override - public Double construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Double construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { - if(QpidByteBufferUtils.hasRemaining(in, 8)) + if (in.hasRemaining(8)) { - return QpidByteBufferUtils.getDouble(in); + return in.getDouble(); } else { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java index 9c8c88e..dc529c3 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.util.List; - import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -42,11 +39,11 @@ public class FloatTypeConstructor implements TypeConstructor<Float> } @Override - public Float construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + public Float construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException { - if(QpidByteBufferUtils.hasRemaining(in, 4)) + if (in.hasRemaining(4)) { - return QpidByteBufferUtils.getFloat(in); + return in.getFloat(); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org