Repository: activemq-artemis Updated Branches: refs/heads/master 9e6c40a8d -> 2ba90ef60
ARTEMIS-1038 Make usage of Delivery.available and upgrade proton Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ae34b010 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ae34b010 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ae34b010 Branch: refs/heads/master Commit: ae34b01065af7712376bbb3d32d0e677b885ef10 Parents: 9e6c40a Author: Clebert Suconic <[email protected]> Authored: Tue Mar 14 11:38:40 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Wed Mar 15 14:37:41 2017 -0400 ---------------------------------------------------------------------- .../amqp/proton/ProtonServerReceiverContext.java | 19 ++++--------------- pom.xml | 2 +- .../amqp/client/util/UnmodifiableDelivery.java | 5 +++++ 3 files changed, 10 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae34b010/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index d5fc196..f08c1fc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.util.Arrays; import java.util.List; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -28,7 +26,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; -import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -134,7 +131,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Receiver receiver; - ByteBuf buffer = null; try { receiver = ((Receiver) delivery.getLink()); @@ -145,20 +141,17 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (delivery.isPartial()) { return; } - // This should be used if getDataLength was avilable -// byte[] data = new byte[delivery.getDataLength()]; - buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); Transaction tx = null; + byte[] data; + synchronized (connection.getLock()) { - DeliveryUtil.readDelivery(receiver, buffer); + data = new byte[delivery.available()]; + receiver.recv(data, 0, data.length); receiver.advance(); } - byte[] data = new byte[buffer.writerIndex()]; - buffer.readBytes(data); - if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txState = (TransactionalState) delivery.getRemoteState(); @@ -179,10 +172,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements rejected.setError(condition); delivery.disposition(rejected); delivery.settle(); - } finally { - if (buffer != null) { - buffer.release(); - } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae34b010/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 37f6ca1..8c05537 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ <jgroups.version>3.6.9.Final</jgroups.version> <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version> <netty.version>4.1.6.Final</netty.version> - <proton.version>0.16.0</proton.version> + <proton.version>0.18.0</proton.version> <resteasy.version>3.0.19.Final</resteasy.version> <slf4j.version>1.7.21</slf4j.version> <qpid.jms.version>0.20.0</qpid.jms.version> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae34b010/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java index d9bddcb..5545884 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java @@ -58,6 +58,11 @@ public class UnmodifiableDelivery implements Delivery { } */ @Override + public int available() { + return delivery.available(); + } + + @Override public DeliveryState getLocalState() { return delivery.getLocalState(); }
