This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a4c311f  ARTEMIS-3449 Speedup AMQP large message streamig
     new 20c1836  This closes #3711
a4c311f is described below

commit a4c311f3681b31ae706a8843278746d8420185c6
Author: franz1981 <[email protected]>
AuthorDate: Fri Aug 27 18:04:29 2021 +0200

    ARTEMIS-3449 Speedup AMQP large message streamig
---
 .../artemis/core/message/LargeBodyReader.java      |   3 +-
 .../jdbc/store/file/JDBCSequentialFile.java        |  10 +-
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  29 ++---
 .../amqp/proton/ProtonServerSenderContext.java     | 123 +++++++++++----------
 .../core/persistence/impl/journal/LargeBody.java   |  13 ---
 5 files changed, 88 insertions(+), 90 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
index 42de11b..7491aa5 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
@@ -27,7 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
  *
  * None of these methods should be caleld from Clients
  */
-public interface LargeBodyReader {
+public interface LargeBodyReader extends AutoCloseable {
 
    /**
     * This method must not be called directly by ActiveMQ Artemis clients.
@@ -51,6 +51,7 @@ public interface LargeBodyReader {
    /**
     * This method must not be called directly by ActiveMQ Artemis clients.
     */
+   @Override
    void close() throws ActiveMQException;
 
    /**
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6f0b297..1ee9c77 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -192,7 +192,15 @@ public class JDBCSequentialFile implements SequentialFile {
    }
 
    private synchronized int internalWrite(ByteBuffer buffer, IOCallback 
callback) {
-      return internalWrite(buffer.array(), callback, true);
+      final byte[] data;
+      if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() 
== 0 && buffer.limit() == buffer.array().length) {
+         data = buffer.array();
+      } else {
+         byte[] copy = new byte[buffer.remaining()];
+         buffer.get(copy);
+         data = copy;
+      }
+      return internalWrite(data, callback, true);
    }
 
    private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback 
callback, boolean append) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 526ad54..247a3ec 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
@@ -107,7 +108,7 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
    private StorageManager storageManager;
 
    /** this is used to parse the initial packets from the buffer */
-   CompositeReadableBuffer parsingBuffer;
+   private CompositeReadableBuffer parsingBuffer;
 
    public AMQPLargeMessage(long id,
                            long messageFormat,
@@ -146,16 +147,6 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
       setMessageID(newID);
    }
 
-   public void openLargeMessage() throws Exception {
-      this.parsingData = new AmqpReadableBuffer(largeBody.map());
-   }
-
-   public void closeLargeMessage() throws Exception {
-      largeBody.releaseResources(false, true);
-      parsingData.freeDirectBuffer();
-      parsingData = null;
-   }
-
    public void releaseEncodedBuffer() {
       internalReleaseBuffer(1);
    }
@@ -347,14 +338,14 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
    public void addBytes(ReadableBuffer data) throws Exception {
       parseLargeMessage(data);
 
-      if (data.hasArray() && data.remaining() == data.array().length) {
-         //System.out.println("Received " + data.array().length + "::" + 
ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16));
-         largeBody.addBytes(data.array());
-      } else {
-         byte[] bytes = new byte[data.remaining()];
-         data.get(bytes);
-         //System.out.println("Finishing " + bytes.length + 
ByteUtil.formatGroup(ByteUtil.bytesToHex(bytes), 8, 16));
-         largeBody.addBytes(bytes);
+      final int remaining = data.remaining();
+      final ByteBuf writeBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(remaining, remaining);
+      try {
+         // perform copy of data
+         data.get(new NettyWritable(writeBuffer));
+         largeBody.addBytes(new ChannelBufferWrapper(writeBuffer, true, true));
+      } finally {
+         writeBuffer.release();
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index b46bcc3..32585d7 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -572,7 +571,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       void deliver() {
 
          // This is discounting some bytes due to Transfer payload
-         int frameSize = 
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
 - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
+         final int frameSize = 
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
 - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
 
          DeliveryAnnotations deliveryAnnotationsToEncode;
 
@@ -584,48 +583,37 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
             deliveryAnnotationsToEncode = null;
          }
 
-         LargeBodyReader context = message.getLargeBodyReader();
          try {
-            context.open();
-            try {
+            final ByteBuf frameBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
+            final NettyReadable frameView = new NettyReadable(frameBuffer);
+            try (LargeBodyReader context = message.getLargeBodyReader()) {
+               context.open();
                context.position(position);
                long bodySize = context.getSize();
+               // materialize it so we can use its internal NIO buffer
+               frameBuffer.ensureWritable(frameSize);
 
-               ByteBuffer buf = ByteBuffer.allocate(frameSize);
+               if (position == 0 && sender.getLocalState() != 
EndpointState.CLOSED && position < bodySize) {
+                  if (!deliverInitialPacket(context, 
deliveryAnnotationsToEncode, frameBuffer)) {
+                     return;
+                  }
+               }
 
                for (; sender.getLocalState() != EndpointState.CLOSED && 
position < bodySize; ) {
                   if (!connection.flowControl(this::resume)) {
-                     context.close();
                      return;
                   }
-                  buf.clear();
-                  int size = 0;
+                  frameBuffer.clear();
 
-                  try {
-                     if (position == 0) {
-                        replaceInitialHeader(deliveryAnnotationsToEncode, 
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
-                     }
-                     size = context.readInto(buf);
-
-                     sender.send(new ReadableBuffer.ByteBufferReader(buf));
-                     position += size;
-                  } catch (java.nio.BufferOverflowException overflowException) 
{
-                     if (position == 0) {
-                        if (log.isDebugEnabled()) {
-                           log.debug("Delivery of message failed with an 
overFlowException, retrying again with expandable buffer");
-                        }
-                        // on the very first packet, if the initial header was 
replaced with a much bigger header (re-encoding)
-                        // we could recover the situation with a retry using 
an expandable buffer.
-                        // this is tested on 
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
-                        size = 
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, 
buf);
-                     } else {
-                        // if this is not the position 0, something is going on
-                        // we just forward the exception as this is not 
supposed to happen
-                        throw overflowException;
-                     }
-                  }
+                  final int readSize = 
context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
+
+                  frameBuffer.writerIndex(readSize);
+
+                  sender.send(frameView);
+
+                  position += readSize;
 
-                  if (size > 0) {
+                  if (readSize > 0) {
 
                      if (position < bodySize) {
                         connection.instantFlush();
@@ -633,7 +621,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                   }
                }
             } finally {
-               context.close();
+               frameBuffer.release();
             }
 
             if (preSettle) {
@@ -661,35 +649,59 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          }
       }
 
+      private boolean deliverInitialPacket(final LargeBodyReader context,
+                                           final DeliveryAnnotations 
deliveryAnnotationsToEncode,
+                                           final ByteBuf frameBuffer) throws 
Exception {
+         assert position == 0 && context.position() == 0;
+         if (!connection.flowControl(this::resume)) {
+            return false;
+         }
+         frameBuffer.clear();
+         try {
+            replaceInitialHeader(deliveryAnnotationsToEncode, context, new 
NettyWritable(frameBuffer));
+         } catch (IndexOutOfBoundsException indexOutOfBoundsException) {
+            assert position == 0 : "this shouldn't happen unless 
replaceInitialHeader is updating position before modifying frameBuffer";
+            if (log.isDebugEnabled()) {
+               log.debug("Delivery of message failed with an 
overFlowException, retrying again with expandable buffer");
+            }
+            // on the very first packet, if the initial header was replaced 
with a much bigger header (re-encoding)
+            // we could recover the situation with a retry using an expandable 
buffer.
+            // this is tested on 
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
+            sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
+            return true;
+         }
+         final int writableBytes = frameBuffer.writableBytes();
+         if (writableBytes == 0) {
+            sender.send(new NettyReadable(frameBuffer));
+            connection.instantFlush();
+            return true;
+         }
+         final int writtenBytes = frameBuffer.writerIndex();
+         final int readSize = 
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
+         frameBuffer.writerIndex(writtenBytes + readSize);
+         sender.send(new NettyReadable(frameBuffer));
+         position += readSize;
+         connection.instantFlush();
+         return true;
+      }
+
       /**
-       * This is a retry logic when either the delivery annotations or 
re-encoded buffer is bigger than the frame size
-       * This will create one expandable buffer.
-       * It will then let Proton to do the framing correctly
+       * This must be used when either the delivery annotations or re-encoded 
buffer is bigger than the frame size.<br>
+       * This will create one expandable buffer, send and flush it.
        */
-      private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations 
deliveryAnnotationsToEncode,
-                                                         LargeBodyReader 
context,
-                                                         ByteBuffer buf) 
throws Exception {
-         int size;
-         buf.clear();
+      private void sendAndFlushInitialPacket(DeliveryAnnotations 
deliveryAnnotationsToEncode,
+                                             LargeBodyReader context) throws 
Exception {
          // if the buffer overflow happened during the initial position
          // this means the replaced headers are bigger then the frame size
          // on this case we do with an expandable netty buffer
-         ByteBuf nettyBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message)
 * 2);
+         final ByteBuf nettyBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message)
 * 2);
          try {
             replaceInitialHeader(deliveryAnnotationsToEncode, context, new 
NettyWritable(nettyBuffer));
-            size = context.readInto(buf);
-            position += size;
-
-            nettyBuffer.writeBytes(buf);
-
-            ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
-            nioBuffer.position(nettyBuffer.writerIndex());
-            nioBuffer = (ByteBuffer) nioBuffer.flip();
-            sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer));
+            sender.send(new NettyReadable(nettyBuffer));
          } finally {
             nettyBuffer.release();
+            connection.instantFlush();
          }
-         return size;
       }
 
       private int replaceInitialHeader(DeliveryAnnotations 
deliveryAnnotationsToEncode,
@@ -697,7 +709,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
                                         WritableBuffer buf) throws Exception {
          TLSEncode.getEncoder().setByteBuffer(buf);
          try {
-            int proposedPosition = writeHeaderAndAnnotations(context, 
deliveryAnnotationsToEncode);
+            int proposedPosition = 
writeHeaderAndAnnotations(deliveryAnnotationsToEncode);
             if (message.isReencoded()) {
                proposedPosition = 
writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
             }
@@ -740,8 +752,7 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          }
       }
 
-      private int writeHeaderAndAnnotations(LargeBodyReader context,
-                                             DeliveryAnnotations 
deliveryAnnotationsToEncode) throws ActiveMQException {
+      private int writeHeaderAndAnnotations(DeliveryAnnotations 
deliveryAnnotationsToEncode) {
          Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
          if (header != null) {
             TLSEncode.getEncoder().writeObject(header);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index a2e4273..1eacecd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -74,19 +74,6 @@ public class LargeBody {
       this.storageManager = storageManager;
    }
 
-   public ByteBuffer map() throws Exception {
-      ensureFileExists(true);
-      if (!file.isOpen()) {
-         file.open();
-      }
-      return file.map(0, file.size());
-   }
-
-   public LargeBody(long messageID, JournalStorageManager storageManager) {
-      this(null, storageManager);
-      this.messageID = messageID;
-   }
-
    public void setMessage(LargeServerMessage message) {
       this.message = message;
 

Reply via email to