using older proton version
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/326a8e4c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/326a8e4c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/326a8e4c Branch: refs/heads/artemis-1009 Commit: 326a8e4cb8de875b58fb4942cb602d2bde6456c2 Parents: 7381bf7 Author: Clebert Suconic <[email protected]> Authored: Tue Feb 28 22:42:06 2017 -0500 Committer: Clebert Suconic <[email protected]> Committed: Wed Mar 1 12:19:02 2017 -0500 ---------------------------------------------------------------------- .../protocol/amqp/broker/AMQPMessage.java | 8 +-- .../message/JMSMappingOutboundTransformer.java | 21 +------- .../proton/ProtonServerReceiverContext.java | 19 +++++-- .../protocol/amqp/util/DeliveryUtil.java | 13 +++++ .../artemis/protocol/amqp/util/TLSEncode.java | 52 ++++++++++++++++++++ .../amqp/client/util/UnmodifiableDelivery.java | 3 +- 6 files changed, 89 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 1cb85ea..c963061 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.encode.BodyType; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; @@ -46,7 +47,6 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; -import org.apache.qpid.proton.util.TLSEncoder; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { @@ -183,7 +183,7 @@ public class AMQPMessage extends RefCountMessage { } private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) { - DecoderImpl decoder = TLSEncoder.getDecoder(); + DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setByteBuffer(buffer); buffer.position(0); @@ -500,8 +500,8 @@ public class AMQPMessage extends RefCountMessage { } else { header.setDeliveryCount(UnsignedInteger.valueOf(1)); } - TLSEncoder.getEncoder().setByteBuffer(new NettyWritable(buffer)); - TLSEncoder.getEncoder().writeObject(header); + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); + TLSEncode.getEncoder().writeObject(header); } } buffer.writeBytes(data, headerEnd, data.writerIndex() - headerEnd); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 2ef3122..23bcaf1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -71,6 +71,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMe import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.qpid.proton.amqp.Binary; @@ -87,8 +88,6 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.AMQPDefinedTypes; -import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.WritableBuffer; import org.jboss.logging.Logger; @@ -105,22 +104,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; - // For now Proton requires that we create a decoder to create an encoder - private static class EncoderDecoderPair { - DecoderImpl decoder = new DecoderImpl(); - EncoderImpl encoder = new EncoderImpl(decoder); - { - AMQPDefinedTypes.registerAllTypes(decoder, encoder); - } - } - - private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() { - @Override - protected EncoderDecoderPair initialValue() { - return new EncoderDecoderPair(); - } - }; - public JMSMappingOutboundTransformer(IDGenerator idGenerator) { super(idGenerator); } @@ -375,7 +358,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { apMap.put(key, objectProperty); } - EncoderImpl encoder = tlsCodec.get().encoder; + EncoderImpl encoder = TLSEncode.getEncoder(); encoder.setByteBuffer(buffer); if (header != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 34c2c07..ea2635e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -23,6 +25,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -131,6 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Receiver receiver; + ByteBuf buffer = null; try { receiver = ((Receiver) delivery.getLink()); @@ -141,15 +145,20 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (delivery.isPartial()) { return; } - byte[] data = new byte[delivery.getDataLength()]; + // This should be used if getDataLength was avilable +// byte[] data = new byte[delivery.getDataLength()]; + + buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); Transaction tx = null; synchronized (connection.getLock()) { - receiver.recv(data, 0, data.length); - + DeliveryUtil.readDelivery(receiver, buffer); receiver.advance(); } + byte[] data = new byte[buffer.writerIndex()]; + buffer.readBytes(data); + if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txState = (TransactionalState) delivery.getRemoteState(); @@ -170,6 +179,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements rejected.setError(condition); delivery.disposition(rejected); delivery.settle(); + } finally { + if (buffer != null) { + buffer.release(); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java index 0ff1d3b..9257c6b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java @@ -17,11 +17,24 @@ package org.apache.activemq.artemis.protocol.amqp.util; import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; public class DeliveryUtil { + public static int readDelivery(Receiver receiver, ByteBuf buffer) { + int initial = buffer.writerIndex(); + // optimization by norman + int count; + while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) { + // Increment the writer index by the number of bytes written into it while calling recv. + buffer.writerIndex(buffer.writerIndex() + count); + buffer.ensureWritable(count); + } + return buffer.writerIndex() - initial; + } + public static MessageImpl decodeMessageImpl(ByteBuf buffer) { MessageImpl message = (MessageImpl) Message.Factory.create(); message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java new file mode 100644 index 0000000..b2f0fdc --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/TLSEncode.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.util; + +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; + +/** This can go away if Proton provides this feature. */ +public class TLSEncode { + + // For now Proton requires that we create a decoder to create an encoder + private static class EncoderDecoderPair { + DecoderImpl decoder = new DecoderImpl(); + EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + } + } + + private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() { + @Override + protected EncoderDecoderPair initialValue() { + return new EncoderDecoderPair(); + } + }; + + public static EncoderImpl getEncoder() { + return tlsCodec.get().encoder; + } + + public static DecoderImpl getDecoder() { + return tlsCodec.get().decoder; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/326a8e4c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java index d7d7f9d..d9bddcb 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java @@ -51,10 +51,11 @@ public class UnmodifiableDelivery implements Delivery { } } + /* waiting Pull Request sent @Override public int getDataLength() { return delivery.getDataLength(); - } + } */ @Override public DeliveryState getLocalState() {
