ARTEMIS-1722 Don't copy message bytes unless needed

Alternate patch that doesn't copy the message bytes unless doing a
redelivery or skipping delivery annotations in the original version of
the message.  Proton-J will copy the bytes provided to the Sender's send
method so a copy isn't necessary on most common sends.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/169d0b7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/169d0b7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/169d0b7f

Branch: refs/heads/master
Commit: 169d0b7fa7852efecf77048a06fd9be89d749064
Parents: a1c39cd
Author: Timothy Bish <tabish...@gmail.com>
Authored: Fri Mar 2 15:03:08 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Tue Mar 6 18:44:22 2018 -0500

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 107 ++++++++++++++++---
 .../amqp/proton/ProtonServerSenderContext.java  |  23 ++--
 2 files changed, 105 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/169d0b7f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 56b0e6c..ba87ae6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -665,16 +665,20 @@ public class AMQPMessage extends RefCountMessage {
 
    private synchronized void checkBuffer() {
       if (!bufferValid) {
-         int estimated = Math.max(1500, data != null ? data.capacity() + 1000 
: 0);
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
-         try {
-            getProtonMessage().encode(new NettyWritable(buffer));
-            byte[] bytes = new byte[buffer.writerIndex()];
-            buffer.readBytes(bytes);
-            this.data = Unpooled.wrappedBuffer(bytes);
-         } finally {
-            buffer.release();
-         }
+         encodeProtonMessage();
+      }
+   }
+
+   private void encodeProtonMessage() {
+      int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 
0);
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
+      try {
+         getProtonMessage().encode(new NettyWritable(buffer));
+         byte[] bytes = new byte[buffer.writerIndex()];
+         buffer.readBytes(bytes);
+         this.data = Unpooled.wrappedBuffer(bytes);
+      } finally {
+         buffer.release();
       }
    }
 
@@ -691,15 +695,16 @@ public class AMQPMessage extends RefCountMessage {
 
       int amqpDeliveryCount = deliveryCount - 1;
 
-      Header header = getHeader();
-      if (header == null && (amqpDeliveryCount > 0)) {
-         header = new Header();
-         header.setDurable(durable);
-      }
-
       // If the re-delivering the message then the header must be re-encoded
       // otherwise we want to write the original header if present.
       if (amqpDeliveryCount > 0) {
+
+         Header header = getHeader();
+         if (header == null) {
+            header = new Header();
+            header.setDurable(durable);
+         }
+
          synchronized (header) {
             
header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
             TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
@@ -713,6 +718,76 @@ public class AMQPMessage extends RefCountMessage {
       buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - 
messagePaylodStart);
    }
 
+   /**
+    * Gets a ByteBuf from the Message that contains the encoded bytes to be 
sent on the wire.
+    * <p>
+    * When possible this method will present the bytes to the caller without 
copying them into
+    * another buffer copy.  If copying is needed a new Netty buffer is created 
and returned. The
+    * caller should ensure that the reference count on the returned buffer is 
always decremented
+    * to avoid a leak in the case of a copied buffer being returned.
+    *
+    * @param deliveryCount
+    *       The new delivery count for this message.
+    *
+    * @return a Netty ByteBuf containing the encoded bytes of this Message 
instance.
+    */
+   public ByteBuf getSendBuffer(int deliveryCount) {
+      checkBuffer();
+
+      if (deliveryCount > 1) {
+         return createCopyWithNewDeliveryCount(deliveryCount);
+      } else if (headerEnds != messagePaylodStart) {
+         return createCopyWithoutDeliveryAnnotations();
+      } else {
+         // Common case message has no delivery annotations and this is the 
first delivery
+         // so no re-encoding or section skipping needed.
+         return data.retainedDuplicate();
+      }
+   }
+
+   private ByteBuf createCopyWithoutDeliveryAnnotations() {
+      assert headerEnds != messagePaylodStart;
+
+      // The original message had delivery annotations and so we must copy 
into a new
+      // buffer skipping the delivery annotations section as that is not meant 
to survive
+      // beyond this hop.
+      final ByteBuf result = 
PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+      result.writeBytes(data, 0, headerEnds);
+      result.writeBytes(data, messagePaylodStart, data.writerIndex() - 
messagePaylodStart);
+      return result;
+   }
+
+   private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
+      assert deliveryCount > 1;
+
+      final int amqpDeliveryCount = deliveryCount - 1;
+      // If the re-delivering the message then the header must be re-encoded
+      // (or created if not previously present).  Any delivery annotations 
should
+      // be skipped as well in the resulting buffer.
+
+      final ByteBuf result = 
PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
+
+      Header header = getHeader();
+      if (header == null) {
+         header = new Header();
+         header.setDurable(durable);
+      }
+
+      synchronized (header) {
+         // Updates or adds a Header section with the correct delivery count
+         header.setDeliveryCount(UnsignedInteger.valueOf(amqpDeliveryCount));
+         TLSEncode.getEncoder().setByteBuffer(new NettyWritable(result));
+         TLSEncode.getEncoder().writeObject(header);
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+      }
+
+      // This will skip any existing delivery annotations that might have been 
present
+      // in the original message.
+      result.writeBytes(data, messagePaylodStart, data.writerIndex() - 
messagePaylodStart);
+
+      return result;
+   }
+
    public TypedProperties createExtraProperties() {
       if (extraProperties == null) {
          extraProperties = new TypedProperties();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/169d0b7f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1823168..990a217 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -22,8 +22,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -48,6 +46,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -75,6 +74,8 @@ import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * TODO: Merge {@link ProtonServerSenderContext} and {@link 
org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} 
once we support 'global' link names. The split is a workaround for outgoing 
links
  */
@@ -690,11 +691,11 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       // we only need a tag if we are going to settle later
       byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
 
-      ByteBuf nettyBuffer = 
PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
-      try {
-         message.sendBuffer(nettyBuffer, deliveryCount);
+      // Let the Message decide how to present the message bytes
+      ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
 
-         int size = nettyBuffer.writerIndex();
+      try {
+         int size = sendBuffer.writerIndex();
 
          while (!connection.tryLock(1, TimeUnit.SECONDS)) {
             if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@@ -714,8 +715,12 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             delivery.setMessageFormat((int) message.getMessageFormat());
             delivery.setContext(messageReference);
 
-            // this will avoid a copy.. patch provided by Norman using 
buffer.array()
-            sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + 
nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
+            if (sendBuffer.hasArray()) {
+               // this will avoid a copy.. patch provided by Norman using 
buffer.array()
+               sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + 
sendBuffer.readerIndex(), sendBuffer.readableBytes());
+            } else {
+               sender.send(new NettyReadable(sendBuffer));
+            }
 
             if (preSettle) {
                // Presettled means the client implicitly accepts any delivery 
we send it.
@@ -731,7 +736,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
 
          return size;
       } finally {
-         nettyBuffer.release();
+         sendBuffer.release();
       }
    }
 

Reply via email to