Author: rgodfrey
Date: Fri Aug  7 00:28:17 2015
New Revision: 1694594

URL: http://svn.apache.org/r1694594
Log:
QPID-6662: Wrap use of ByteBuffers in the QpidByteBuffer class to allow for use of
pools of buffers rather than always allocating fresh ones. Refactor
NonBlockingConnection to allow the connection object to only perform partial reads.

Added:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
   (with props)
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
      - copied, changed from r1693306, 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java
      - copied, changed from r1693306, 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
      - copied, changed from r1693325, 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
   (with props)
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
   (with props)
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
   (with props)
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
   (with props)
Removed:
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
Modified:
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
    
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DelegatingValueWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DescribedTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DoubleTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedEightWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedFourWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedOneWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedSixteenWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedTwoWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FloatTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/IntTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/IntegerWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/LongTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/LongWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/NullTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/NullWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ProtocolHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ProtocolHeaderHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ShortTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SimpleVariableWidthWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallIntConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallLongConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallUIntConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallULongConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolArrayWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/TimestampTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/TypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UByteTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UIntTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ULongTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UShortTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UUIDTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedByteWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedIntegerWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ValueHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ValueWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/VariableWidthTypeConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/VariableWidthWriter.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroListConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroUIntConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroULongConstructor.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/TransportFrame.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionDecoder.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionDecoderImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionEncoderImpl.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
    
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Fri Aug  7 00:28:17 2015
@@ -48,6 +48,7 @@ import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
 import org.slf4j.Logger;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.store.Event;
@@ -370,8 +371,8 @@ public abstract class AbstractBDBMessage
             OperationStatus status = getMessageContentDb().get(null, 
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
             if (status == OperationStatus.SUCCESS)
             {
-                ByteBuffer dataAsBytes = 
contentTupleBinding.entryToObject(value);
-                int size = dataAsBytes.remaining();
+                QpidByteBuffer buffer = 
contentTupleBinding.entryToObject(value);
+                int size = buffer.remaining();
                 if (offset > size)
                 {
                     throw new RuntimeException("Offset " + offset + " is 
greater than message size " + size
@@ -384,11 +385,8 @@ public abstract class AbstractBDBMessage
                 {
                     written = dst.remaining();
                 }
-                dataAsBytes.position(dataAsBytes.position()+offset);
-
-                dataAsBytes = dataAsBytes.slice();
-                dataAsBytes.limit(written);
-                dst.put(dataAsBytes);
+                buffer = buffer.view(offset, written);
+                buffer.get(dst);
             }
             return written;
         }
@@ -401,7 +399,7 @@ public abstract class AbstractBDBMessage
         }
     }
 
-    ByteBuffer getAllContent(long messageId) throws StoreException
+    QpidByteBuffer getAllContent(long messageId) throws StoreException
     {
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         LongBinding.longToEntry(messageId, contentKeyEntry);
@@ -513,7 +511,7 @@ public abstract class AbstractBDBMessage
      * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
      */
     private void addContent(final Transaction tx, long messageId, int offset,
-                            Collection<ByteBuffer> contentBody) throws 
StoreException
+                            Collection<QpidByteBuffer> contentBody) throws 
StoreException
     {
         DatabaseEntry key = new DatabaseEntry();
         LongBinding.longToEntry(messageId, key);
@@ -521,15 +519,15 @@ public abstract class AbstractBDBMessage
 
         int size = 0;
 
-        for(ByteBuffer buf : contentBody)
+        for(QpidByteBuffer buf : contentBody)
         {
             size += buf.remaining();
         }
         byte[] data = new byte[size];
         ByteBuffer dst = ByteBuffer.wrap(data);
-        for(ByteBuffer buf : contentBody)
+        for(QpidByteBuffer buf : contentBody)
         {
-            dst.put(buf.duplicate());
+            buf.duplicate().get(dst);
         }
         value.setData(data);
         try
@@ -903,15 +901,15 @@ public abstract class AbstractBDBMessage
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
         T getMetaData();
-        Collection<ByteBuffer> getData();
-        void setData(Collection<ByteBuffer> data);
+        Collection<QpidByteBuffer> getData();
+        void setData(Collection<QpidByteBuffer> data);
         boolean isHardRef();
     }
 
     private static final class MessageDataHardRef<T extends 
StorableMessageMetaData> implements MessageDataRef<T>
     {
         private final T _metaData;
-        private volatile Collection<ByteBuffer> _data;
+        private volatile Collection<QpidByteBuffer> _data;
 
         private MessageDataHardRef(final T metaData)
         {
@@ -925,13 +923,13 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public Collection<ByteBuffer> getData()
+        public Collection<QpidByteBuffer> getData()
         {
             return _data;
         }
 
         @Override
-        public void setData(final Collection<ByteBuffer> data)
+        public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = data;
         }
@@ -946,9 +944,9 @@ public abstract class AbstractBDBMessage
     private static final class MessageData<T extends StorableMessageMetaData>
     {
         private T _metaData;
-        private SoftReference<Collection<ByteBuffer>> _data;
+        private SoftReference<Collection<QpidByteBuffer>> _data;
 
-        private MessageData(final T metaData, final Collection<ByteBuffer> 
data)
+        private MessageData(final T metaData, final Collection<QpidByteBuffer> 
data)
         {
             _metaData = metaData;
 
@@ -963,12 +961,12 @@ public abstract class AbstractBDBMessage
             return _metaData;
         }
 
-        public Collection<ByteBuffer> getData()
+        public Collection<QpidByteBuffer> getData()
         {
             return _data == null ? null : _data.get();
         }
 
-        public void setData(final Collection<ByteBuffer> data)
+        public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = new SoftReference<>(data);
         }
@@ -978,7 +976,7 @@ public abstract class AbstractBDBMessage
     private static final class MessageDataSoftRef<T extends 
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements 
MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, Collection<ByteBuffer> 
data)
+        public MessageDataSoftRef(final T metadata, Collection<QpidByteBuffer> 
data)
         {
             super(new MessageData<T>(metadata, data));
         }
@@ -991,7 +989,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public Collection<ByteBuffer> getData()
+        public Collection<QpidByteBuffer> getData()
         {
             MessageData<T> ref = get();
 
@@ -999,7 +997,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void setData(final Collection<ByteBuffer> data)
+        public void setData(final Collection<QpidByteBuffer> data)
         {
             MessageData<T> ref = get();
             if(ref != null)
@@ -1062,17 +1060,17 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void addContent(ByteBuffer src)
+        public void addContent(QpidByteBuffer src)
         {
             src = src.slice();
-            Collection<ByteBuffer> data = _messageDataRef.getData();
+            Collection<QpidByteBuffer> data = _messageDataRef.getData();
             if(data == null)
             {
                 _messageDataRef.setData(Collections.singleton(src));
             }
             else
             {
-                List<ByteBuffer> newCollection = new 
ArrayList<>(data.size()+1);
+                List<QpidByteBuffer> newCollection = new 
ArrayList<>(data.size()+1);
                 newCollection.addAll(data);
                 newCollection.add(src);
                 
_messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
@@ -1089,17 +1087,19 @@ public abstract class AbstractBDBMessage
         @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
-            ByteBuffer data = 
ByteBufferUtils.combine(getContentAsByteBuffer());
-            data = data.slice();
-            int length = Math.min(dst.remaining(), data.remaining());
-            data.limit(length);
-            dst.put(data);
+            Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
+            int length = 0;
+            for(QpidByteBuffer contentChunk : allContent)
+            {
+                length += contentChunk.remaining();
+                contentChunk.duplicate().get(dst);
+            }
             return length;
         }
 
-        private Collection<ByteBuffer> getContentAsByteBuffer()
+        private Collection<QpidByteBuffer> getContentAsByteBuffer()
         {
-            Collection<ByteBuffer> data = _messageDataRef.getData();
+            Collection<QpidByteBuffer> data = _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1125,14 +1125,14 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+        public Collection<QpidByteBuffer> getContent(int offsetInMessage, int 
size)
         {
             int pos = 0;
             int added = 0;
 
-            Collection<ByteBuffer> bufs = getContentAsByteBuffer();
-            List<ByteBuffer> content = new ArrayList<>(bufs.size());
-            for(ByteBuffer buf : bufs)
+            Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
+            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            for(QpidByteBuffer buf : bufs)
             {
                 if(pos < offsetInMessage)
                 {
@@ -1160,7 +1160,7 @@ public abstract class AbstractBDBMessage
                     {
                         buf.limit(size-added);
                     }
-                    content.add(buf.slice());
+                    content.add(buf);
                     added += buf.remaining();
                 }
                 if(added >= size)
@@ -1180,7 +1180,7 @@ public abstract class AbstractBDBMessage
                 AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, 
_messageDataRef.getMetaData());
                 AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
                                                         
_messageDataRef.getData() == null
-                                                                ? 
Collections.<ByteBuffer>emptySet()
+                                                                ? 
Collections.<QpidByteBuffer>emptySet()
                                                                 : 
_messageDataRef.getData());
 
 

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
 Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import com.sleepycat.bind.tuple.TupleInp
 import com.sleepycat.bind.tuple.TupleOutput;
 import com.sleepycat.je.DatabaseException;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
 
@@ -48,7 +49,7 @@ public class FieldTableEncoding
 
             ByteBuffer buf = 
ByteBufferBinding.getInstance().readByteBuffer(tupleInput, (int) length);
 
-            return new FieldTable(buf);
+            return new FieldTable(QpidByteBuffer.wrap(buf));
 
         }
 

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
 Fri Aug  7 00:28:17 2015
@@ -26,7 +26,9 @@ import com.sleepycat.bind.tuple.TupleBin
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
-public class ByteBufferBinding extends TupleBinding<ByteBuffer>
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+public class ByteBufferBinding extends TupleBinding<QpidByteBuffer>
 {
     private static final int COPY_BUFFER_SIZE = 8192;
 
@@ -51,10 +53,10 @@ public class ByteBufferBinding extends T
     private ByteBufferBinding() { }
 
     @Override
-    public ByteBuffer entryToObject(final TupleInput input)
+    public QpidByteBuffer entryToObject(final TupleInput input)
     {
         int available = input.available();
-        ByteBuffer buf = ByteBuffer.allocateDirect(available);
+        QpidByteBuffer buf = QpidByteBuffer.allocateDirect(available);
         byte[] copyBuf = COPY_BUFFER.get();
         while(available > 0)
         {
@@ -67,7 +69,7 @@ public class ByteBufferBinding extends T
     }
 
     @Override
-    public void objectToEntry(ByteBuffer data, final TupleOutput output)
+    public void objectToEntry(QpidByteBuffer data, final TupleOutput output)
     {
         data = data.duplicate();
         byte[] copyBuf = COPY_BUFFER.get();

Modified: 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
 Fri Aug  7 00:28:17 2015
@@ -27,6 +27,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -90,10 +91,10 @@ public class BDBMessageStoreTest extends
         // Use a single chunk for the 0-10 message as per broker behaviour.
         String bodyText = 
"jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
 
-        ByteBuffer firstContentBytes_0_8 = 
ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
-        ByteBuffer secondContentBytes_0_8 = 
ByteBuffer.wrap(bodyText.substring(10).getBytes());
+        QpidByteBuffer firstContentBytes_0_8 = 
QpidByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+        QpidByteBuffer secondContentBytes_0_8 = 
QpidByteBuffer.wrap(bodyText.substring(10).getBytes());
 
-        ByteBuffer completeContentBody_0_10 = 
ByteBuffer.wrap(bodyText.getBytes());
+        QpidByteBuffer completeContentBody_0_10 = 
QpidByteBuffer.wrap(bodyText.getBytes());
         int bodySize = completeContentBody_0_10.limit();
 
         /*
@@ -123,7 +124,7 @@ public class BDBMessageStoreTest extends
         Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
 
         MessageTransfer xfr_0_10 = new MessageTransfer("destination", 
MessageAcceptMode.EXPLICIT,
-                MessageAcquireMode.PRE_ACQUIRED, header_0_10, 
completeContentBody_0_10);
+                MessageAcquireMode.PRE_ACQUIRED, header_0_10, 
completeContentBody_0_10.getNativeBuffer());
 
         MessageMetaData_0_10 messageMetaData_0_10 = new 
MessageMetaData_0_10(xfr_0_10);
         MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = 
bdbStore.addMessage(messageMetaData_0_10);
@@ -344,7 +345,7 @@ public class BDBMessageStoreTest extends
 
     private StoredMessage<MessageMetaData> 
createAndStoreSingleChunkMessage_0_8(MessageStore store)
     {
-        ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
+        QpidByteBuffer chunk1 = QpidByteBuffer.wrap(CONTENT_BYTES);
 
         int bodySize = CONTENT_BYTES.length;
 

Modified: 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
 Fri Aug  7 00:28:17 2015
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
 import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
 
@@ -138,7 +139,7 @@ public class UpgraderTest extends Abstra
             {
                 long id = LongBinding.entryToLong(key);
                 assertTrue("Unexpected id", id > 0);
-                ByteBuffer content = contentBinding.entryToObject(value);
+                QpidByteBuffer content = contentBinding.entryToObject(value);
                 assertNotNull("Unexpected content", content);
                 assertTrue("Expected content", content.hasRemaining());
             }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 Fri Aug  7 00:28:17 2015
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -166,7 +167,7 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public Collection<ByteBuffer> getContent(int offset, int size)
+    final public Collection<QpidByteBuffer> getContent(int offset, int size)
     {
         return getStoredMessage().getContent(offset, size);
     }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
 Fri Aug  7 00:28:17 2015
@@ -20,11 +20,12 @@
 */
 package org.apache.qpid.server.message;
 
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 
-public interface EnqueueableMessage
+public interface EnqueueableMessage<T extends StorableMessageMetaData>
 {
     long getMessageNumber();
     boolean isPersistent();
-    StoredMessage getStoredMessage();
+    StoredMessage<T> getStoredMessage();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
 Fri Aug  7 00:28:17 2015
@@ -24,10 +24,12 @@ package org.apache.qpid.server.message;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public interface MessageContentSource
 {
     int getContent(ByteBuffer buf, int offset);
-    Collection<ByteBuffer> getContent(int offset, int size);
+    Collection<QpidByteBuffer> getContent(int offset, int size);
 
     long getSize();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
 Fri Aug  7 00:28:17 2015
@@ -33,7 +33,7 @@ public interface ServerMessage<T extends
 
     AMQMessageHeader getMessageHeader();
 
-    public StoredMessage<T> getStoredMessage();
+    StoredMessage<T> getStoredMessage();
 
     boolean isPersistent();
 
@@ -49,13 +49,7 @@ public interface ServerMessage<T extends
 
     boolean isReferenced();
 
-    long getMessageNumber();
-
     long getArrivalTime();
 
-    public int getContent(ByteBuffer buf, int offset);
-
-    Collection<ByteBuffer> getContent(int offset, int size);
-
     Object getConnectionReference();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 Fri Aug  7 00:28:17 2015
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.MessageHandle;
@@ -64,7 +65,7 @@ public class InternalMessage extends Abs
     {
         super(msg, null);
         _contentSize = msg.getMetaData().getContentSize();
-        Collection<ByteBuffer> bufs = msg.getContent(0, _contentSize);
+        Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
 
         try(ObjectInputStream is = new ObjectInputStream(new 
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
@@ -140,7 +141,7 @@ public class InternalMessage extends Abs
 
             final InternalMessageMetaData metaData = 
InternalMessageMetaData.create(persistent, internalHeader, bytes.length);
             MessageHandle<InternalMessageMetaData> handle = 
store.addMessage(metaData);
-            handle.addContent(ByteBuffer.wrap(bytes));
+            handle.addContent(QpidByteBuffer.wrap(bytes));
             StoredMessage<InternalMessageMetaData> storedMessage = 
handle.allContentAdded();
             return new InternalMessage(storedMessage, internalHeader, 
bodyObject);
         }
@@ -238,9 +239,9 @@ public class InternalMessage extends Abs
                     }
 
                     @Override
-                    public Collection<ByteBuffer> getContent(final int 
offsetInMessage, final int size)
+                    public Collection<QpidByteBuffer> getContent(final int 
offsetInMessage, final int size)
                     {
-                        return Collections.singleton(ByteBuffer.wrap(bytes, 
offsetInMessage, size));
+                        return 
Collections.singleton(QpidByteBuffer.wrap(bytes, offsetInMessage, size));
                     }
 
                     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
 Fri Aug  7 00:28:17 2015
@@ -28,15 +28,15 @@ import org.apache.qpid.server.store.Stor
 public interface MessageMetaDataType<M extends StorableMessageMetaData> 
extends Pluggable
 {
 
-    public static interface Factory<M extends StorableMessageMetaData>
+    interface Factory<M extends StorableMessageMetaData>
     {
         M createMetaData(ByteBuffer buf);
     }
 
-    public int ordinal();
+    int ordinal();
 
-    public M createMetaData(ByteBuffer buf);
+    M createMetaData(ByteBuffer buf);
 
-    public ServerMessage<M> createMessage(StoredMessage<M> msg);
+    ServerMessage<M> createMessage(StoredMessage<M> msg);
 
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
 Fri Aug  7 00:28:17 2015
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.slf4j.Logger;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -76,7 +77,7 @@ public abstract class AbstractJDBCMessag
                                                                                
                   XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
 
     private static final int DB_VERSION = 8;
-    private static final ByteBuffer EMPTY_BYTE_BUFFER = 
ByteBuffer.allocateDirect(0);
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = 
QpidByteBuffer.allocateDirect(0);
 
     private final AtomicLong _messageId = new AtomicLong(0);
 
@@ -1098,7 +1099,7 @@ public abstract class AbstractJDBCMessag
 
     protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws 
SQLException;
 
-    private void addContent(Connection conn, long messageId, ByteBuffer src)
+    private void addContent(Connection conn, long messageId, QpidByteBuffer 
src)
     {
         getLogger().debug("Adding content for message {}", messageId);
 
@@ -1106,16 +1107,10 @@ public abstract class AbstractJDBCMessag
 
         try
         {
-            src = src.slice();
-
-            byte[] chunkData = new byte[src.limit()];
-            src.duplicate().get(chunkData);
 
             stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
-
-            ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
-            stmt.setBinaryStream(2, bis, chunkData.length);
+            stmt.setBinaryStream(2, src.duplicate().asInputStream(), 
src.remaining());
             stmt.executeUpdate();
         }
         catch (SQLException e)
@@ -1184,7 +1179,7 @@ public abstract class AbstractJDBCMessag
     }
 
 
-    private ByteBuffer getAllContent(long messageId)
+    private QpidByteBuffer getAllContent(long messageId)
     {
         Connection conn = null;
         PreparedStatement stmt = null;
@@ -1203,7 +1198,7 @@ public abstract class AbstractJDBCMessag
             {
 
                 byte[] dataAsBytes = getBlobAsBytes(rs, 1);
-                ByteBuffer buf = ByteBuffer.allocateDirect(dataAsBytes.length);
+                QpidByteBuffer buf = 
QpidByteBuffer.allocateDirect(dataAsBytes.length);
                 buf.put(dataAsBytes);
                 buf.flip();
                 return buf;
@@ -1432,15 +1427,15 @@ public abstract class AbstractJDBCMessag
     static interface MessageDataRef<T extends StorableMessageMetaData>
     {
         T getMetaData();
-        ByteBuffer getData();
-        void setData(ByteBuffer data);
+        QpidByteBuffer getData();
+        void setData(QpidByteBuffer data);
         boolean isHardRef();
     }
 
     private static final class MessageDataHardRef<T extends 
StorableMessageMetaData> implements MessageDataRef<T>
     {
         private final T _metaData;
-        private ByteBuffer _data;
+        private QpidByteBuffer _data;
 
         private MessageDataHardRef(final T metaData)
         {
@@ -1454,13 +1449,13 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public ByteBuffer getData()
+        public QpidByteBuffer getData()
         {
             return _data;
         }
 
         @Override
-        public void setData(final ByteBuffer data)
+        public void setData(final QpidByteBuffer data)
         {
             _data = data;
         }
@@ -1475,9 +1470,9 @@ public abstract class AbstractJDBCMessag
     private static final class MessageData<T extends StorableMessageMetaData>
     {
         private T _metaData;
-        private SoftReference<ByteBuffer> _data;
+        private SoftReference<QpidByteBuffer> _data;
 
-        private MessageData(final T metaData, final ByteBuffer data)
+        private MessageData(final T metaData, final QpidByteBuffer data)
         {
             _metaData = metaData;
 
@@ -1492,12 +1487,12 @@ public abstract class AbstractJDBCMessag
             return _metaData;
         }
 
-        public ByteBuffer getData()
+        public QpidByteBuffer getData()
         {
             return _data == null ? null : _data.get();
         }
 
-        public void setData(final ByteBuffer data)
+        public void setData(final QpidByteBuffer data)
         {
             _data = new SoftReference<>(data);
         }
@@ -1507,7 +1502,7 @@ public abstract class AbstractJDBCMessag
     private static final class MessageDataSoftRef<T extends 
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements 
MessageDataRef<T>
     {
 
-        public MessageDataSoftRef(final T metadata, ByteBuffer data)
+        public MessageDataSoftRef(final T metadata, QpidByteBuffer data)
         {
             super(new MessageData<T>(metadata, data));
         }
@@ -1520,7 +1515,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public ByteBuffer getData()
+        public QpidByteBuffer getData()
         {
             MessageData<T> ref = get();
 
@@ -1528,7 +1523,7 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void setData(final ByteBuffer data)
+        public void setData(final QpidByteBuffer data)
         {
             MessageData<T> ref = get();
             if(ref != null)
@@ -1601,10 +1596,10 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public void addContent(ByteBuffer src)
+        public void addContent(QpidByteBuffer src)
         {
             src = src.slice();
-            ByteBuffer data = _messageDataRef.getData();
+            QpidByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
                 _messageDataRef.setData(src);
@@ -1612,7 +1607,7 @@ public abstract class AbstractJDBCMessag
             else
             {
                 int size = data.remaining() + src.remaining();
-                ByteBuffer buf = data.isDirect() ? 
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+                QpidByteBuffer buf = data.isDirect() ? 
QpidByteBuffer.allocateDirect(size) : QpidByteBuffer.allocate(size);
                 buf.put(data.duplicate());
                 buf.put(src.duplicate());
                 buf.flip();
@@ -1629,17 +1624,16 @@ public abstract class AbstractJDBCMessag
         @Override
         public int getContent(int offsetInMessage, ByteBuffer dst)
         {
-            ByteBuffer data = getContentAsByteBuffer();
-            data = data.slice();
+            QpidByteBuffer data = getContentAsByteBuffer();
             int length = Math.min(dst.remaining(), data.remaining());
-            data.limit(length);
-            dst.put(data);
+            data = data.view(offsetInMessage, length);
+            data.get(dst);
             return length;
         }
 
-        private ByteBuffer getContentAsByteBuffer()
+        private QpidByteBuffer getContentAsByteBuffer()
         {
-            ByteBuffer data = _messageDataRef.getData();
+            QpidByteBuffer data = _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1666,19 +1660,16 @@ public abstract class AbstractJDBCMessag
                 }
                 else
                 {
-                    data = ByteBuffer.wrap(new byte[0]);
+                    data = QpidByteBuffer.wrap(new byte[0]);
                 }
             } return data;
         }
 
         @Override
-        public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+        public Collection<QpidByteBuffer> getContent(int offsetInMessage, int 
size)
         {
-            ByteBuffer data = getContentAsByteBuffer();
-            data = data.duplicate();
-            data.position(offsetInMessage);
-            data = data.slice();
-            data.limit(size);
+            QpidByteBuffer data = getContentAsByteBuffer();
+            data = data.view(offsetInMessage, 
Math.min(size,data.remaining()-offsetInMessage));
             return Collections.singleton(data);
         }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
 Fri Aug  7 00:28:17 2015
@@ -22,10 +22,12 @@ package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public interface MessageHandle<M extends StorableMessageMetaData>
 {
 
-    void addContent(ByteBuffer src);
+    void addContent(QpidByteBuffer src);
 
     StoredMessage<M> allContentAdded();
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
 Fri Aug  7 00:28:17 2015
@@ -25,10 +25,12 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public class StoredMemoryMessage<T extends StorableMessageMetaData> implements 
StoredMessage<T>, MessageHandle<T>
 {
     private final long _messageNumber;
-    private ByteBuffer _content;
+    private QpidByteBuffer _content;
     private final T _metaData;
 
     public StoredMemoryMessage(long messageNumber, T metaData)
@@ -42,7 +44,7 @@ public class StoredMemoryMessage<T exten
         return _messageNumber;
     }
 
-    public void addContent(ByteBuffer src)
+    public void addContent(QpidByteBuffer src)
     {
         if(_content == null)
         {
@@ -61,9 +63,9 @@ public class StoredMemoryMessage<T exten
                 int size = (contentSize < _content.position() + 
src.remaining())
                         ? _content.position() + src.remaining()
                         : contentSize;
-                ByteBuffer oldContent = _content;
+                QpidByteBuffer oldContent = _content;
                 oldContent.flip();
-                _content = ByteBuffer.allocateDirect(size);
+                _content = QpidByteBuffer.allocateDirect(size);
                 _content.put(oldContent);
                 _content.put(src.duplicate());
             }
@@ -87,7 +89,7 @@ public class StoredMemoryMessage<T exten
         {
             return 0;
         }
-        ByteBuffer src = _content.duplicate();
+        QpidByteBuffer src = _content.duplicate();
 
         int oldPosition = src.position();
 
@@ -96,20 +98,20 @@ public class StoredMemoryMessage<T exten
         int length = dst.remaining() < src.remaining() ? dst.remaining() : 
src.remaining();
         src.limit(oldPosition + length);
 
-        dst.put(src);
+        src.get(dst);
 
 
         return length;
     }
 
 
-    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
     {
         if(_content == null)
         {
             return null;
         }
-        ByteBuffer buf = _content.duplicate();
+        QpidByteBuffer buf = _content.duplicate();
 
         if(offsetInMessage != 0)
         {
@@ -117,7 +119,7 @@ public class StoredMemoryMessage<T exten
             buf = buf.slice();
         }
 
-        buf.limit(size);
+        buf.limit(Math.min(size,buf.remaining()));
         return Collections.singleton(buf);
     }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
 Fri Aug  7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.store;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public interface StoredMessage<M extends StorableMessageMetaData>
 {
     M getMetaData();
@@ -31,7 +33,7 @@ public interface StoredMessage<M extends
 
     int getContent(int offsetInMessage, ByteBuffer dst);
 
-    Collection<ByteBuffer> getContent(int offsetInMessage, int size);
+    Collection<QpidByteBuffer> getContent(int offsetInMessage, int size);
 
     void remove();
 

Added: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java?rev=1694594&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
 (added)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
 Fri Aug  7 00:28:17 2015
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+abstract class AbstractNonBlockingConnectionDelegate implements 
NonBlockingConnectionDelegate
+{
+    private QpidByteBuffer _applicationInputBuffer;
+
+    private final NonBlockingConnection _connection;
+
+    protected AbstractNonBlockingConnectionDelegate(final 
NonBlockingConnection connection)
+    {
+        _connection = connection;
+    }
+
+
+    abstract public QpidByteBuffer getNetworkInputBuffer();
+
+    public QpidByteBuffer getApplicationInputBuffer()
+    {
+        return _applicationInputBuffer;
+    }
+}

Propchange: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 Fri Aug  7 00:28:17 2015
@@ -26,7 +26,6 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -35,6 +34,7 @@ import javax.security.auth.Subject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
@@ -133,7 +133,7 @@ public class MultiVersionProtocolEngine
     }
 
 
-    public void received(ByteBuffer msg)
+    public void received(QpidByteBuffer msg)
     {
         _delegate.received(msg);
     }
@@ -272,7 +272,7 @@ public class MultiVersionProtocolEngine
 
         }
 
-        public void received(ByteBuffer msg)
+        public void received(QpidByteBuffer msg)
         {
             _logger.error("Error processing incoming data, could not negotiate 
a common protocol");
             msg.position(msg.limit());
@@ -338,7 +338,7 @@ public class MultiVersionProtocolEngine
 
     private class SelfDelegateProtocolEngine implements ProtocolEngine
     {
-        private final ByteBuffer _header = 
ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+        private final QpidByteBuffer _header = 
QpidByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
         private long _lastReadTime = System.currentTimeMillis();
         private final AtomicBoolean _hasWork = new AtomicBoolean();
 
@@ -389,10 +389,10 @@ public class MultiVersionProtocolEngine
             _hasWork.set(false);
         }
 
-        public void received(ByteBuffer msg)
+        public void received(QpidByteBuffer msg)
         {
             _lastReadTime = System.currentTimeMillis();
-            ByteBuffer msgheader = msg.duplicate().slice();
+            QpidByteBuffer msgheader = msg.duplicate().slice();
 
             if(_header.remaining() > msgheader.limit())
             {
@@ -472,7 +472,7 @@ public class MultiVersionProtocolEngine
                     {
                         _logger.debug("Unsupported protocol version requested, 
replying with: " + supportedReplyVersion);
                     }
-                    final ByteBuffer supportedReplyBuf = 
ByteBuffer.allocateDirect(supportedReplyBytes.length);
+                    final QpidByteBuffer supportedReplyBuf = 
QpidByteBuffer.allocateDirect(supportedReplyBytes.length);
                     supportedReplyBuf.put(supportedReplyBytes);
                     supportedReplyBuf.flip();
                     _sender.send(supportedReplyBuf);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Fri Aug  7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,8 +61,6 @@ public class NonBlockingConnection imple
     private volatile int _maxReadIdle;
     private volatile int _maxWriteIdle;
 
-    private ByteBuffer _netInputBuffer;
-
     private volatile boolean _fullyWritten = true;
 
     private boolean _partialRead = false;
@@ -83,7 +82,6 @@ public class NonBlockingConnection imple
 
         _receiveBufferSize = receiveBufferSize;
 
-        _netInputBuffer = ByteBuffer.allocateDirect(receiveBufferSize);
         _remoteSocketAddress = 
_socketChannel.socket().getRemoteSocketAddress().toString();
         _port = port;
 
@@ -112,6 +110,11 @@ public class NonBlockingConnection imple
         return _partialRead;
     }
 
+    int getReceiveBufferSize() // package-private accessor; the plain and TLS delegates use it to size their own input buffers
+    {
+        return _receiveBufferSize;
+    }
+
     Ticker getTicker()
     {
         return _protocolEngine.getAggregateTicker();
@@ -230,7 +233,7 @@ public class NonBlockingConnection imple
                 _fullyWritten = doWrite();
                 _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
 
-                if (dataRead || (_delegate.needsWork() && 
_netInputBuffer.position() != 0))
+                if (dataRead || (_delegate.needsWork() && 
_delegate.getNetInputBuffer().position() != 0))
                 {
                     _protocolEngine.notifyWork();
                 }
@@ -330,48 +333,17 @@ public class NonBlockingConnection imple
      */
     boolean doRead() throws IOException
     {
-        boolean readData = false;
         _partialRead = false;
-        if (!_closed.get())
+        if(!_closed.get() && _delegate.readyForRead())
         {
-            readData = _delegate.doRead();
-        }
-        return readData;
-    }
-
-    boolean readAndProcessData() throws IOException
-    {
-        boolean readData = readIntoBuffer(_netInputBuffer);
+            int readData = readFromNetwork();
 
-        ByteBuffer duplicate = _netInputBuffer.duplicate();
-        duplicate.flip();
-
-        readData |= processData(duplicate);
-
-        if (_netInputBuffer.hasRemaining())
-        {
-            // slice but keep unprocessed data
-            int amountOfUnprocessedData = duplicate.remaining();
-            _netInputBuffer.position(_netInputBuffer.position() - 
amountOfUnprocessedData);
-            _netInputBuffer = _netInputBuffer.slice();
-            _netInputBuffer.position(amountOfUnprocessedData);
+            return (readData > 0) || _delegate.processData();
         }
         else
         {
-            if(duplicate.remaining() < _receiveBufferSize)
-            {
-                // compact into new buffer
-                _netInputBuffer = 
ByteBuffer.allocateDirect(_receiveBufferSize);
-                _netInputBuffer.put(duplicate);
-            }
-            else
-            {
-                // grow the buffer
-                _netInputBuffer = 
ByteBuffer.allocateDirect(_receiveBufferSize+_netInputBuffer.capacity());
-                _netInputBuffer.put(duplicate);
-            }
+            return false;
         }
-        return readData;
     }
 
     void writeToTransport(ByteBuffer[] buffers) throws IOException
@@ -402,20 +374,12 @@ public class NonBlockingConnection imple
         }
     }
 
-    boolean processData(ByteBuffer data) throws IOException
+    protected int readFromNetwork() throws IOException
     {
-        return _delegate.processData(data);
-    }
+        QpidByteBuffer buffer = _delegate.getNetInputBuffer();
 
-    protected boolean readIntoBuffer(final ByteBuffer buffer) throws 
IOException
-    {
-        boolean readData = false;
-        int read = _socketChannel.read(buffer);
-        if (read > 0)
-        {
-            readData = true;
-        }
-        else if (read == -1)
+        int read = _socketChannel.read(buffer.getNativeBuffer());
+        if (read == -1)
         {
             _closed.set(true);
         }
@@ -426,11 +390,11 @@ public class NonBlockingConnection imple
         {
             LOGGER.debug("Read " + read + " byte(s)");
         }
-        return readData;
+        return read;
     }
 
     @Override
-    public void send(final ByteBuffer msg)
+    public void send(final QpidByteBuffer msg)
     {
 
         if (_closed.get())
@@ -439,10 +403,11 @@ public class NonBlockingConnection imple
         }
         else if (msg.remaining() > 0)
         {
-            _buffers.add(msg);
+            _buffers.add(msg.getNativeBuffer());
         }
     }
 
+
     public void writeBufferProcessed()
     {
         _buffers.poll();
@@ -469,13 +434,14 @@ public class NonBlockingConnection imple
         return _scheduler;
     }
 
-    public void processAmqpData(ByteBuffer data)
+    public void processAmqpData(QpidByteBuffer applicationData)
     {
-        _protocolEngine.received(data);
+        _protocolEngine.received(applicationData);
     }
 
     public void setTransportEncryption(TransportEncryption transportEncryption)
     {
+        NonBlockingConnectionDelegate oldDelegate = _delegate;
         switch (transportEncryption)
         {
             case TLS:
@@ -488,6 +454,13 @@ public class NonBlockingConnection imple
             default:
                 throw new IllegalArgumentException("unknown 
TransportEncryption " + transportEncryption);
         }
+        if(oldDelegate != null)
+        {
+            QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
+            src.flip();
+            _delegate.getNetInputBuffer().put(src);
+        }
         LOGGER.debug("Identified transport encryption as " + 
transportEncryption);
     }
+
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
 Fri Aug  7 00:28:17 2015
@@ -24,15 +24,23 @@ import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 
-public interface NonBlockingConnectionDelegate
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+interface NonBlockingConnectionDelegate // per-transport strategy used by NonBlockingConnection; implemented by the plain, TLS and undecided delegates
 {
-    boolean doRead() throws IOException;
     boolean doWrite(ByteBuffer[] bufferArray) throws IOException;
-    boolean processData(ByteBuffer data) throws IOException;
+
+    boolean readyForRead(); // consulted by NonBlockingConnection.doRead() before it reads from the socket
+
+    boolean processData() throws IOException; // handles whatever is currently held in the delegate's own net input buffer; no buffer is passed in any more
 
     Principal getPeerPrincipal();
 
     Certificate getPeerCertificate();
 
     boolean needsWork();
+
+    QpidByteBuffer getNetInputBuffer(); // the buffer NonBlockingConnection.readFromNetwork() reads socket data into
+
+    void setNetInputBuffer(QpidByteBuffer buffer); // replaces that buffer; no caller is visible in this part of the patch
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
 Fri Aug  7 00:28:17 2015
@@ -24,29 +24,63 @@ import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public class NonBlockingConnectionPlainDelegate implements 
NonBlockingConnectionDelegate
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(NonBlockingConnectionPlainDelegate.class);
+
     private final NonBlockingConnection _parent;
+    private QpidByteBuffer _netInputBuffer;
 
     public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
     {
         _parent = parent;
+        _netInputBuffer = 
QpidByteBuffer.allocateDirect(parent.getReceiveBufferSize());
     }
 
     @Override
-    public boolean doRead() throws IOException
+    public boolean readyForRead()
     {
-        return _parent.readAndProcessData();
+        return true;
     }
 
     @Override
-    public boolean processData(ByteBuffer buffer)
+    public boolean processData() // passes the bytes accumulated in _netInputBuffer to the parent connection unmodified
     {
-        _parent.processAmqpData(buffer);
+        _netInputBuffer.flip(); // switch the buffer from accumulating network bytes to being read
+        _parent.processAmqpData(_netInputBuffer); // the parent forwards the buffer to the protocol engine
+
+        restoreApplicationBufferForWrite(); // keep whatever was not consumed and make the buffer writable again
 
         return false;
     }
 
+    protected void restoreApplicationBufferForWrite() // called after processData() has handed the buffer on: prepares _netInputBuffer to receive further network bytes
+    {
+        _netInputBuffer = _netInputBuffer.slice(); // presumably keeps only the bytes between position and limit, as java.nio.ByteBuffer.slice() does - TODO confirm
+        if (_netInputBuffer.limit() != _netInputBuffer.capacity()) // NOTE(review): if slice() mirrors java.nio.ByteBuffer.slice(), limit == capacity always holds here, so this branch never runs and every call reallocates - confirm
+        {
+            _netInputBuffer.position(_netInputBuffer.limit()); // append after the retained bytes
+            _netInputBuffer.limit(_netInputBuffer.capacity()); // open the rest of the buffer for writing
+        }
+        else
+        {
+            QpidByteBuffer currentBuffer = _netInputBuffer;
+            int newBufSize = (currentBuffer.capacity() < 
_parent.getReceiveBufferSize())
+                    ? _parent.getReceiveBufferSize()
+                    : currentBuffer.capacity() + 
_parent.getReceiveBufferSize();
+
+            _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize); // new size: the receive buffer size, or current capacity plus the receive buffer size
+            _netInputBuffer.put(currentBuffer); // copy the retained bytes to the start of the replacement buffer
+        }
+
+    }
+
+
     @Override
     public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
     {
@@ -87,4 +121,16 @@ public class NonBlockingConnectionPlainD
     {
         return false;
     }
+
+    @Override
+    public QpidByteBuffer getNetInputBuffer() // current buffer for inbound network bytes; the reference is replaced by restoreApplicationBufferForWrite()
+    {
+        return _netInputBuffer;
+    }
+
+    @Override
+    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) // swaps the buffer reference only; nothing is copied from the previous buffer
+    {
+        _netInputBuffer = netInputBuffer;
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
 Fri Aug  7 00:28:17 2015
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 import org.slf4j.Logger;
@@ -42,33 +43,77 @@ public class NonBlockingConnectionTLSDel
 
     private final SSLEngine _sslEngine;
     private final NonBlockingConnection _parent;
+    private final int _initialApplicationBufferSize;
     private SSLEngineResult _status;
     private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
     private Principal _principal;
     private Certificate _peerCertificate;
     private boolean _principalChecked;
+    private QpidByteBuffer _netInputBuffer;
+    private QpidByteBuffer _applicationBuffer;
+
 
     public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, 
AmqpPort port)
     {
         _parent = parent;
         _sslEngine = createSSLEngine(port);
+        _netInputBuffer = 
QpidByteBuffer.allocateDirect(Math.max(parent.getReceiveBufferSize(),
+                                                                 
_sslEngine.getSession().getPacketBufferSize()));
+
+        _initialApplicationBufferSize =
+                Math.max(_sslEngine.getSession().getApplicationBufferSize() + 
50, _parent.getReceiveBufferSize());
+        _applicationBuffer = 
QpidByteBuffer.allocateDirect(_initialApplicationBufferSize);
+
     }
 
     @Override
-    public boolean doRead() throws IOException
+    public boolean readyForRead()
     {
-        boolean readData = false;
-        if (_sslEngine.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || 
_status.getStatus() != SSLEngineResult.Status.CLOSED))
-        {
-            readData = _parent.readAndProcessData();
-        }
-        return readData;
+        return _sslEngine.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || 
_status.getStatus() != SSLEngineResult.Status.CLOSED);
     }
 
     @Override
-    public boolean processData(ByteBuffer buffer) throws IOException
+    public boolean processData() throws IOException // unwraps the bytes held in _netInputBuffer through the SSLEngine and forwards the result to the parent connection
     {
-        return unwrapAndProcessBuffer(buffer);
+        _netInputBuffer.flip(); // make the accumulated network bytes readable for unwrap
+        boolean readData = false;
+        boolean tasksRun;
+        int oldNetBufferPos;
+        do // repeats while unwrap keeps consuming input and more remains, or while SSLEngine tasks were run
+        {
+            int oldAppBufPos = _applicationBuffer.position(); // application-buffer position before this unwrap
+            oldNetBufferPos = _netInputBuffer.position(); // used by the loop condition to detect whether unwrap consumed anything
+
+            _status = _sslEngine.unwrap(_netInputBuffer.getNativeBuffer(), 
_applicationBuffer.getNativeBuffer());
+            if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
+            {
+                // KW If SSLEngine changes state to CLOSED, what will ever set 
_closed to true?
+                LOGGER.debug("SSLEngine closed");
+            }
+
+            tasksRun = runSSLEngineTasks(_status);
+            _applicationBuffer.flip(); // NOTE(review): if flip() mirrors java.nio.ByteBuffer.flip(), position is 0 from here on, so the comparison below can never be true and readData stays false - confirm
+            if(_applicationBuffer.position() > oldAppBufPos)
+            {
+                readData = true;
+            }
+
+            _parent.processAmqpData(_applicationBuffer); // the parent forwards the unwrapped bytes to the protocol engine
+
+            restoreApplicationBufferForWrite(); // keep unconsumed application bytes and make the buffer writable again
+
+        }
+        while((_netInputBuffer.hasRemaining() && 
(_netInputBuffer.position()>oldNetBufferPos)) || tasksRun);
+
+        if(_netInputBuffer.hasRemaining())
+        {
+            _netInputBuffer.compact(); // bytes unwrap did not consume stay in the buffer for the next network read
+        }
+        else
+        {
+            _netInputBuffer.clear(); // everything was consumed; reuse the whole buffer
+        }
+        return readData;
     }
 
     @Override
@@ -97,34 +142,25 @@ public class NonBlockingConnectionTLSDel
         return (bufferArray.length == byteBuffersWritten) && 
_encryptedOutput.isEmpty();
     }
 
-    private boolean unwrapAndProcessBuffer(final ByteBuffer wrappedDataBuffer) 
throws SSLException
+    protected void restoreApplicationBufferForWrite()
     {
-        boolean readData = false;
-        int unwrapped;
-        boolean tasksRun;
-        do
+        _applicationBuffer = _applicationBuffer.slice();
+        if (_applicationBuffer.limit() != _applicationBuffer.capacity())
         {
-            ByteBuffer appInputBuffer =
-                    
ByteBuffer.allocateDirect(_sslEngine.getSession().getApplicationBufferSize() + 
50);
-            _status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
-            if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
-            {
-                // KW If SSLEngine changes state to CLOSED, what will ever set 
_closed to true?
-                LOGGER.debug("SSLEngine closed");
-            }
-
-            tasksRun = runSSLEngineTasks(_status);
+            _applicationBuffer.position(_applicationBuffer.limit());
+            _applicationBuffer.limit(_applicationBuffer.capacity());
+        }
+        else
+        {
+            QpidByteBuffer currentBuffer = _applicationBuffer;
+            int newBufSize = (currentBuffer.capacity() < 
_initialApplicationBufferSize)
+                    ? _initialApplicationBufferSize
+                    : currentBuffer.capacity() + _initialApplicationBufferSize;
 
-            appInputBuffer.flip();
-            unwrapped = appInputBuffer.remaining();
-            if(unwrapped > 0)
-            {
-                readData = true;
-            }
-            _parent.processAmqpData(appInputBuffer);
+            _applicationBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+            _applicationBuffer.put(currentBuffer);
         }
-        while(unwrapped > 0 || tasksRun);
-        return readData;
+
     }
 
     private int wrapBufferArray(final ByteBuffer[] bufferArray) throws 
SSLException
@@ -173,8 +209,10 @@ public class NonBlockingConnectionTLSDel
             {
                 task.run();
             }
+
             return true;
         }
+
         return false;
     }
 
@@ -240,4 +278,15 @@ public class NonBlockingConnectionTLSDel
         return sslEngine;
     }
 
+    @Override
+    public QpidByteBuffer getNetInputBuffer() // buffer of not-yet-unwrapped network bytes; processData() passes it to SSLEngine.unwrap as the source
+    {
+        return _netInputBuffer;
+    }
+
+    @Override
+    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) // swaps the buffer reference only; nothing is copied from the previous buffer
+    {
+        _netInputBuffer = netInputBuffer;
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
 Fri Aug  7 00:28:17 2015
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.network.TransportEncryption;
 
 import java.io.IOException;
@@ -29,25 +30,32 @@ import java.security.cert.Certificate;
 public class NonBlockingConnectionUndecidedDelegate implements 
NonBlockingConnectionDelegate
 {
     private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = 
QpidByteBuffer.allocate(0);
     public final NonBlockingConnection _parent;
 
+    private QpidByteBuffer _netInputBuffer;
+
     public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
     {
         _parent = parent;
+        _netInputBuffer = 
QpidByteBuffer.allocateDirect(NUMBER_OF_BYTES_FOR_TLS_CHECK);
+
     }
 
     @Override
-    public boolean doRead() throws IOException
+    public boolean readyForRead()
     {
-        return _parent.readAndProcessData();
+        return true;
     }
 
-    public boolean processData(ByteBuffer buffer) throws IOException
+    public boolean processData() throws IOException
     {
+        QpidByteBuffer buffer = _netInputBuffer.duplicate();
+        buffer.flip();
         if (buffer.remaining() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
         {
             final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
-            ByteBuffer dup = buffer.duplicate();
+            QpidByteBuffer dup = buffer.duplicate();
             dup.get(headerBytes);
 
             if (looksLikeSSL(headerBytes))
@@ -58,7 +66,8 @@ public class NonBlockingConnectionUndeci
             {
                 _parent.setTransportEncryption(TransportEncryption.NONE);
             }
-            _parent.processData(buffer);
+
+            return true;
         }
         return false;
     }
@@ -112,4 +121,16 @@ public class NonBlockingConnectionUndeci
                         headerBytes[4] == 2 || // TLS 1.1
                         headerBytes[4] == 3);
     }
+
+    @Override
+    public QpidByteBuffer getNetInputBuffer() // buffer allocated in the constructor with NUMBER_OF_BYTES_FOR_TLS_CHECK bytes; processData() inspects a duplicate of it
+    {
+        return _netInputBuffer;
+    }
+
+    @Override
+    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) // swaps the buffer reference only; nothing is copied from the previous buffer
+    {
+        _netInputBuffer = netInputBuffer;
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
 Fri Aug  7 00:28:17 2015
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.server.transport;
 
+import java.nio.ByteBuffer;
+
 import javax.security.auth.Subject;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferReceiver;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -31,7 +34,7 @@ import org.apache.qpid.transport.network
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data 
passed to it in the received
  * decodes it and then process the result.
  */
-public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
+public interface ProtocolEngine extends TransportActivity
 {
 
    // Called by the NetworkDriver when the socket has been closed for reading
@@ -68,4 +71,6 @@ public interface ProtocolEngine extends
 
    void encryptedTransport();
 
+   void received(QpidByteBuffer msg);
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to