Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Aug 7 00:28:17 2015 @@ -50,6 +50,7 @@ import org.apache.qpid.amqp_1_0.transpor import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.AMQConstant; @@ -166,7 +167,7 @@ public class AMQPConnection_1_0 extends _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry()); - getSender().send(headerResponse.duplicate()); + getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate())); getSender().flush(); if(useSASL) @@ -254,7 +255,7 @@ public class AMQPConnection_1_0 extends { if (_endpoint.isAuthenticated()) { - getSender().send(AMQP_LAYER_HEADER.duplicate()); + getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate())); getSender().flush(); } else @@ -343,14 +344,14 @@ public class AMQPConnection_1_0 extends private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW"); - public synchronized void received(final ByteBuffer msg) + public synchronized void received(final QpidByteBuffer msg) { try { updateLastReadTime(); if(RAW_LOGGER.isDebugEnabled()) { - ByteBuffer dup = msg.duplicate(); + QpidByteBuffer dup = 
msg.duplicate(); byte[] data = new byte[dup.remaining()]; dup.get(data); Binary bin = new Binary(data); @@ -531,7 +532,7 @@ public class AMQPConnection_1_0 extends _frameWriter.setValue(amqFrame); - ByteBuffer dup = ByteBuffer.allocateDirect(_endpoint.getMaxFrameSize()); + QpidByteBuffer dup = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize()); int size = _frameWriter.writeToBuffer(dup); if (size > _endpoint.getMaxFrameSize()) @@ -543,7 +544,7 @@ public class AMQPConnection_1_0 extends if (RAW_LOGGER.isDebugEnabled()) { - ByteBuffer dup2 = dup.duplicate(); + QpidByteBuffer dup2 = dup.duplicate(); byte[] data = new byte[dup2.remaining()]; dup2.get(data); Binary bin = new Binary(data);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Aug 7 00:28:17 2015 @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; import org.slf4j.Logger; @@ -43,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.mes import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.transport.ProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -138,24 +140,23 @@ class ConsumerTarget_1_0 extends Abstrac Transfer transfer = new Transfer(); //TODO - - List<ByteBuffer> fragments = message.getFragments(); - ByteBuffer payload; + Collection<QpidByteBuffer> fragments = message.getFragments(); + QpidByteBuffer payload; if(fragments.size() == 1) { - payload = fragments.get(0); + payload = fragments.iterator().next(); } else { int size = 0; - for(ByteBuffer fragment : fragments) + for(QpidByteBuffer fragment : fragments) { size += fragment.remaining(); } - payload = ByteBuffer.allocateDirect(size); + payload = QpidByteBuffer.allocateDirect(size); - for(ByteBuffer fragment : fragments) + 
for(QpidByteBuffer fragment : fragments) { payload.put(fragment.duplicate()); } @@ -171,7 +172,6 @@ class ConsumerTarget_1_0 extends Abstrac Header oldHeader = null; try { - ByteBuffer encodedBuf = payload.duplicate(); Object value = valueHandler.parse(payload); if(value instanceof Header) { @@ -200,8 +200,8 @@ class ConsumerTarget_1_0 extends Abstrac _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); - ByteBuffer oldPayload = payload; - payload = ByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength()); + QpidByteBuffer oldPayload = payload; + payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength()); payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); payload.put(oldPayload); payload.flip(); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Fri Aug 7 00:28:17 2015 @@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.Uns import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.codec.BBEncoder; import 
org.apache.qpid.typedmessage.TypedBytesContentWriter; @@ -68,7 +69,7 @@ public class MessageConverter_from_1_0 Object bodyObject; try { - List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data)); + List<Section> sections = sectionDecoder.parseAll(QpidByteBuffer.wrap(data)); ListIterator<Section> iterator = sections.listIterator(); Section previousSection = null; while(iterator.hasNext()) Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Aug 7 00:28:17 2015 @@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.Sym import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoredMessage; @@ -257,7 +258,7 @@ public abstract class MessageConverter_t } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { ByteBuffer buf = allData.duplicate(); buf.position(offsetInMessage); @@ -266,7 +267,7 @@ public abstract class MessageConverter_t { buf.limit(size); } - return Collections.singleton(buf); + 
return Collections.singleton(QpidByteBuffer.wrap(buf)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Aug 7 00:28:17 2015 @@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.mes import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -70,9 +71,9 @@ public class MessageMetaData_1_0 impleme private Map _appProperties; private Map _footer; - private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3); + private List<QpidByteBuffer> _encodedSections = new ArrayList<>(3); - private volatile ByteBuffer _encoded; + private volatile QpidByteBuffer _encoded; private MessageHeader_1_0 _messageHeader; @@ -92,29 +93,29 @@ public class MessageMetaData_1_0 impleme return _header; } - private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) + private static ArrayList<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) { - ArrayList<ByteBuffer> encodedSections = new 
ArrayList<ByteBuffer>(sections.size()); + ArrayList<QpidByteBuffer> encodedSections = new ArrayList<QpidByteBuffer>(sections.size()); for(Section section : sections) { encoder.encodeObject(section); - encodedSections.add(encoder.getEncoding().asByteBuffer()); + encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer())); encoder.reset(); } return encodedSections; } - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) + public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder) { - this(fragments, decoder, new ArrayList<ByteBuffer>(3)); + this(fragments, decoder, new ArrayList<QpidByteBuffer>(3)); } - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immutableSections) + public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections) { this(constructSections(fragments, decoder,immutableSections), immutableSections); } - private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections) + private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections) { _encodedSections = encodedSections; @@ -161,11 +162,11 @@ public class MessageMetaData_1_0 impleme } - private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections) + private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections) { List<Section> sections = new ArrayList<Section>(3); - ByteBuffer src; + QpidByteBuffer src; if(fragments.length == 1) { src = fragments[0].duplicate(); @@ -173,12 +174,12 @@ public class MessageMetaData_1_0 impleme else { int size = 0; - for(ByteBuffer buf : fragments) + for(QpidByteBuffer buf : fragments) { size += buf.remaining(); } - src = ByteBuffer.allocateDirect(size); - for(ByteBuffer buf : fragments) + src = 
QpidByteBuffer.allocateDirect(size); + for(QpidByteBuffer buf : fragments) { src.put(buf.duplicate()); } @@ -274,7 +275,7 @@ public class MessageMetaData_1_0 impleme int pos = 0; - for(ByteBuffer buf : fragments) + for(QpidByteBuffer buf : fragments) { /* if(pos < startBarePos) @@ -315,7 +316,7 @@ public class MessageMetaData_1_0 impleme { int size = 0; - for(ByteBuffer bin : _encodedSections) + for(QpidByteBuffer bin : _encodedSections) { size += bin.limit(); } @@ -323,11 +324,11 @@ public class MessageMetaData_1_0 impleme return size; } - private ByteBuffer encodeAsBuffer() + private QpidByteBuffer encodeAsBuffer() { - ByteBuffer buf = ByteBuffer.allocateDirect(getStorableSize()); + QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize()); - for(ByteBuffer bin : _encodedSections) + for(QpidByteBuffer bin : _encodedSections) { buf.put(bin.duplicate()); } @@ -337,7 +338,7 @@ public class MessageMetaData_1_0 impleme public int writeToBuffer(ByteBuffer dest) { - ByteBuffer buf = _encoded; + QpidByteBuffer buf = _encoded; if(buf == null) { @@ -353,13 +354,13 @@ public class MessageMetaData_1_0 impleme { buf.limit(dest.remaining()); } - dest.put(buf); + buf.get(dest); return buf.limit(); } public int getContentSize() { - ByteBuffer buf = _encoded; + QpidByteBuffer buf = _encoded; if(buf == null) { @@ -399,17 +400,18 @@ public class MessageMetaData_1_0 impleme ValueHandler valueHandler = new ValueHandler(_typeRegistry); ArrayList<Section> sections = new ArrayList<Section>(3); - ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3); + ArrayList<QpidByteBuffer> encodedSections = new ArrayList<>(3); while(buf.hasRemaining()) { try { ByteBuffer encodedBuf = buf.duplicate(); - Object parse = valueHandler.parse(buf); + final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(buf); + Object parse = valueHandler.parse(qpidByteBuffer); sections.add((Section) parse); encodedBuf.limit(buf.position()); - encodedSections.add(encodedBuf); + 
encodedSections.add(QpidByteBuffer.wrap(encodedBuf)); } catch (AmqpErrorException e) Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Aug 7 00:28:17 2015 @@ -24,8 +24,10 @@ package org.apache.qpid.server.protocol. import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.util.ByteBufferUtils; @@ -33,7 +35,7 @@ import org.apache.qpid.util.ByteBufferUt public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> { - private volatile SoftReference<List<ByteBuffer>> _fragmentsRef; + private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef; private long _arrivalTime; private final long _size; @@ -41,18 +43,18 @@ public class Message_1_0 extends Abstrac public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) { super(storedMessage, null); - final List<ByteBuffer> fragments = restoreFragments(getStoredMessage()); + final Collection<QpidByteBuffer> fragments = restoreFragments(getStoredMessage()); _fragmentsRef = new SoftReference<>(fragments); _size = calculateSize(fragments); } - private long calculateSize(final List<ByteBuffer> 
fragments) + private long calculateSize(final Collection<QpidByteBuffer> fragments) { long size = 0l; if(fragments != null) { - for(ByteBuffer buf : fragments) + for(QpidByteBuffer buf : fragments) { size += buf.remaining(); } @@ -60,28 +62,14 @@ public class Message_1_0 extends Abstrac return size; } - private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) + private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) { - ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); - final int FRAGMENT_SIZE = 2048; - int offset = 0; - ByteBuffer b; - do - { + return storedMessage.getContent(0, Integer.MAX_VALUE); - b = ByteBufferUtils.combine(storedMessage.getContent(offset,FRAGMENT_SIZE)); - if(b.hasRemaining()) - { - fragments.add(b); - offset+= b.remaining(); - } - } - while(b.hasRemaining()); - return fragments; } public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, - final List<ByteBuffer> fragments, + final Collection<QpidByteBuffer> fragments, final Object connectionReference) { super(storedMessage, connectionReference); @@ -128,10 +116,10 @@ public class Message_1_0 extends Abstrac return _arrivalTime; } - public List<ByteBuffer> getFragments() + public Collection<QpidByteBuffer> getFragments() { - List<ByteBuffer> fragments = _fragmentsRef.get(); + Collection<QpidByteBuffer> fragments = _fragmentsRef.get(); if(fragments == null) { fragments = restoreFragments(getStoredMessage()); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Aug 7 00:28:17 2015 @@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.tra import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; @@ -87,7 +88,7 @@ public class ReceivingLink_1_0 implement { // TODO - cope with fragmented messages - List<ByteBuffer> fragments = null; + List<QpidByteBuffer> fragments = null; @@ -108,7 +109,7 @@ public class ReceivingLink_1_0 implement return; } - fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size()); + fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size()); for(Transfer t : _incompleteMessage) { fragments.add(t.getPayload()); @@ -146,20 +147,16 @@ public class ReceivingLink_1_0 implement else { MessageMetaData_1_0 mmd = null; - List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3); - mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]), + List<QpidByteBuffer> immutableSections = new ArrayList<>(3); + mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]), _sectionDecoder, immutableSections); MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd); - boolean skipping = true; - int offset = 0; - - for(ByteBuffer bareMessageBuf : immutableSections) + for(QpidByteBuffer bareMessageBuf : immutableSections) { handle.addContent(bareMessageBuf.duplicate()); - offset += bareMessageBuf.remaining(); } final 
StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded(); Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Fri Aug 7 00:28:17 2015 @@ -47,6 +47,7 @@ import org.apache.qpid.amqp_1_0.type.tra import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -78,7 +79,7 @@ public class TxnCoordinatorLink_1_0 impl { // TODO - cope with fragmented messages - ByteBuffer payload = null; + QpidByteBuffer payload = null; if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) @@ -100,7 +101,7 @@ public class TxnCoordinatorLink_1_0 impl { size += t.getPayload().limit(); } - payload = ByteBuffer.allocateDirect(size); + payload = QpidByteBuffer.allocateDirect(size); for(Transfer t : _incompleteMessage) { payload.put(t.getPayload().duplicate()); Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Fri Aug 7 00:28:17 2015 @@ -40,6 +40,7 @@ import java.util.UUID; import javax.security.auth.Subject; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.*; import org.apache.qpid.server.transport.AMQPConnection; @@ -135,14 +136,14 @@ public class ProtocolEngine_1_0_0Test ex final ByteBufferSender sender = mock(ByteBufferSender.class); when(_networkConnection.getSender()).thenReturn(sender); - final ArgumentCaptor<ByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); + final ArgumentCaptor<QpidByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(QpidByteBuffer.class); doAnswer(new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - _sentBuffers.add(byteBufferCaptor.getValue()); + _sentBuffers.add(byteBufferCaptor.getValue().getNativeBuffer()); return null; } }).when(sender).send(byteBufferCaptor.capture()); @@ -163,11 +164,12 @@ public class ProtocolEngine_1_0_0Test ex createEngine(useSASL, Transport.TCP); - _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance() + .getHeaderIdentifier())); Open open = new Open(); _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open)); - ByteBuffer buf = 
ByteBuffer.allocate(64*1024); + QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024); _frameWriter.writeToBuffer(buf); buf.flip(); _protocolEngine_1_0_0.received(buf); @@ -186,11 +188,11 @@ public class ProtocolEngine_1_0_0Test ex createEngine(useSASL, Transport.TCP); - _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); Open open = new Open(); _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open)); - ByteBuffer buf = ByteBuffer.allocate(64*1024); + QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024); _frameWriter.writeToBuffer(buf); buf.flip(); _protocolEngine_1_0_0.received(buf); @@ -216,11 +218,11 @@ public class ProtocolEngine_1_0_0Test ex final boolean useSASL = false; createEngine(useSASL, Transport.SSL); - _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); Open open = new Open(); _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open)); - ByteBuffer buf = ByteBuffer.allocate(64*1024); + QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024); _frameWriter.writeToBuffer(buf); buf.flip(); _protocolEngine_1_0_0.received(buf); @@ -247,22 +249,22 @@ public class ProtocolEngine_1_0_0Test ex createEngine(useSASL, Transport.TCP); - _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier())); SaslInit init = new SaslInit(); init.setMechanism(Symbol.valueOf("ANONYMOUS")); _frameWriter.setValue(new SASLFrame(init)); - ByteBuffer buf = ByteBuffer.allocate(64*1024); + QpidByteBuffer buf = 
QpidByteBuffer.allocate(64*1024); _frameWriter.writeToBuffer(buf); buf.flip(); _protocolEngine_1_0_0.received(buf); - _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); Open open = new Open(); _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open)); - buf = ByteBuffer.allocate(64*1024); + buf = QpidByteBuffer.allocate(64*1024); _frameWriter.writeToBuffer(buf); buf.flip(); _protocolEngine_1_0_0.received(buf); Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Fri Aug 7 00:28:17 2015 @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; @@ -102,9 +103,9 @@ public class MessageConverter_1_0_to_v0_ } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int 
size) { - return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size)); } @Override Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Aug 7 00:28:17 2015 @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -199,7 +200,7 @@ public class MessageConverter_0_10_to_0_ } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { return message.getContent(offsetInMessage, size); } Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Fri Aug 7 00:28:17 2015 @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -88,7 +89,7 @@ public class MessageConverter_0_8_to_0_1 } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { return message_0_8.getContent(offsetInMessage, size); } Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original) +++ 
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Aug 7 00:28:17 2015 @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; @@ -104,9 +105,9 @@ public class MessageConverter_1_0_to_v0_ } @Override - public Collection<ByteBuffer> getContent(int offsetInMessage, int size) + public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size) { - return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size)); + return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size)); } @Override Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original) +++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Aug 7 00:28:17 2015 @@ -45,6 +45,7 @@ import org.eclipse.jetty.util.ssl.SslCon import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketHandler; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.transport.MultiVersionProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Protocol; @@ -223,7 +224,7 @@ class 
WebSocketProvider implements Accep @Override public void onMessage(final byte[] data, final int offset, final int length) { - _engine.received(ByteBuffer.wrap(data, offset, length).slice()); + _engine.received(QpidByteBuffer.wrap(data, offset, length).slice()); } @Override @@ -277,8 +278,29 @@ class WebSocketProvider implements Accep } + private void send(final ByteBuffer msg) + { + try + { + if (msg.hasArray()) + { + _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining()); + } + else + { + byte[] copy = new byte[msg.remaining()]; + msg.duplicate().get(copy); + _connection.sendMessage(copy, 0, copy.length); + } + } + catch (IOException e) + { + close(); + } + } + @Override - public void send(final ByteBuffer msg) + public void send(final QpidByteBuffer msg) { try { @@ -299,6 +321,7 @@ class WebSocketProvider implements Accep } } + @Override public void flush() { Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Fri Aug 7 00:28:17 2015 @@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteA import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.slf4j.Logger; @@ -121,7 +122,7 @@ public class AMQProtocolHandler implemen /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - private AMQDecoder _decoder; + 
private ClientDecoder _decoder; private ProtocolVersion _suggestedProtocolVersion; @@ -559,7 +560,7 @@ public class AMQProtocolHandler implemen final ByteBuffer buf = asByteBuffer(frame); _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - _sender.send(buf); + _sender.send(QpidByteBuffer.wrap(buf)); if(flush) { _sender.flush(); Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Aug 7 00:28:17 2015 @@ -24,6 +24,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession_0_8; import org.apache.qpid.client.AMQTopic; @@ -76,7 +78,7 @@ public abstract class AbstractJMSMessage _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); } - data = ((ContentBody) bodies.get(0)).getPayload().duplicate(); + data = ((ContentBody) bodies.get(0)).getPayload().getNativeBuffer().duplicate(); } else if (bodies != null) { @@ -91,7 +93,7 @@ public abstract class AbstractJMSMessage while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = cb.getPayload().duplicate(); + final ByteBuffer 
payload = cb.getPayload().getNativeBuffer().duplicate(); if (payload.isDirect() || payload.isReadOnly()) { data.put(payload); @@ -131,15 +133,25 @@ public abstract class AbstractJMSMessage protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps, DeliveryProperties deliveryProps, - java.nio.ByteBuffer body) throws QpidException + Collection<QpidByteBuffer> body) throws QpidException { ByteBuffer data; final boolean debug = _logger.isDebugEnabled(); - if (body != null) + if (body != null && body.size() != 0) { - data = body; + int size = 0; + for(QpidByteBuffer b : body) + { + size += b.remaining(); + } + data = ByteBuffer.allocate(size); + for(QpidByteBuffer b : body) + { + b.get(data); + } + data.flip(); } else // body == null { @@ -180,7 +192,7 @@ public abstract class AbstractJMSMessage } public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps, - DeliveryProperties deliveryProps, java.nio.ByteBuffer body) + DeliveryProperties deliveryProps, Collection<QpidByteBuffer> body) throws JMSException, QpidException { final AbstractJMSMessage msg = @@ -193,7 +205,7 @@ public abstract class AbstractJMSMessage private class BodyInputStream extends InputStream { private final Iterator<ContentBody> _bodiesIter; - private ByteBuffer _currentBuffer; + private QpidByteBuffer _currentBuffer; public BodyInputStream(final List<ContentBody> bodies) { _bodiesIter = bodies.iterator(); Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original) +++ 
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Fri Aug 7 00:28:17 2015 @@ -22,12 +22,19 @@ package org.apache.qpid.client.transport import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; public class MockSender implements ByteBufferSender { - public void send(ByteBuffer msg) + private void send(ByteBuffer msg) + { + + } + + @Override + public void send(final QpidByteBuffer msg) { } Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java?rev=1694594&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java Fri Aug 7 00:28:17 2015 @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.bytebuffer; + +import java.nio.ByteBuffer; + +public interface ByteBufferRef +{ + void incrementRef(); + + void decrementRef(); + + ByteBuffer getBuffer(); +} Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java?rev=1694594&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java Fri Aug 7 00:28:17 2015 @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.bytebuffer; + +import java.nio.ByteBuffer; + +class NonPooledByteBufferRef implements ByteBufferRef +{ + private final ByteBuffer _buffer; + + NonPooledByteBufferRef(final ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public void incrementRef() + { + + } + + @Override + public void decrementRef() + { + + } + + @Override + public ByteBuffer getBuffer() + { + return _buffer; + } +} Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1694594&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Fri Aug 7 00:28:17 2015 @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.bytebuffer; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +class PooledByteBufferRef implements ByteBufferRef +{ + private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> UPDATER = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount"); + + private final ByteBuffer _buffer; + private volatile int _refCount; + + PooledByteBufferRef(final ByteBuffer buffer) + { + _buffer = buffer; + } + + @Override + public void incrementRef() + { + UPDATER.incrementAndGet(this); + } + + @Override + public void decrementRef() + { + if(UPDATER.decrementAndGet(this) == 0) + { + returnToPool(this); + } + } + + @Override + public ByteBuffer getBuffer() + { + return _buffer.duplicate(); + } + + private void returnToPool(final PooledByteBufferRef byteBufferRef) + { + + } + +} Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1694594&view=auto ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (added) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Aug 7 00:28:17 2015 @@ -0,0 +1,769 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.bytebuffer; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.framing.AMQShortString; + +public final class QpidByteBuffer +{ + private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QpidByteBuffer.class, "_disposed"); + + private final ByteBuffer _buffer; + private final ByteBufferRef _ref; + private volatile int _disposed; + + QpidByteBuffer(ByteBufferRef ref) + { + this(ref.getBuffer(), ref); + } + + private QpidByteBuffer(ByteBuffer buf, ByteBufferRef ref) + { + _buffer = buf; + _ref = ref; + ref.incrementRef(); + } + + + public boolean hasRemaining() + { + return _buffer.hasRemaining(); + } + + public QpidByteBuffer putInt(final int index, final int value) + { + _buffer.putInt(index, value); + return this; + } + + public boolean isDirect() + { + return _buffer.isDirect(); + } + + public QpidByteBuffer putShort(final int index, final short value) + { + _buffer.putShort(index, value); + return this; + } + + public QpidByteBuffer putChar(final int index, final char value) + { + _buffer.putChar(index, value); + return this; + 
+ } + + public QpidByteBuffer put(final byte b) + { + _buffer.put(b); + return this; + } + + public QpidByteBuffer put(final int index, final byte b) + { + _buffer.put(index, b); + return this; + } + + public short getShort(final int index) + { + return _buffer.getShort(index); + } + + + public QpidByteBuffer mark() + { + _buffer.mark(); + return this; + } + + public long getLong() + { + return _buffer.getLong(); + } + + public QpidByteBuffer putFloat(final int index, final float value) + { + _buffer.putFloat(index, value); + return this; + } + + public double getDouble(final int index) + { + return _buffer.getDouble(index); + } + + public boolean hasArray() + { + return _buffer.hasArray(); + } + + public QpidByteBuffer asReadOnlyBuffer() + { + return new QpidByteBuffer(_buffer.asReadOnlyBuffer(), _ref); + } + + public double getDouble() + { + return _buffer.getDouble(); + } + + public QpidByteBuffer putFloat(final float value) + { + _buffer.putFloat(value); + return this; + } + + public QpidByteBuffer putInt(final int value) + { + _buffer.putInt(value); + return this; + } + + public byte[] array() + { + return _buffer.array(); + } + + public QpidByteBuffer putShort(final short value) + { + _buffer.putShort(value); + return this; + } + + public int getInt(final int index) + { + return _buffer.getInt(index); + } + + public int remaining() + { + return _buffer.remaining(); + } + + public QpidByteBuffer put(final byte[] src) + { + _buffer.put(src); + return this; + } + + public QpidByteBuffer put(final ByteBuffer src) + { + _buffer.put(src); + return this; + } + + public QpidByteBuffer put(final QpidByteBuffer src) + { + _buffer.put(src._buffer); + return this; + } + + + + public QpidByteBuffer get(final byte[] dst, final int offset, final int length) + { + _buffer.get(dst, offset, length); + return this; + } + + public QpidByteBuffer get(final ByteBuffer src) + { + src.put(_buffer); + return this; + } + + + public QpidByteBuffer rewind() + { + _buffer.rewind(); + 
return this; + } + + public QpidByteBuffer clear() + { + _buffer.clear(); + return this; + } + + public QpidByteBuffer putLong(final int index, final long value) + { + _buffer.putLong(index, value); + return this; + } + public QpidByteBuffer compact() + { + _buffer.compact(); + return this; + } + + public QpidByteBuffer putDouble(final double value) + { + _buffer.putDouble(value); + return this; + } + + public int limit() + { + return _buffer.limit(); + } + + public QpidByteBuffer reset() + { + _buffer.reset(); + return this; + } + + public QpidByteBuffer flip() + { + _buffer.flip(); + return this; + } + + public short getShort() + { + return _buffer.getShort(); + } + + public float getFloat() + { + return _buffer.getFloat(); + } + + public QpidByteBuffer limit(final int newLimit) + { + _buffer.limit(newLimit); + return this; + } + + public QpidByteBuffer duplicate() + { + return new QpidByteBuffer(_buffer.duplicate(), _ref); + } + + public QpidByteBuffer put(final byte[] src, final int offset, final int length) + { + _buffer.put(src, offset, length); + return this; + } + + public long getLong(final int index) + { + return _buffer.getLong(index); + } + + public int capacity() + { + return _buffer.capacity(); + } + + public boolean isReadOnly() + { + return _buffer.isReadOnly(); + } + + public char getChar(final int index) + { + return _buffer.getChar(index); + } + + public byte get() + { + return _buffer.get(); + } + + public byte get(final int index) + { + return _buffer.get(index); + } + + public QpidByteBuffer get(final byte[] dst) + { + _buffer.get(dst); + return this; + } + + public QpidByteBuffer putChar(final char value) + { + _buffer.putChar(value); + return this; + } + + public QpidByteBuffer position(final int newPosition) + { + _buffer.position(newPosition); + return this; + } + + public int arrayOffset() + { + return _buffer.arrayOffset(); + } + + public char getChar() + { + return _buffer.getChar(); + } + + public int getInt() + { + return 
_buffer.getInt(); + } + + public QpidByteBuffer putLong(final long value) + { + _buffer.putLong(value); + return this; + } + + public float getFloat(final int index) + { + return _buffer.getFloat(index); + } + + public QpidByteBuffer slice() + { + return new QpidByteBuffer(_buffer.slice(), _ref); + } + + public QpidByteBuffer view(int offset, int length) + { + ByteBuffer buf = _buffer.slice(); + buf.position(offset); + buf = buf.slice(); + buf.limit(length); + return new QpidByteBuffer(buf, _ref); + } + + public int position() + { + return _buffer.position(); + } + + public QpidByteBuffer putDouble(final int index, final double value) + { + _buffer.putDouble(index, value); + return this; + } + + @Override + protected void finalize() throws Throwable + { + dispose(); + super.finalize(); + } + + public void dispose() + { + if(DISPOSED_UPDATER.compareAndSet(this,0,1)) + { + _ref.decrementRef(); + } + } + + public InputStream asInputStream() + { + return new BufferInputStream(); + } + + public MarkableDataInput asDataInput() + { + return new BufferDataInput(); + } + + + public DataOutput asDataOutput() + { + return new BufferDataOutput(); + } + + public static QpidByteBuffer allocate(int size) + { + return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size))); + } + + + public static QpidByteBuffer allocateDirect(int size) + { + return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size))); + } + + + public ByteBuffer getNativeBuffer() + { + return _buffer; + } + + public CharBuffer decode(Charset charset) + { + return charset.decode(_buffer); + } + + public static QpidByteBuffer wrap(final ByteBuffer wrap) + { + return new QpidByteBuffer(new NonPooledByteBufferRef(wrap)); + } + + public static QpidByteBuffer wrap(final byte[] data) + { + return wrap(ByteBuffer.wrap(data)); + } + + public static QpidByteBuffer wrap(final byte[] data, int offset, int length) + { + return wrap(ByteBuffer.wrap(data, offset, length)); + } + + private 
final class BufferInputStream extends InputStream + { + + @Override + public int read() throws IOException + { + if (_buffer.hasRemaining()) + { + return _buffer.get() & 0xFF; + } + return -1; + } + + + @Override + public int read(byte[] b, int off, int len) throws IOException + { + if (!_buffer.hasRemaining()) + { + return -1; + } + if(_buffer.remaining() < len) + { + len = _buffer.remaining(); + } + _buffer.get(b, off, len); + + return len; + } + + @Override + public void mark(int readlimit) + { + _buffer.mark(); + } + + @Override + public void reset() throws IOException + { + _buffer.reset(); + } + + @Override + public boolean markSupported() + { + return true; + } + + @Override + public long skip(long n) throws IOException + { + _buffer.position(_buffer.position()+(int)n); + return n; + } + + @Override + public int available() throws IOException + { + return _buffer.remaining(); + } + + @Override + public void close() + { + } + } + + private final class BufferDataInput implements MarkableDataInput + { + private int _mark; + private final int _offset; + + public BufferDataInput() + { + _offset = _buffer.position(); + } + + public void readFully(byte[] b) + { + _buffer.get(b); + } + + public void readFully(byte[] b, int off, int len) + { + _buffer.get(b, 0, len); + } + + public QpidByteBuffer readAsByteBuffer(int len) + { + final QpidByteBuffer view = view(0, len); + skipBytes(len); + return view; + } + + public int skipBytes(int n) + { + _buffer.position(_buffer.position()+n); + return _buffer.position()-_offset; + } + + public boolean readBoolean() + { + return _buffer.get() != 0; + } + + public byte readByte() + { + return _buffer.get(); + } + + public int readUnsignedByte() + { + return ((int) _buffer.get()) & 0xFF; + } + + public short readShort() + { + return _buffer.getShort(); + } + + public int readUnsignedShort() + { + return ((int) _buffer.getShort()) & 0xffff; + } + + public char readChar() + { + return (char) _buffer.getChar(); + } + + public int 
readInt() + { + return _buffer.getInt(); + } + + public long readLong() + { + return _buffer.getLong(); + } + + public float readFloat() + { + return _buffer.getFloat(); + } + + public double readDouble() + { + return _buffer.getDouble(); + } + + public AMQShortString readAMQShortString() + { + return AMQShortString.readAMQShortString(_buffer); + } + + public String readLine() + { + throw new UnsupportedOperationException(); + } + + public String readUTF() + { + throw new UnsupportedOperationException(); + } + + public int available() + { + return _buffer.remaining(); + } + + + public long skip(long i) + { + _buffer.position(_buffer.position()+(int)i); + return i; + } + + public int read(byte[] b) + { + readFully(b); + return b.length; + } + + public int position() + { + return _buffer.position()-_offset; + } + + public void position(int position) + { + _buffer.position(position + _offset); + } + + public int length() + { + return _buffer.limit(); + } + + + public void mark(int readAhead) + { + _mark = position(); + } + + public void reset() + { + _buffer.position(_mark); + } + } + + private final class BufferDataOutput implements DataOutput + { + public void write(int b) + { + _buffer.put((byte) b); + } + + public void write(byte[] b) + { + _buffer.put(b); + } + + + public void write(byte[] b, int off, int len) + { + _buffer.put(b, off, len); + + } + + public void writeBoolean(boolean v) + { + _buffer.put(v ? 
(byte) 1 : (byte) 0); + } + + public void writeByte(int v) + { + _buffer.put((byte) v); + } + + public void writeShort(int v) + { + _buffer.putShort((short) v); + } + + public void writeChar(int v) + { + _buffer.put((byte) (v >>> 8)); + _buffer.put((byte) v); + } + + public void writeInt(int v) + { + _buffer.putInt(v); + } + + public void writeLong(long v) + { + _buffer.putLong(v); + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + throw new UnsupportedOperationException("writeBytes(String s) not supported"); + } + + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) + { + int v = s.charAt(i); + _buffer.put((byte) (v >>> 8)); + _buffer.put((byte) v); + } + } + + public void writeUTF(String s) + { + int strlen = s.length(); + + int pos = _buffer.position(); + _buffer.position(pos + 2); + + + for (int i = 0; i < strlen; i++) + { + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + { + c = s.charAt(i); + _buffer.put((byte) c); + + } + else if (c > 0x07FF) + { + _buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F))); + _buffer.put((byte) (0x80 | ((c >> 6) & 0x3F))); + _buffer.put((byte) (0x80 | (c & 0x3F))); + } + else + { + _buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F))); + _buffer.put((byte) (0x80 | (c & 0x3F))); + } + } + + int len = _buffer.position() - (pos + 2); + + _buffer.put(pos++, (byte) (len >>> 8)); + _buffer.put(pos, (byte) len); + } + + } + +} Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Aug 7 00:28:17 2015 @@ -95,9 +95,6 @@ public abstract class AMQDecoder<T exten return _methodProcessor; } - - public abstract void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException; - protected void decode(final MarkableDataInput msg) throws IOException, AMQFrameDecodingException { // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java Fri Aug 7 00:28:17 2015 @@ -42,7 +42,6 @@ public class ClientDecoder extends AMQDe super(false, methodProcessor); } - @Override public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Fri Aug 7 00:28:17 2015 @@ -20,6 +20,7 @@ */ package org.apache.qpid.codec; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.AMQShortString; import java.io.DataInput; @@ -28,8 +29,8 @@ import java.nio.ByteBuffer; public interface MarkableDataInput extends DataInput { - public void mark(int readAhead); - public void reset() throws IOException; + void mark(int readAhead); + void reset() throws IOException; int available() throws IOException; @@ -37,8 +38,8 @@ public interface MarkableDataInput exten int read(byte[] b) throws IOException; - ByteBuffer readAsByteBuffer(int len) throws IOException; + QpidByteBuffer readAsByteBuffer(int len) throws IOException; - public AMQShortString readAMQShortString() throws IOException; + AMQShortString readAMQShortString() throws IOException; } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Fri Aug 7 00:28:17 2015 @@ -23,6 +23,7 @@ package org.apache.qpid.codec; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.framing.*; public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? 
extends ServerChannelMethodProcessor>> @@ -38,9 +39,9 @@ public class ServerDecoder extends AMQDe super(true, methodProcessor); } - public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { - decode(new ByteBufferDataInput(buf)); + decode(buf.asDataInput()); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Aug 7 00:28:17 2015 @@ -24,6 +24,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.BytesDataOutput; @@ -62,7 +63,7 @@ public class AMQFrame extends AMQDataBlo } private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE }; - private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocateDirect(1); + private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1); static { FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE); @@ -72,7 +73,7 @@ public class AMQFrame extends AMQDataBlo @Override public long writePayload(final ByteBufferSender sender) throws IOException { - ByteBuffer frameHeader = ByteBuffer.allocate(7); + QpidByteBuffer frameHeader = QpidByteBuffer.allocate(7); frameHeader.put(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(frameHeader, _channel); Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Aug 7 00:28:17 2015 @@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; +import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.ByteBufferSender; @@ -118,8 +120,8 @@ public abstract class AMQMethodBodyImpl { final int size = getSize(); - ByteBuffer buf = ByteBuffer.allocate(size); - ByteBufferDataOutput dataOutput = new ByteBufferDataOutput(buf); + QpidByteBuffer buf = QpidByteBuffer.allocate(size); + DataOutput dataOutput = buf.asDataOutput(); writePayload(dataOutput); buf.flip(); sender.send(buf); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Aug 7 00:28:17 2015 @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.util.ByteBufferDataOutput; @@ -86,7 +87,8 @@ public class BasicContentHeaderPropertie private static final int USER_ID_MASK = 1 << 4; private static final int APPLICATION_ID_MASK = 1 << 3; private static final int CLUSTER_ID_MASK = 1 << 2; - private ByteBuffer _encodedForm; + + private QpidByteBuffer _encodedForm; public BasicContentHeaderProperties(BasicContentHeaderProperties other) @@ -482,9 +484,8 @@ public class BasicContentHeaderPropertie else { int propertyListSize = getPropertyListSize(); - ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize); - ByteBufferDataOutput out = new ByteBufferDataOutput(buf); - writePropertyListPayload(out); + QpidByteBuffer buf = QpidByteBuffer.allocateDirect(propertyListSize); + writePropertyListPayload(buf.asDataOutput()); buf.flip(); sender.send(buf); return propertyListSize; @@ -503,9 +504,7 @@ public class BasicContentHeaderPropertie _encodedForm = buffer.readAsByteBuffer(size); - ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm.slice()); - - decode(input); + decode(_encodedForm.slice().asDataInput()); } @@ -529,7 +528,7 @@ public class BasicContentHeaderPropertie { long length = EncodingUtils.readUnsignedInteger(buffer); - ByteBuffer buf = _encodedForm.slice(); + QpidByteBuffer buf = _encodedForm.slice(); buf.position(headersOffset+4); buf = buf.slice(); buf.limit((int)length); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java 
(original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Fri Aug 7 00:28:17 2015 @@ -22,9 +22,10 @@ package org.apache.qpid.framing; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; -public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput +public class ByteArrayDataInput implements MarkableDataInput { private byte[] _data; private int _offset; @@ -167,11 +168,11 @@ public class ByteArrayDataInput implemen } @Override - public ByteBuffer readAsByteBuffer(final int len) + public QpidByteBuffer readAsByteBuffer(final int len) { byte[] data = new byte[len]; readFully(data); - return ByteBuffer.wrap(data); + return QpidByteBuffer.wrap(ByteBuffer.wrap(data)); } public int position() Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Fri Aug 7 00:28:17 2015 @@ -22,9 +22,10 @@ package org.apache.qpid.framing; import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; -public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput +public class ByteBufferDataInput implements MarkableDataInput { private final ByteBuffer _underlying; private int _mark; @@ -46,12 +47,12 @@ public class ByteBufferDataInput impleme _underlying.get(b,0, len); } - public ByteBuffer readAsByteBuffer(int len) + public QpidByteBuffer readAsByteBuffer(int len) { ByteBuffer buf = _underlying.slice(); 
buf.limit(len); skipBytes(len); - return buf; + return QpidByteBuffer.wrap(buf); } public int skipBytes(int n) Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Fri Aug 7 00:28:17 2015 @@ -23,9 +23,10 @@ package org.apache.qpid.framing; import java.nio.ByteBuffer; import java.util.List; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; -public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput +public class ByteBufferListDataInput implements MarkableDataInput { private final List<ByteBuffer> _underlying; private int _bufferIndex; @@ -45,7 +46,7 @@ public class ByteBufferListDataInput imp } else { - ByteBuffer buf = readAsByteBuffer(b.length); + ByteBuffer buf = readAsNativeByteBuffer(b.length); buf.get(b); } } @@ -59,13 +60,18 @@ public class ByteBufferListDataInput imp } else { - ByteBuffer buf = readAsByteBuffer(len); + ByteBuffer buf = readAsNativeByteBuffer(len); buf.get(b, off, len); } } @Override - public ByteBuffer readAsByteBuffer(int len) + public QpidByteBuffer readAsByteBuffer(int len) + { + return QpidByteBuffer.wrap(readAsNativeByteBuffer(len)); + } + + private ByteBuffer readAsNativeByteBuffer(int len) { ByteBuffer currentBuffer = getCurrentBuffer(); if(currentBuffer.remaining()>=len) @@ -167,7 +173,7 @@ public class ByteBufferListDataInput imp } else { - return readAsByteBuffer(size); + return readAsNativeByteBuffer(size); } } Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Fri Aug 7 00:28:17 2015 @@ -20,7 +20,7 @@ */ package org.apache.qpid.framing; -import java.nio.ByteBuffer; +import org.apache.qpid.bytebuffer.QpidByteBuffer; public interface ChannelMethodProcessor { @@ -32,7 +32,7 @@ public interface ChannelMethodProcessor void receiveChannelCloseOk(); - void receiveMessageContent(ByteBuffer data); + void receiveMessageContent(QpidByteBuffer data); void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1694594&r1=1694593&r2=1694594&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Aug 7 00:28:17 2015 @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.qpid.QpidException; +import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.ByteBufferSender; @@ -33,7 +34,7 @@ public class ContentBody implements AMQB { public static final byte TYPE = 3; - private ByteBuffer _payload; + private 
QpidByteBuffer _payload; public ContentBody() { @@ -42,9 +43,15 @@ public class ContentBody implements AMQB public ContentBody(ByteBuffer payload) { + _payload = QpidByteBuffer.wrap(payload); + } + + public ContentBody(QpidByteBuffer payload) + { _payload = payload; } + public byte getFrameType() { return TYPE; @@ -82,7 +89,7 @@ public class ContentBody implements AMQB } } - public ByteBuffer getPayload() + public QpidByteBuffer getPayload() { return _payload; } @@ -92,7 +99,7 @@ public class ContentBody implements AMQB throws IOException { - ByteBuffer payload = in.readAsByteBuffer((int)bodySize); + QpidByteBuffer payload = in.readAsByteBuffer((int) bodySize); if(!methodProcessor.ignoreAllButCloseOk()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org
