Author: rgodfrey
Date: Fri Aug 7 00:28:17 2015
New Revision: 1694594
URL: http://svn.apache.org/r1694594
Log:
QPID-6662: Wrap uses of ByteBuffer in the QpidByteBuffer class to allow pooled
buffers to be used rather than always allocating fresh ones. Refactor
NonBlockingConnection to allow the connection object to perform only partial reads.
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
(with props)
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
- copied, changed from r1693306,
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java
- copied, changed from r1693306,
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java
- copied, changed from r1693325,
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
(with props)
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
(with props)
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
(with props)
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
(with props)
Removed:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExtendedDataInput.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DelegatingValueWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DescribedTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DoubleTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedEightWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedFourWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedOneWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedSixteenWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FixedTwoWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FloatTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/IntTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/IntegerWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ListWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/LongTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/LongWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/NullTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/NullWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ProtocolHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ProtocolHeaderHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ShortTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SimpleVariableWidthWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallLongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallUIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SmallULongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/StringTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolArrayWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/TimestampTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/TypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UByteTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UIntTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ULongTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UShortTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UUIDTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedByteWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedIntegerWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/UnsignedLongWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ValueHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ValueWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/VariableWidthTypeConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/VariableWidthWriter.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroListConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroUIntConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ZeroULongConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQPProtocolHeaderHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLProtocolHeaderHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/TransportFrame.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionDecoder.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionDecoderImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/messaging/SectionEncoderImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Transfer.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageTransfer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Method.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Fri Aug 7 00:28:17 2015
@@ -48,6 +48,7 @@ import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
import org.slf4j.Logger;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.Event;
@@ -370,8 +371,8 @@ public abstract class AbstractBDBMessage
OperationStatus status = getMessageContentDb().get(null,
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
if (status == OperationStatus.SUCCESS)
{
- ByteBuffer dataAsBytes =
contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.remaining();
+ QpidByteBuffer buffer =
contentTupleBinding.entryToObject(value);
+ int size = buffer.remaining();
if (offset > size)
{
throw new RuntimeException("Offset " + offset + " is
greater than message size " + size
@@ -384,11 +385,8 @@ public abstract class AbstractBDBMessage
{
written = dst.remaining();
}
- dataAsBytes.position(dataAsBytes.position()+offset);
-
- dataAsBytes = dataAsBytes.slice();
- dataAsBytes.limit(written);
- dst.put(dataAsBytes);
+ buffer = buffer.view(offset, written);
+ buffer.get(dst);
}
return written;
}
@@ -401,7 +399,7 @@ public abstract class AbstractBDBMessage
}
}
- ByteBuffer getAllContent(long messageId) throws StoreException
+ QpidByteBuffer getAllContent(long messageId) throws StoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
@@ -513,7 +511,7 @@ public abstract class AbstractBDBMessage
* @throws org.apache.qpid.server.store.StoreException If the operation
fails for any reason, or if the specified message does not exist.
*/
private void addContent(final Transaction tx, long messageId, int offset,
- Collection<ByteBuffer> contentBody) throws
StoreException
+ Collection<QpidByteBuffer> contentBody) throws
StoreException
{
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
@@ -521,15 +519,15 @@ public abstract class AbstractBDBMessage
int size = 0;
- for(ByteBuffer buf : contentBody)
+ for(QpidByteBuffer buf : contentBody)
{
size += buf.remaining();
}
byte[] data = new byte[size];
ByteBuffer dst = ByteBuffer.wrap(data);
- for(ByteBuffer buf : contentBody)
+ for(QpidByteBuffer buf : contentBody)
{
- dst.put(buf.duplicate());
+ buf.duplicate().get(dst);
}
value.setData(data);
try
@@ -903,15 +901,15 @@ public abstract class AbstractBDBMessage
static interface MessageDataRef<T extends StorableMessageMetaData>
{
T getMetaData();
- Collection<ByteBuffer> getData();
- void setData(Collection<ByteBuffer> data);
+ Collection<QpidByteBuffer> getData();
+ void setData(Collection<QpidByteBuffer> data);
boolean isHardRef();
}
private static final class MessageDataHardRef<T extends
StorableMessageMetaData> implements MessageDataRef<T>
{
private final T _metaData;
- private volatile Collection<ByteBuffer> _data;
+ private volatile Collection<QpidByteBuffer> _data;
private MessageDataHardRef(final T metaData)
{
@@ -925,13 +923,13 @@ public abstract class AbstractBDBMessage
}
@Override
- public Collection<ByteBuffer> getData()
+ public Collection<QpidByteBuffer> getData()
{
return _data;
}
@Override
- public void setData(final Collection<ByteBuffer> data)
+ public void setData(final Collection<QpidByteBuffer> data)
{
_data = data;
}
@@ -946,9 +944,9 @@ public abstract class AbstractBDBMessage
private static final class MessageData<T extends StorableMessageMetaData>
{
private T _metaData;
- private SoftReference<Collection<ByteBuffer>> _data;
+ private SoftReference<Collection<QpidByteBuffer>> _data;
- private MessageData(final T metaData, final Collection<ByteBuffer>
data)
+ private MessageData(final T metaData, final Collection<QpidByteBuffer>
data)
{
_metaData = metaData;
@@ -963,12 +961,12 @@ public abstract class AbstractBDBMessage
return _metaData;
}
- public Collection<ByteBuffer> getData()
+ public Collection<QpidByteBuffer> getData()
{
return _data == null ? null : _data.get();
}
- public void setData(final Collection<ByteBuffer> data)
+ public void setData(final Collection<QpidByteBuffer> data)
{
_data = new SoftReference<>(data);
}
@@ -978,7 +976,7 @@ public abstract class AbstractBDBMessage
private static final class MessageDataSoftRef<T extends
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements
MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, Collection<ByteBuffer>
data)
+ public MessageDataSoftRef(final T metadata, Collection<QpidByteBuffer>
data)
{
super(new MessageData<T>(metadata, data));
}
@@ -991,7 +989,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public Collection<ByteBuffer> getData()
+ public Collection<QpidByteBuffer> getData()
{
MessageData<T> ref = get();
@@ -999,7 +997,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public void setData(final Collection<ByteBuffer> data)
+ public void setData(final Collection<QpidByteBuffer> data)
{
MessageData<T> ref = get();
if(ref != null)
@@ -1062,17 +1060,17 @@ public abstract class AbstractBDBMessage
}
@Override
- public void addContent(ByteBuffer src)
+ public void addContent(QpidByteBuffer src)
{
src = src.slice();
- Collection<ByteBuffer> data = _messageDataRef.getData();
+ Collection<QpidByteBuffer> data = _messageDataRef.getData();
if(data == null)
{
_messageDataRef.setData(Collections.singleton(src));
}
else
{
- List<ByteBuffer> newCollection = new
ArrayList<>(data.size()+1);
+ List<QpidByteBuffer> newCollection = new
ArrayList<>(data.size()+1);
newCollection.addAll(data);
newCollection.add(src);
_messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
@@ -1089,17 +1087,19 @@ public abstract class AbstractBDBMessage
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- ByteBuffer data =
ByteBufferUtils.combine(getContentAsByteBuffer());
- data = data.slice();
- int length = Math.min(dst.remaining(), data.remaining());
- data.limit(length);
- dst.put(data);
+ Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
+ int length = 0;
+ for(QpidByteBuffer contentChunk : allContent)
+ {
+ length += contentChunk.remaining();
+ contentChunk.duplicate().get(dst);
+ }
return length;
}
- private Collection<ByteBuffer> getContentAsByteBuffer()
+ private Collection<QpidByteBuffer> getContentAsByteBuffer()
{
- Collection<ByteBuffer> data = _messageDataRef.getData();
+ Collection<QpidByteBuffer> data = _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1125,14 +1125,14 @@ public abstract class AbstractBDBMessage
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int
size)
{
int pos = 0;
int added = 0;
- Collection<ByteBuffer> bufs = getContentAsByteBuffer();
- List<ByteBuffer> content = new ArrayList<>(bufs.size());
- for(ByteBuffer buf : bufs)
+ Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
+ List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+ for(QpidByteBuffer buf : bufs)
{
if(pos < offsetInMessage)
{
@@ -1160,7 +1160,7 @@ public abstract class AbstractBDBMessage
{
buf.limit(size-added);
}
- content.add(buf.slice());
+ content.add(buf);
added += buf.remaining();
}
if(added >= size)
@@ -1180,7 +1180,7 @@ public abstract class AbstractBDBMessage
AbstractBDBMessageStore.this.storeMetaData(txn, _messageId,
_messageDataRef.getMetaData());
AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
_messageDataRef.getData() == null
- ?
Collections.<ByteBuffer>emptySet()
+ ?
Collections.<QpidByteBuffer>emptySet()
:
_messageDataRef.getData());
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
Fri Aug 7 00:28:17 2015
@@ -24,6 +24,7 @@ import com.sleepycat.bind.tuple.TupleInp
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
@@ -48,7 +49,7 @@ public class FieldTableEncoding
ByteBuffer buf =
ByteBufferBinding.getInstance().readByteBuffer(tupleInput, (int) length);
- return new FieldTable(buf);
+ return new FieldTable(QpidByteBuffer.wrap(buf));
}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
Fri Aug 7 00:28:17 2015
@@ -26,7 +26,9 @@ import com.sleepycat.bind.tuple.TupleBin
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-public class ByteBufferBinding extends TupleBinding<ByteBuffer>
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+public class ByteBufferBinding extends TupleBinding<QpidByteBuffer>
{
private static final int COPY_BUFFER_SIZE = 8192;
@@ -51,10 +53,10 @@ public class ByteBufferBinding extends T
private ByteBufferBinding() { }
@Override
- public ByteBuffer entryToObject(final TupleInput input)
+ public QpidByteBuffer entryToObject(final TupleInput input)
{
int available = input.available();
- ByteBuffer buf = ByteBuffer.allocateDirect(available);
+ QpidByteBuffer buf = QpidByteBuffer.allocateDirect(available);
byte[] copyBuf = COPY_BUFFER.get();
while(available > 0)
{
@@ -67,7 +69,7 @@ public class ByteBufferBinding extends T
}
@Override
- public void objectToEntry(ByteBuffer data, final TupleOutput output)
+ public void objectToEntry(QpidByteBuffer data, final TupleOutput output)
{
data = data.duplicate();
byte[] copyBuf = COPY_BUFFER.get();
Modified:
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
(original)
+++
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
Fri Aug 7 00:28:17 2015
@@ -27,6 +27,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -90,10 +91,10 @@ public class BDBMessageStoreTest extends
// Use a single chunk for the 0-10 message as per broker behaviour.
String bodyText =
"jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
- ByteBuffer firstContentBytes_0_8 =
ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
- ByteBuffer secondContentBytes_0_8 =
ByteBuffer.wrap(bodyText.substring(10).getBytes());
+ QpidByteBuffer firstContentBytes_0_8 =
QpidByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ QpidByteBuffer secondContentBytes_0_8 =
QpidByteBuffer.wrap(bodyText.substring(10).getBytes());
- ByteBuffer completeContentBody_0_10 =
ByteBuffer.wrap(bodyText.getBytes());
+ QpidByteBuffer completeContentBody_0_10 =
QpidByteBuffer.wrap(bodyText.getBytes());
int bodySize = completeContentBody_0_10.limit();
/*
@@ -123,7 +124,7 @@ public class BDBMessageStoreTest extends
Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
MessageTransfer xfr_0_10 = new MessageTransfer("destination",
MessageAcceptMode.EXPLICIT,
- MessageAcquireMode.PRE_ACQUIRED, header_0_10,
completeContentBody_0_10);
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10,
completeContentBody_0_10.getNativeBuffer());
MessageMetaData_0_10 messageMetaData_0_10 = new
MessageMetaData_0_10(xfr_0_10);
MessageHandle<MessageMetaData_0_10> messageHandle_0_10 =
bdbStore.addMessage(messageMetaData_0_10);
@@ -344,7 +345,7 @@ public class BDBMessageStoreTest extends
private StoredMessage<MessageMetaData>
createAndStoreSingleChunkMessage_0_8(MessageStore store)
{
- ByteBuffer chunk1 = ByteBuffer.wrap(CONTENT_BYTES);
+ QpidByteBuffer chunk1 = QpidByteBuffer.wrap(CONTENT_BYTES);
int bodySize = CONTENT_BYTES.length;
Modified:
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
(original)
+++
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
Fri Aug 7 00:28:17 2015
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
@@ -138,7 +139,7 @@ public class UpgraderTest extends Abstra
{
long id = LongBinding.entryToLong(key);
assertTrue("Unexpected id", id > 0);
- ByteBuffer content = contentBinding.entryToObject(value);
+ QpidByteBuffer content = contentBinding.entryToObject(value);
assertNotNull("Unexpected content", content);
assertTrue("Expected content", content.hasRemaining());
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
Fri Aug 7 00:28:17 2015
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -166,7 +167,7 @@ public abstract class AbstractServerMess
}
@Override
- final public Collection<ByteBuffer> getContent(int offset, int size)
+ final public Collection<QpidByteBuffer> getContent(int offset, int size)
{
return getStoredMessage().getContent(offset, size);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/EnqueueableMessage.java
Fri Aug 7 00:28:17 2015
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.message;
+import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-public interface EnqueueableMessage
+public interface EnqueueableMessage<T extends StorableMessageMetaData>
{
long getMessageNumber();
boolean isPersistent();
- StoredMessage getStoredMessage();
+ StoredMessage<T> getStoredMessage();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
Fri Aug 7 00:28:17 2015
@@ -24,10 +24,12 @@ package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public interface MessageContentSource
{
int getContent(ByteBuffer buf, int offset);
- Collection<ByteBuffer> getContent(int offset, int size);
+ Collection<QpidByteBuffer> getContent(int offset, int size);
long getSize();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
Fri Aug 7 00:28:17 2015
@@ -33,7 +33,7 @@ public interface ServerMessage<T extends
AMQMessageHeader getMessageHeader();
- public StoredMessage<T> getStoredMessage();
+ StoredMessage<T> getStoredMessage();
boolean isPersistent();
@@ -49,13 +49,7 @@ public interface ServerMessage<T extends
boolean isReferenced();
- long getMessageNumber();
-
long getArrivalTime();
- public int getContent(ByteBuffer buf, int offset);
-
- Collection<ByteBuffer> getContent(int offset, int size);
-
Object getConnectionReference();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
Fri Aug 7 00:28:17 2015
@@ -33,6 +33,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.MessageHandle;
@@ -64,7 +65,7 @@ public class InternalMessage extends Abs
{
super(msg, null);
_contentSize = msg.getMetaData().getContentSize();
- Collection<ByteBuffer> bufs = msg.getContent(0, _contentSize);
+ Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
try(ObjectInputStream is = new ObjectInputStream(new
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
{
@@ -140,7 +141,7 @@ public class InternalMessage extends Abs
final InternalMessageMetaData metaData =
InternalMessageMetaData.create(persistent, internalHeader, bytes.length);
MessageHandle<InternalMessageMetaData> handle =
store.addMessage(metaData);
- handle.addContent(ByteBuffer.wrap(bytes));
+ handle.addContent(QpidByteBuffer.wrap(bytes));
StoredMessage<InternalMessageMetaData> storedMessage =
handle.allContentAdded();
return new InternalMessage(storedMessage, internalHeader,
bodyObject);
}
@@ -238,9 +239,9 @@ public class InternalMessage extends Abs
}
@Override
- public Collection<ByteBuffer> getContent(final int
offsetInMessage, final int size)
+ public Collection<QpidByteBuffer> getContent(final int
offsetInMessage, final int size)
{
- return Collections.singleton(ByteBuffer.wrap(bytes,
offsetInMessage, size));
+ return
Collections.singleton(QpidByteBuffer.wrap(bytes, offsetInMessage, size));
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
Fri Aug 7 00:28:17 2015
@@ -28,15 +28,15 @@ import org.apache.qpid.server.store.Stor
public interface MessageMetaDataType<M extends StorableMessageMetaData>
extends Pluggable
{
- public static interface Factory<M extends StorableMessageMetaData>
+ interface Factory<M extends StorableMessageMetaData>
{
M createMetaData(ByteBuffer buf);
}
- public int ordinal();
+ int ordinal();
- public M createMetaData(ByteBuffer buf);
+ M createMetaData(ByteBuffer buf);
- public ServerMessage<M> createMessage(StoredMessage<M> msg);
+ ServerMessage<M> createMessage(StoredMessage<M> msg);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Fri Aug 7 00:28:17 2015
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -76,7 +77,7 @@ public abstract class AbstractJDBCMessag
XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
private static final int DB_VERSION = 8;
- private static final ByteBuffer EMPTY_BYTE_BUFFER =
ByteBuffer.allocateDirect(0);
+ private static final QpidByteBuffer EMPTY_BYTE_BUFFER =
QpidByteBuffer.allocateDirect(0);
private final AtomicLong _messageId = new AtomicLong(0);
@@ -1098,7 +1099,7 @@ public abstract class AbstractJDBCMessag
protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws
SQLException;
- private void addContent(Connection conn, long messageId, ByteBuffer src)
+ private void addContent(Connection conn, long messageId, QpidByteBuffer
src)
{
getLogger().debug("Adding content for message {}", messageId);
@@ -1106,16 +1107,10 @@ public abstract class AbstractJDBCMessag
try
{
- src = src.slice();
-
- byte[] chunkData = new byte[src.limit()];
- src.duplicate().get(chunkData);
stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(2, bis, chunkData.length);
+ stmt.setBinaryStream(2, src.duplicate().asInputStream(),
src.remaining());
stmt.executeUpdate();
}
catch (SQLException e)
@@ -1184,7 +1179,7 @@ public abstract class AbstractJDBCMessag
}
- private ByteBuffer getAllContent(long messageId)
+ private QpidByteBuffer getAllContent(long messageId)
{
Connection conn = null;
PreparedStatement stmt = null;
@@ -1203,7 +1198,7 @@ public abstract class AbstractJDBCMessag
{
byte[] dataAsBytes = getBlobAsBytes(rs, 1);
- ByteBuffer buf = ByteBuffer.allocateDirect(dataAsBytes.length);
+ QpidByteBuffer buf =
QpidByteBuffer.allocateDirect(dataAsBytes.length);
buf.put(dataAsBytes);
buf.flip();
return buf;
@@ -1432,15 +1427,15 @@ public abstract class AbstractJDBCMessag
static interface MessageDataRef<T extends StorableMessageMetaData>
{
T getMetaData();
- ByteBuffer getData();
- void setData(ByteBuffer data);
+ QpidByteBuffer getData();
+ void setData(QpidByteBuffer data);
boolean isHardRef();
}
private static final class MessageDataHardRef<T extends
StorableMessageMetaData> implements MessageDataRef<T>
{
private final T _metaData;
- private ByteBuffer _data;
+ private QpidByteBuffer _data;
private MessageDataHardRef(final T metaData)
{
@@ -1454,13 +1449,13 @@ public abstract class AbstractJDBCMessag
}
@Override
- public ByteBuffer getData()
+ public QpidByteBuffer getData()
{
return _data;
}
@Override
- public void setData(final ByteBuffer data)
+ public void setData(final QpidByteBuffer data)
{
_data = data;
}
@@ -1475,9 +1470,9 @@ public abstract class AbstractJDBCMessag
private static final class MessageData<T extends StorableMessageMetaData>
{
private T _metaData;
- private SoftReference<ByteBuffer> _data;
+ private SoftReference<QpidByteBuffer> _data;
- private MessageData(final T metaData, final ByteBuffer data)
+ private MessageData(final T metaData, final QpidByteBuffer data)
{
_metaData = metaData;
@@ -1492,12 +1487,12 @@ public abstract class AbstractJDBCMessag
return _metaData;
}
- public ByteBuffer getData()
+ public QpidByteBuffer getData()
{
return _data == null ? null : _data.get();
}
- public void setData(final ByteBuffer data)
+ public void setData(final QpidByteBuffer data)
{
_data = new SoftReference<>(data);
}
@@ -1507,7 +1502,7 @@ public abstract class AbstractJDBCMessag
private static final class MessageDataSoftRef<T extends
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements
MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, ByteBuffer data)
+ public MessageDataSoftRef(final T metadata, QpidByteBuffer data)
{
super(new MessageData<T>(metadata, data));
}
@@ -1520,7 +1515,7 @@ public abstract class AbstractJDBCMessag
}
@Override
- public ByteBuffer getData()
+ public QpidByteBuffer getData()
{
MessageData<T> ref = get();
@@ -1528,7 +1523,7 @@ public abstract class AbstractJDBCMessag
}
@Override
- public void setData(final ByteBuffer data)
+ public void setData(final QpidByteBuffer data)
{
MessageData<T> ref = get();
if(ref != null)
@@ -1601,10 +1596,10 @@ public abstract class AbstractJDBCMessag
}
@Override
- public void addContent(ByteBuffer src)
+ public void addContent(QpidByteBuffer src)
{
src = src.slice();
- ByteBuffer data = _messageDataRef.getData();
+ QpidByteBuffer data = _messageDataRef.getData();
if(data == null)
{
_messageDataRef.setData(src);
@@ -1612,7 +1607,7 @@ public abstract class AbstractJDBCMessag
else
{
int size = data.remaining() + src.remaining();
- ByteBuffer buf = data.isDirect() ?
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ QpidByteBuffer buf = data.isDirect() ?
QpidByteBuffer.allocateDirect(size) : QpidByteBuffer.allocate(size);
buf.put(data.duplicate());
buf.put(src.duplicate());
buf.flip();
@@ -1629,17 +1624,16 @@ public abstract class AbstractJDBCMessag
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- ByteBuffer data = getContentAsByteBuffer();
- data = data.slice();
+ QpidByteBuffer data = getContentAsByteBuffer();
int length = Math.min(dst.remaining(), data.remaining());
- data.limit(length);
- dst.put(data);
+ data = data.view(offsetInMessage, length);
+ data.get(dst);
return length;
}
- private ByteBuffer getContentAsByteBuffer()
+ private QpidByteBuffer getContentAsByteBuffer()
{
- ByteBuffer data = _messageDataRef.getData();
+ QpidByteBuffer data = _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1666,19 +1660,16 @@ public abstract class AbstractJDBCMessag
}
else
{
- data = ByteBuffer.wrap(new byte[0]);
+ data = QpidByteBuffer.wrap(new byte[0]);
}
} return data;
}
@Override
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int
size)
{
- ByteBuffer data = getContentAsByteBuffer();
- data = data.duplicate();
- data.position(offsetInMessage);
- data = data.slice();
- data.limit(size);
+ QpidByteBuffer data = getContentAsByteBuffer();
+ data = data.view(offsetInMessage,
Math.min(size,data.remaining()-offsetInMessage));
return Collections.singleton(data);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
Fri Aug 7 00:28:17 2015
@@ -22,10 +22,12 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public interface MessageHandle<M extends StorableMessageMetaData>
{
- void addContent(ByteBuffer src);
+ void addContent(QpidByteBuffer src);
StoredMessage<M> allContentAdded();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
Fri Aug 7 00:28:17 2015
@@ -25,10 +25,12 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public class StoredMemoryMessage<T extends StorableMessageMetaData> implements
StoredMessage<T>, MessageHandle<T>
{
private final long _messageNumber;
- private ByteBuffer _content;
+ private QpidByteBuffer _content;
private final T _metaData;
public StoredMemoryMessage(long messageNumber, T metaData)
@@ -42,7 +44,7 @@ public class StoredMemoryMessage<T exten
return _messageNumber;
}
- public void addContent(ByteBuffer src)
+ public void addContent(QpidByteBuffer src)
{
if(_content == null)
{
@@ -61,9 +63,9 @@ public class StoredMemoryMessage<T exten
int size = (contentSize < _content.position() +
src.remaining())
? _content.position() + src.remaining()
: contentSize;
- ByteBuffer oldContent = _content;
+ QpidByteBuffer oldContent = _content;
oldContent.flip();
- _content = ByteBuffer.allocateDirect(size);
+ _content = QpidByteBuffer.allocateDirect(size);
_content.put(oldContent);
_content.put(src.duplicate());
}
@@ -87,7 +89,7 @@ public class StoredMemoryMessage<T exten
{
return 0;
}
- ByteBuffer src = _content.duplicate();
+ QpidByteBuffer src = _content.duplicate();
int oldPosition = src.position();
@@ -96,20 +98,20 @@ public class StoredMemoryMessage<T exten
int length = dst.remaining() < src.remaining() ? dst.remaining() :
src.remaining();
src.limit(oldPosition + length);
- dst.put(src);
+ src.get(dst);
return length;
}
- public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+ public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
{
if(_content == null)
{
return null;
}
- ByteBuffer buf = _content.duplicate();
+ QpidByteBuffer buf = _content.duplicate();
if(offsetInMessage != 0)
{
@@ -117,7 +119,7 @@ public class StoredMemoryMessage<T exten
buf = buf.slice();
}
- buf.limit(size);
+ buf.limit(Math.min(size,buf.remaining()));
return Collections.singleton(buf);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
Fri Aug 7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
import java.util.Collection;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public interface StoredMessage<M extends StorableMessageMetaData>
{
M getMetaData();
@@ -31,7 +33,7 @@ public interface StoredMessage<M extends
int getContent(int offsetInMessage, ByteBuffer dst);
- Collection<ByteBuffer> getContent(int offsetInMessage, int size);
+ Collection<QpidByteBuffer> getContent(int offsetInMessage, int size);
void remove();
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java?rev=1694594&view=auto
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
(added)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
Fri Aug 7 00:28:17 2015
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+abstract class AbstractNonBlockingConnectionDelegate implements
NonBlockingConnectionDelegate
+{
+ private QpidByteBuffer _applicationInputBuffer;
+
+ private final NonBlockingConnection _connection;
+
+ protected AbstractNonBlockingConnectionDelegate(final
NonBlockingConnection connection)
+ {
+ _connection = connection;
+ }
+
+
+ abstract public QpidByteBuffer getNetworkInputBuffer();
+
+ public QpidByteBuffer getApplicationInputBuffer()
+ {
+ return _applicationInputBuffer;
+ }
+}
Propchange:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Fri Aug 7 00:28:17 2015
@@ -26,7 +26,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,6 +34,7 @@ import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
@@ -133,7 +133,7 @@ public class MultiVersionProtocolEngine
}
- public void received(ByteBuffer msg)
+ public void received(QpidByteBuffer msg)
{
_delegate.received(msg);
}
@@ -272,7 +272,7 @@ public class MultiVersionProtocolEngine
}
- public void received(ByteBuffer msg)
+ public void received(QpidByteBuffer msg)
{
_logger.error("Error processing incoming data, could not negotiate
a common protocol");
msg.position(msg.limit());
@@ -338,7 +338,7 @@ public class MultiVersionProtocolEngine
private class SelfDelegateProtocolEngine implements ProtocolEngine
{
- private final ByteBuffer _header =
ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
+ private final QpidByteBuffer _header =
QpidByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
private long _lastReadTime = System.currentTimeMillis();
private final AtomicBoolean _hasWork = new AtomicBoolean();
@@ -389,10 +389,10 @@ public class MultiVersionProtocolEngine
_hasWork.set(false);
}
- public void received(ByteBuffer msg)
+ public void received(QpidByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
- ByteBuffer msgheader = msg.duplicate().slice();
+ QpidByteBuffer msgheader = msg.duplicate().slice();
if(_header.remaining() > msgheader.limit())
{
@@ -472,7 +472,7 @@ public class MultiVersionProtocolEngine
{
_logger.debug("Unsupported protocol version requested,
replying with: " + supportedReplyVersion);
}
- final ByteBuffer supportedReplyBuf =
ByteBuffer.allocateDirect(supportedReplyBytes.length);
+ final QpidByteBuffer supportedReplyBuf =
QpidByteBuffer.allocateDirect(supportedReplyBytes.length);
supportedReplyBuf.put(supportedReplyBytes);
supportedReplyBuf.flip();
_sender.send(supportedReplyBuf);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Fri Aug 7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +61,6 @@ public class NonBlockingConnection imple
private volatile int _maxReadIdle;
private volatile int _maxWriteIdle;
- private ByteBuffer _netInputBuffer;
-
private volatile boolean _fullyWritten = true;
private boolean _partialRead = false;
@@ -83,7 +82,6 @@ public class NonBlockingConnection imple
_receiveBufferSize = receiveBufferSize;
- _netInputBuffer = ByteBuffer.allocateDirect(receiveBufferSize);
_remoteSocketAddress =
_socketChannel.socket().getRemoteSocketAddress().toString();
_port = port;
@@ -112,6 +110,11 @@ public class NonBlockingConnection imple
return _partialRead;
}
+ int getReceiveBufferSize()
+ {
+ return _receiveBufferSize;
+ }
+
Ticker getTicker()
{
return _protocolEngine.getAggregateTicker();
@@ -230,7 +233,7 @@ public class NonBlockingConnection imple
_fullyWritten = doWrite();
_protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
- if (dataRead || (_delegate.needsWork() &&
_netInputBuffer.position() != 0))
+ if (dataRead || (_delegate.needsWork() &&
_delegate.getNetInputBuffer().position() != 0))
{
_protocolEngine.notifyWork();
}
@@ -330,48 +333,17 @@ public class NonBlockingConnection imple
*/
boolean doRead() throws IOException
{
- boolean readData = false;
_partialRead = false;
- if (!_closed.get())
+ if(!_closed.get() && _delegate.readyForRead())
{
- readData = _delegate.doRead();
- }
- return readData;
- }
-
- boolean readAndProcessData() throws IOException
- {
- boolean readData = readIntoBuffer(_netInputBuffer);
+ int readData = readFromNetwork();
- ByteBuffer duplicate = _netInputBuffer.duplicate();
- duplicate.flip();
-
- readData |= processData(duplicate);
-
- if (_netInputBuffer.hasRemaining())
- {
- // slice but keep unprocessed data
- int amountOfUnprocessedData = duplicate.remaining();
- _netInputBuffer.position(_netInputBuffer.position() -
amountOfUnprocessedData);
- _netInputBuffer = _netInputBuffer.slice();
- _netInputBuffer.position(amountOfUnprocessedData);
+ return (readData > 0) || _delegate.processData();
}
else
{
- if(duplicate.remaining() < _receiveBufferSize)
- {
- // compact into new buffer
- _netInputBuffer =
ByteBuffer.allocateDirect(_receiveBufferSize);
- _netInputBuffer.put(duplicate);
- }
- else
- {
- // grow the buffer
- _netInputBuffer =
ByteBuffer.allocateDirect(_receiveBufferSize+_netInputBuffer.capacity());
- _netInputBuffer.put(duplicate);
- }
+ return false;
}
- return readData;
}
void writeToTransport(ByteBuffer[] buffers) throws IOException
@@ -402,20 +374,12 @@ public class NonBlockingConnection imple
}
}
- boolean processData(ByteBuffer data) throws IOException
+ protected int readFromNetwork() throws IOException
{
- return _delegate.processData(data);
- }
+ QpidByteBuffer buffer = _delegate.getNetInputBuffer();
- protected boolean readIntoBuffer(final ByteBuffer buffer) throws
IOException
- {
- boolean readData = false;
- int read = _socketChannel.read(buffer);
- if (read > 0)
- {
- readData = true;
- }
- else if (read == -1)
+ int read = _socketChannel.read(buffer.getNativeBuffer());
+ if (read == -1)
{
_closed.set(true);
}
@@ -426,11 +390,11 @@ public class NonBlockingConnection imple
{
LOGGER.debug("Read " + read + " byte(s)");
}
- return readData;
+ return read;
}
@Override
- public void send(final ByteBuffer msg)
+ public void send(final QpidByteBuffer msg)
{
if (_closed.get())
@@ -439,10 +403,11 @@ public class NonBlockingConnection imple
}
else if (msg.remaining() > 0)
{
- _buffers.add(msg);
+ _buffers.add(msg.getNativeBuffer());
}
}
+
public void writeBufferProcessed()
{
_buffers.poll();
@@ -469,13 +434,14 @@ public class NonBlockingConnection imple
return _scheduler;
}
- public void processAmqpData(ByteBuffer data)
+ public void processAmqpData(QpidByteBuffer applicationData)
{
- _protocolEngine.received(data);
+ _protocolEngine.received(applicationData);
}
public void setTransportEncryption(TransportEncryption transportEncryption)
{
+ NonBlockingConnectionDelegate oldDelegate = _delegate;
switch (transportEncryption)
{
case TLS:
@@ -488,6 +454,13 @@ public class NonBlockingConnection imple
default:
throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
}
+ if(oldDelegate != null)
+ {
+ QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
+ src.flip();
+ _delegate.getNetInputBuffer().put(src);
+ }
LOGGER.debug("Identified transport encryption as " +
transportEncryption);
}
+
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
Fri Aug 7 00:28:17 2015
@@ -24,15 +24,23 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
-public interface NonBlockingConnectionDelegate
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+interface NonBlockingConnectionDelegate
{
- boolean doRead() throws IOException;
boolean doWrite(ByteBuffer[] bufferArray) throws IOException;
- boolean processData(ByteBuffer data) throws IOException;
+
+ boolean readyForRead();
+
+ boolean processData() throws IOException;
Principal getPeerPrincipal();
Certificate getPeerCertificate();
boolean needsWork();
+
+ QpidByteBuffer getNetInputBuffer();
+
+ void setNetInputBuffer(QpidByteBuffer buffer);
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
Fri Aug 7 00:28:17 2015
@@ -24,29 +24,63 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
public class NonBlockingConnectionPlainDelegate implements
NonBlockingConnectionDelegate
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnectionPlainDelegate.class);
+
private final NonBlockingConnection _parent;
+ private QpidByteBuffer _netInputBuffer;
public NonBlockingConnectionPlainDelegate(NonBlockingConnection parent)
{
_parent = parent;
+ _netInputBuffer = QpidByteBuffer.allocateDirect(parent.getReceiveBufferSize());
}
@Override
- public boolean doRead() throws IOException
+ public boolean readyForRead()
{
- return _parent.readAndProcessData();
+ return true;
}
@Override
- public boolean processData(ByteBuffer buffer)
+ public boolean processData()
{
- _parent.processAmqpData(buffer);
+ _netInputBuffer.flip();
+ _parent.processAmqpData(_netInputBuffer);
+
+ restoreApplicationBufferForWrite();
return false;
}
+ protected void restoreApplicationBufferForWrite()
+ {
+ _netInputBuffer = _netInputBuffer.slice();
+ if (_netInputBuffer.limit() != _netInputBuffer.capacity())
+ {
+ _netInputBuffer.position(_netInputBuffer.limit());
+ _netInputBuffer.limit(_netInputBuffer.capacity());
+ }
+ else
+ {
+ QpidByteBuffer currentBuffer = _netInputBuffer;
+ int newBufSize = (currentBuffer.capacity() < _parent.getReceiveBufferSize())
+ ? _parent.getReceiveBufferSize()
+ : currentBuffer.capacity() + _parent.getReceiveBufferSize();
+
+ _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+ _netInputBuffer.put(currentBuffer);
+ }
+
+ }
+
+
@Override
public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
{
@@ -87,4 +121,16 @@ public class NonBlockingConnectionPlainD
{
return false;
}
+
+ @Override
+ public QpidByteBuffer getNetInputBuffer()
+ {
+ return _netInputBuffer;
+ }
+
+ @Override
+ public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
+ {
+ _netInputBuffer = netInputBuffer;
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
Fri Aug 7 00:28:17 2015
@@ -19,6 +19,7 @@
package org.apache.qpid.server.transport;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
import org.slf4j.Logger;
@@ -42,33 +43,77 @@ public class NonBlockingConnectionTLSDel
private final SSLEngine _sslEngine;
private final NonBlockingConnection _parent;
+ private final int _initialApplicationBufferSize;
private SSLEngineResult _status;
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
private Principal _principal;
private Certificate _peerCertificate;
private boolean _principalChecked;
+ private QpidByteBuffer _netInputBuffer;
+ private QpidByteBuffer _applicationBuffer;
+
public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent,
AmqpPort port)
{
_parent = parent;
_sslEngine = createSSLEngine(port);
+ _netInputBuffer = QpidByteBuffer.allocateDirect(Math.max(parent.getReceiveBufferSize(),
+ _sslEngine.getSession().getPacketBufferSize()));
+
+ _initialApplicationBufferSize =
+ Math.max(_sslEngine.getSession().getApplicationBufferSize() + 50, _parent.getReceiveBufferSize());
+ _applicationBuffer = QpidByteBuffer.allocateDirect(_initialApplicationBufferSize);
+
}
@Override
- public boolean doRead() throws IOException
+ public boolean readyForRead()
{
- boolean readData = false;
- if (_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
- {
- readData = _parent.readAndProcessData();
- }
- return readData;
+ return _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED);
}
@Override
- public boolean processData(ByteBuffer buffer) throws IOException
+ public boolean processData() throws IOException
{
- return unwrapAndProcessBuffer(buffer);
+ _netInputBuffer.flip();
+ boolean readData = false;
+ boolean tasksRun;
+ int oldNetBufferPos;
+ do
+ {
+ int oldAppBufPos = _applicationBuffer.position();
+ oldNetBufferPos = _netInputBuffer.position();
+
+ _status = _sslEngine.unwrap(_netInputBuffer.getNativeBuffer(), _applicationBuffer.getNativeBuffer());
+ if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
+ {
+ // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
+ LOGGER.debug("SSLEngine closed");
+ }
+
+ tasksRun = runSSLEngineTasks(_status);
+ _applicationBuffer.flip();
+ if(_applicationBuffer.position() > oldAppBufPos)
+ {
+ readData = true;
+ }
+
+ _parent.processAmqpData(_applicationBuffer);
+
+ restoreApplicationBufferForWrite();
+
+ }
+ while((_netInputBuffer.hasRemaining() && (_netInputBuffer.position()>oldNetBufferPos)) || tasksRun);
+
+ if(_netInputBuffer.hasRemaining())
+ {
+ _netInputBuffer.compact();
+ }
+ else
+ {
+ _netInputBuffer.clear();
+ }
+ return readData;
}
@Override
@@ -97,34 +142,25 @@ public class NonBlockingConnectionTLSDel
return (bufferArray.length == byteBuffersWritten) &&
_encryptedOutput.isEmpty();
}
- private boolean unwrapAndProcessBuffer(final ByteBuffer wrappedDataBuffer) throws SSLException
+ protected void restoreApplicationBufferForWrite()
{
- boolean readData = false;
- int unwrapped;
- boolean tasksRun;
- do
+ _applicationBuffer = _applicationBuffer.slice();
+ if (_applicationBuffer.limit() != _applicationBuffer.capacity())
{
- ByteBuffer appInputBuffer =
- ByteBuffer.allocateDirect(_sslEngine.getSession().getApplicationBufferSize() + 50);
- _status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
- if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
- {
- // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true?
- LOGGER.debug("SSLEngine closed");
- }
-
- tasksRun = runSSLEngineTasks(_status);
+ _applicationBuffer.position(_applicationBuffer.limit());
+ _applicationBuffer.limit(_applicationBuffer.capacity());
+ }
+ else
+ {
+ QpidByteBuffer currentBuffer = _applicationBuffer;
+ int newBufSize = (currentBuffer.capacity() < _initialApplicationBufferSize)
+ ? _initialApplicationBufferSize
+ : currentBuffer.capacity() + _initialApplicationBufferSize;
- appInputBuffer.flip();
- unwrapped = appInputBuffer.remaining();
- if(unwrapped > 0)
- {
- readData = true;
- }
- _parent.processAmqpData(appInputBuffer);
+ _applicationBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+ _applicationBuffer.put(currentBuffer);
}
- while(unwrapped > 0 || tasksRun);
- return readData;
+
}
private int wrapBufferArray(final ByteBuffer[] bufferArray) throws
SSLException
@@ -173,8 +209,10 @@ public class NonBlockingConnectionTLSDel
{
task.run();
}
+
return true;
}
+
return false;
}
@@ -240,4 +278,15 @@ public class NonBlockingConnectionTLSDel
return sslEngine;
}
+ @Override
+ public QpidByteBuffer getNetInputBuffer()
+ {
+ return _netInputBuffer;
+ }
+
+ @Override
+ public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
+ {
+ _netInputBuffer = netInputBuffer;
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
Fri Aug 7 00:28:17 2015
@@ -19,6 +19,7 @@
package org.apache.qpid.server.transport;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.network.TransportEncryption;
import java.io.IOException;
@@ -29,25 +30,32 @@ import java.security.cert.Certificate;
public class NonBlockingConnectionUndecidedDelegate implements
NonBlockingConnectionDelegate
{
private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+ private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
public final NonBlockingConnection _parent;
+ private QpidByteBuffer _netInputBuffer;
+
public NonBlockingConnectionUndecidedDelegate(NonBlockingConnection parent)
{
_parent = parent;
+ _netInputBuffer = QpidByteBuffer.allocateDirect(NUMBER_OF_BYTES_FOR_TLS_CHECK);
+
}
@Override
- public boolean doRead() throws IOException
+ public boolean readyForRead()
{
- return _parent.readAndProcessData();
+ return true;
}
- public boolean processData(ByteBuffer buffer) throws IOException
+ public boolean processData() throws IOException
{
+ QpidByteBuffer buffer = _netInputBuffer.duplicate();
+ buffer.flip();
if (buffer.remaining() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
{
final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
- ByteBuffer dup = buffer.duplicate();
+ QpidByteBuffer dup = buffer.duplicate();
dup.get(headerBytes);
if (looksLikeSSL(headerBytes))
@@ -58,7 +66,8 @@ public class NonBlockingConnectionUndeci
{
_parent.setTransportEncryption(TransportEncryption.NONE);
}
- _parent.processData(buffer);
+
+ return true;
}
return false;
}
@@ -112,4 +121,16 @@ public class NonBlockingConnectionUndeci
headerBytes[4] == 2 || // TLS 1.1
headerBytes[4] == 3);
}
+
+ @Override
+ public QpidByteBuffer getNetInputBuffer()
+ {
+ return _netInputBuffer;
+ }
+
+ @Override
+ public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
+ {
+ _netInputBuffer = netInputBuffer;
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Fri Aug 7 00:28:17 2015
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.transport;
+import java.nio.ByteBuffer;
+
import javax.security.auth.Subject;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferReceiver;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -31,7 +34,7 @@ import org.apache.qpid.transport.network
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
* decodes it and then process the result.
*/
-public interface ProtocolEngine extends ByteBufferReceiver, TransportActivity
+public interface ProtocolEngine extends TransportActivity
{
// Called by the NetworkDriver when the socket has been closed for reading
@@ -68,4 +71,6 @@ public interface ProtocolEngine extends
void encryptedTransport();
+ void received(QpidByteBuffer msg);
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]