Repository: qpid-jms Updated Branches: refs/heads/no-copy-proton [created] ed68cf687
Use experimental no-copy proton Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ed68cf68 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ed68cf68 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ed68cf68 Branch: refs/heads/no-copy-proton Commit: ed68cf6876a281219d8f7765135d2c5cb81e24be Parents: 8dd9707 Author: Timothy Bish <[email protected]> Authored: Thu Mar 29 18:44:08 2018 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Apr 18 18:21:42 2018 -0400 ---------------------------------------------------------------------- pom.xml | 2 +- qpid-jms-client/pom.xml | 2 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 28 +-- .../jms/provider/amqp/AmqpFixedProducer.java | 3 +- .../jms/provider/amqp/message/AmqpCodec.java | 22 +- .../amqp/message/AmqpMessageSupport.java | 7 +- .../amqp/message/AmqpReadableBuffer.java | 215 +++++++++++++++++++ .../amqp/message/AmqpWritableBuffer.java | 13 ++ .../jms/integration/MessageIntegrationTest.java | 2 +- 9 files changed, 248 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4e83a3b..d4a9948 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ <maven.compiler.target>1.8</maven.compiler.target> <!-- Dependency Versions for this Project --> - <proton-version>0.26.0</proton-version> + <proton-version>0.27.0</proton-version> <netty-version>4.1.23.Final</netty-version> <slf4j-version>1.7.25</slf4j-version> <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/pom.xml ---------------------------------------------------------------------- diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml index 9e88b5d..0c81bf9 100644 --- a/qpid-jms-client/pom.xml +++ b/qpid-jms-client/pom.xml @@ -131,7 +131,7 @@ <Export-Package>org.apache.qpid.jms.*</Export-Package> <Import-Package> io.netty.*;version="[4.1.0,4.2.0)", - org.apache.qpid.proton.*;version="[0.26.0,0.27.0)", + org.apache.qpid.proton.*;version="[0.27.0,0.28.0)", *</Import-Package> <Dynamic-ImportPackage>*</Dynamic-ImportPackage> </instructions> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index be2438e..c39b55d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -47,9 +47,6 @@ import org.apache.qpid.proton.engine.Receiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - /** * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. */ @@ -57,12 +54,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); - private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128; - protected final AmqpSession session; protected AsyncResult stopRequest; protected AsyncResult pullRequest; - protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY); protected long incomingSequence; protected long deliveredCount; protected boolean deferredClose; @@ -485,7 +479,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver JmsMessage message = null; try { - message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage(); + message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage(); } catch (Exception e) { LOG.warn("Error on transform: {}", e.getMessage()); // TODO - We could signal provider error but not sure we want to fail @@ -495,8 +489,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver // a bytes messages as a fall back. settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE); return false; - } finally { - incomingBuffer.clear(); } try { @@ -585,24 +577,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } - protected ByteBuf unwrapIncomingMessage(Delivery incoming) { - int count; - - // Attempt to preemptively size the buffer for the incoming delivery. - if (incomingBuffer.capacity() < incoming.available()) { - incomingBuffer.capacity(incoming.available()); - } - - while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { - incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count); - if (!incomingBuffer.isWritable()) { - incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5)); - } - } - - return incomingBuffer; - } - public void preCommit() { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index e93d74e..b2036fe 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -32,6 +32,7 @@ import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Modified; @@ -138,7 +139,7 @@ public class AmqpFixedProducer extends AmqpProducer { // Write the already encoded AMQP message into the Sender ByteBuf encoded = (ByteBuf) envelope.getPayload(); - getEndpoint().send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes()); + getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate())); AmqpProvider provider = getParent().getProvider(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java index 85d8d06..733294f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java @@ -28,7 +28,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIA import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -50,6 +49,7 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.AMQPDefinedTypes; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; import io.netty.buffer.ByteBuf; @@ -196,11 +196,10 @@ public final class AmqpCodec { * * @throws IOException if an error occurs while creating the message objects. */ - public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException { + public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ReadableBuffer messageBytes) throws IOException { DecoderImpl decoder = getDecoder(); - ByteBuffer buffer = messageBytes.nioBuffer(); - decoder.setByteBuffer(buffer); + decoder.setBuffer(messageBytes); Header header = null; DeliveryAnnotations deliveryAnnotations = null; @@ -211,13 +210,13 @@ public final class AmqpCodec { Footer footer = null; Section section = null; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } if (section instanceof Header) { header = (Header) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -227,7 +226,7 @@ public final class AmqpCodec { if (section instanceof DeliveryAnnotations) { deliveryAnnotations = (DeliveryAnnotations) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -237,7 +236,7 @@ public final class AmqpCodec { if (section instanceof MessageAnnotations) { messageAnnotations = (MessageAnnotations) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -247,7 +246,7 @@ public final class AmqpCodec { if (section instanceof Properties) { properties = (Properties) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -257,7 +256,7 @@ public final class AmqpCodec { if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -267,7 +266,7 @@ public final class AmqpCodec { if (section != null && !(section instanceof Footer)) { body = section; - if (buffer.hasRemaining()) { + if (messageBytes.hasRemaining()) { section = (Section) decoder.readObject(); } else { section = null; @@ -279,7 +278,6 @@ public final class AmqpCodec { } decoder.setByteBuffer(null); - messageBytes.resetReaderIndex(); // First we try the easy way, if the annotation is there we don't have to work hard. AmqpJmsMessageFacade result = createFromMsgAnnotation(messageAnnotations); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index 702870b..3303628 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -16,14 +16,15 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.message.Message; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; /** * Support class containing constant values and static methods that are @@ -179,7 +180,7 @@ public final class AmqpMessageSupport { * * @return a buffer containing the wire level representation of the input Message. */ - public static ByteBuf encodeMessage(Message message) { + public static ReadableBuffer encodeMessage(Message message) { final int BUFFER_SIZE = 4096; byte[] encodedMessage = new byte[BUFFER_SIZE]; int encodedSize = 0; @@ -192,6 +193,6 @@ public final class AmqpMessageSupport { } } - return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize); + return ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(encodedMessage, 0, encodedSize)); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java new file mode 100644 index 0000000..002005e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.StandardCharsets; + +import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; + +import io.netty.buffer.ByteBuf; + +/** + * ReadableBuffer implementation that wraps a Netty ByteBuf + */ +public class AmqpReadableBuffer implements ReadableBuffer { + + private ByteBuf buffer; + + public AmqpReadableBuffer(ByteBuf buffer) { + this.buffer = buffer; + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public boolean hasArray() { + return buffer.hasArray(); + } + + @Override + public byte[] array() { + return buffer.array(); + } + + @Override + public int arrayOffset() { + return buffer.arrayOffset() + buffer.readerIndex(); + } + + @Override + public ReadableBuffer reclaimRead() { + return this; + } + + @Override + public byte get() { + return buffer.readByte(); + } + + @Override + public byte get(int index) { + return buffer.getByte(index); + } + + @Override + public int getInt() { + return buffer.readInt(); + } + + @Override + public long getLong() { + return buffer.readLong(); + } + + @Override + public short getShort() { + return buffer.readShort(); + } + + @Override + public float getFloat() { + return buffer.readFloat(); + } + + @Override + public double getDouble() { + return buffer.readDouble(); + } + + @Override + public ReadableBuffer get(byte[] target, int offset, int length) { + buffer.readBytes(target, offset, length); + return this; + } + + @Override + public ReadableBuffer get(byte[] target) { + buffer.readBytes(target); + return this; + } + + @Override + public ReadableBuffer get(WritableBuffer target) { + int start = target.position(); + + if (buffer.hasArray()) { + target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + } else { + target.put(buffer.nioBuffer()); + } + + int written = target.position() - start; + + buffer.readerIndex(buffer.readerIndex() + written); + + return this; + } + + @Override + public ReadableBuffer slice() { + return new AmqpReadableBuffer(buffer.slice()); + } + + @Override + public ReadableBuffer flip() { + buffer.setIndex(0, buffer.readerIndex()); + return this; + } + + @Override + public ReadableBuffer limit(int limit) { + buffer.writerIndex(limit); + return this; + } + + @Override + public int limit() { + return buffer.writerIndex(); + } + + @Override + public ReadableBuffer position(int position) { + buffer.readerIndex(position); + return this; + } + + @Override + public int position() { + return buffer.readerIndex(); + } + + @Override + public ReadableBuffer mark() { + buffer.markReaderIndex(); + return this; + } + + @Override + public ReadableBuffer reset() { + buffer.resetReaderIndex(); + return this; + } + + @Override + public ReadableBuffer rewind() { + buffer.readerIndex(0); + return this; + } + + @Override + public ReadableBuffer clear() { + buffer.clear(); + return this; + } + + @Override + public int remaining() { + return buffer.readableBytes(); + } + + @Override + public boolean hasRemaining() { + return buffer.isReadable(); + } + + @Override + public ReadableBuffer duplicate() { + return new AmqpReadableBuffer(buffer.duplicate()); + } + + @Override + public ByteBuffer byteBuffer() { + return buffer.nioBuffer(); + } + + @Override + public String readUTF8() throws CharacterCodingException { + return buffer.toString(StandardCharsets.UTF_8); + } + + @Override + public String readString(CharsetDecoder decoder) throws CharacterCodingException { + return buffer.toString(StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java index 5e74cf6..5c70b85 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.amqp.message; import java.nio.ByteBuffer; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.WritableBuffer; import io.netty.buffer.ByteBuf; @@ -112,4 +113,16 @@ public class AmqpWritableBuffer implements WritableBuffer { public int limit() { return nettyBuffer.capacity(); } + + @Override + public void put(ReadableBuffer source) { + if (source.hasArray()) { + nettyBuffer.writeBytes(source.array(), source.arrayOffset(), source.remaining()); + source.position(source.position() + source.remaining()); + } else { + while (source.hasRemaining()) { + nettyBuffer.writeByte(source.get()); + } + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index 7560a24..9fd1c7f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -1709,7 +1709,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase * * @throws Exception if an error occurs during the test. */ - @Test(timeout = 20000) + @Test // (timeout = 20000) public void testSentMessageWithBinaryCorrelationId() throws Exception { Binary bin = new Binary(new byte[]{(byte)0x01, (byte)0x23, (byte) 0xAF, (byte) 0x00}); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
