This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a4c311f ARTEMIS-3449 Speedup AMQP large message streaming
new 20c1836 This closes #3711
a4c311f is described below
commit a4c311f3681b31ae706a8843278746d8420185c6
Author: franz1981 <[email protected]>
AuthorDate: Fri Aug 27 18:04:29 2021 +0200
ARTEMIS-3449 Speedup AMQP large message streaming
---
.../artemis/core/message/LargeBodyReader.java | 3 +-
.../jdbc/store/file/JDBCSequentialFile.java | 10 +-
.../protocol/amqp/broker/AMQPLargeMessage.java | 29 ++---
.../amqp/proton/ProtonServerSenderContext.java | 123 +++++++++++----------
.../core/persistence/impl/journal/LargeBody.java | 13 ---
5 files changed, 88 insertions(+), 90 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
index 42de11b..7491aa5 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/LargeBodyReader.java
@@ -27,7 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
*
* None of these methods should be called from Clients
*/
-public interface LargeBodyReader {
+public interface LargeBodyReader extends AutoCloseable {
/**
* This method must not be called directly by ActiveMQ Artemis clients.
@@ -51,6 +51,7 @@ public interface LargeBodyReader {
/**
* This method must not be called directly by ActiveMQ Artemis clients.
*/
+ @Override
void close() throws ActiveMQException;
/**
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6f0b297..1ee9c77 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -192,7 +192,15 @@ public class JDBCSequentialFile implements SequentialFile {
}
private synchronized int internalWrite(ByteBuffer buffer, IOCallback
callback) {
- return internalWrite(buffer.array(), callback, true);
+ final byte[] data;
+ if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position()
== 0 && buffer.limit() == buffer.array().length) {
+ data = buffer.array();
+ } else {
+ byte[] copy = new byte[buffer.remaining()];
+ buffer.get(copy);
+ data = copy;
+ }
+ return internalWrite(data, callback, true);
}
private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback
callback, boolean append) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 526ad54..247a3ec 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
@@ -107,7 +108,7 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
private StorageManager storageManager;
/** this is used to parse the initial packets from the buffer */
- CompositeReadableBuffer parsingBuffer;
+ private CompositeReadableBuffer parsingBuffer;
public AMQPLargeMessage(long id,
long messageFormat,
@@ -146,16 +147,6 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
setMessageID(newID);
}
- public void openLargeMessage() throws Exception {
- this.parsingData = new AmqpReadableBuffer(largeBody.map());
- }
-
- public void closeLargeMessage() throws Exception {
- largeBody.releaseResources(false, true);
- parsingData.freeDirectBuffer();
- parsingData = null;
- }
-
public void releaseEncodedBuffer() {
internalReleaseBuffer(1);
}
@@ -347,14 +338,14 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
public void addBytes(ReadableBuffer data) throws Exception {
parseLargeMessage(data);
- if (data.hasArray() && data.remaining() == data.array().length) {
- //System.out.println("Received " + data.array().length + "::" +
ByteUtil.formatGroup(ByteUtil.bytesToHex(data.array()), 8, 16));
- largeBody.addBytes(data.array());
- } else {
- byte[] bytes = new byte[data.remaining()];
- data.get(bytes);
- //System.out.println("Finishing " + bytes.length +
ByteUtil.formatGroup(ByteUtil.bytesToHex(bytes), 8, 16));
- largeBody.addBytes(bytes);
+ final int remaining = data.remaining();
+ final ByteBuf writeBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer(remaining, remaining);
+ try {
+ // perform copy of data
+ data.get(new NettyWritable(writeBuffer));
+ largeBody.addBytes(new ChannelBufferWrapper(writeBuffer, true, true));
+ } finally {
+ writeBuffer.release();
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index b46bcc3..32585d7 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
-import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -572,7 +571,7 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
void deliver() {
// This is discounting some bytes due to Transfer payload
- int frameSize =
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
- 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
+ final int frameSize =
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
- 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
DeliveryAnnotations deliveryAnnotationsToEncode;
@@ -584,48 +583,37 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
deliveryAnnotationsToEncode = null;
}
- LargeBodyReader context = message.getLargeBodyReader();
try {
- context.open();
- try {
+ final ByteBuf frameBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
+ final NettyReadable frameView = new NettyReadable(frameBuffer);
+ try (LargeBodyReader context = message.getLargeBodyReader()) {
+ context.open();
context.position(position);
long bodySize = context.getSize();
+ // materialize it so we can use its internal NIO buffer
+ frameBuffer.ensureWritable(frameSize);
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ if (position == 0 && sender.getLocalState() !=
EndpointState.CLOSED && position < bodySize) {
+ if (!deliverInitialPacket(context,
deliveryAnnotationsToEncode, frameBuffer)) {
+ return;
+ }
+ }
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
- context.close();
return;
}
- buf.clear();
- int size = 0;
+ frameBuffer.clear();
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
- }
- }
+ final int readSize =
context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
+
+ frameBuffer.writerIndex(readSize);
+
+ sender.send(frameView);
+
+ position += readSize;
- if (size > 0) {
+ if (readSize > 0) {
if (position < bodySize) {
connection.instantFlush();
@@ -633,7 +621,7 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
}
}
} finally {
- context.close();
+ frameBuffer.release();
}
if (preSettle) {
@@ -661,35 +649,59 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
}
}
+ private boolean deliverInitialPacket(final LargeBodyReader context,
+ final DeliveryAnnotations
deliveryAnnotationsToEncode,
+ final ByteBuf frameBuffer) throws
Exception {
+ assert position == 0 && context.position() == 0;
+ if (!connection.flowControl(this::resume)) {
+ return false;
+ }
+ frameBuffer.clear();
+ try {
+ replaceInitialHeader(deliveryAnnotationsToEncode, context, new
NettyWritable(frameBuffer));
+ } catch (IndexOutOfBoundsException indexOutOfBoundsException) {
+ assert position == 0 : "this shouldn't happen unless
replaceInitialHeader is updating position before modifying frameBuffer";
+ if (log.isDebugEnabled()) {
+ log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
+ }
+ // on the very first packet, if the initial header was replaced
with a much bigger header (re-encoding)
+ // we could recover the situation with a retry using an expandable
buffer.
+ // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
+ sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
+ return true;
+ }
+ final int writableBytes = frameBuffer.writableBytes();
+ if (writableBytes == 0) {
+ sender.send(new NettyReadable(frameBuffer));
+ connection.instantFlush();
+ return true;
+ }
+ final int writtenBytes = frameBuffer.writerIndex();
+ final int readSize =
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
+ frameBuffer.writerIndex(writtenBytes + readSize);
+ sender.send(new NettyReadable(frameBuffer));
+ position += readSize;
+ connection.instantFlush();
+ return true;
+ }
+
/**
- * This is a retry logic when either the delivery annotations or
re-encoded buffer is bigger than the frame size
- * This will create one expandable buffer.
- * It will then let Proton to do the framing correctly
+ * This must be used when either the delivery annotations or re-encoded
buffer is bigger than the frame size.<br>
+ * This will create one expandable buffer, send and flush it.
*/
- private int retryInitialPacketWithExpandableBuffer(DeliveryAnnotations
deliveryAnnotationsToEncode,
- LargeBodyReader
context,
- ByteBuffer buf)
throws Exception {
- int size;
- buf.clear();
+ private void sendAndFlushInitialPacket(DeliveryAnnotations
deliveryAnnotationsToEncode,
+ LargeBodyReader context) throws
Exception {
// if the buffer overflow happened during the initial position
// this means the replaced headers are bigger than the frame size
// in this case we use an expandable netty buffer
- ByteBuf nettyBuffer =
PooledByteBufAllocator.DEFAULT.buffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message)
* 2);
+ final ByteBuf nettyBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer(AMQPMessageBrokerAccessor.getRemainingBodyPosition(message)
* 2);
try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new
NettyWritable(nettyBuffer));
- size = context.readInto(buf);
- position += size;
-
- nettyBuffer.writeBytes(buf);
-
- ByteBuffer nioBuffer = nettyBuffer.nioBuffer();
- nioBuffer.position(nettyBuffer.writerIndex());
- nioBuffer = (ByteBuffer) nioBuffer.flip();
- sender.send(new ReadableBuffer.ByteBufferReader(nioBuffer));
+ sender.send(new NettyReadable(nettyBuffer));
} finally {
nettyBuffer.release();
+ connection.instantFlush();
}
- return size;
}
private int replaceInitialHeader(DeliveryAnnotations
deliveryAnnotationsToEncode,
@@ -697,7 +709,7 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
WritableBuffer buf) throws Exception {
TLSEncode.getEncoder().setByteBuffer(buf);
try {
- int proposedPosition = writeHeaderAndAnnotations(context,
deliveryAnnotationsToEncode);
+ int proposedPosition =
writeHeaderAndAnnotations(deliveryAnnotationsToEncode);
if (message.isReencoded()) {
proposedPosition =
writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
}
@@ -740,8 +752,7 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
}
}
- private int writeHeaderAndAnnotations(LargeBodyReader context,
- DeliveryAnnotations
deliveryAnnotationsToEncode) throws ActiveMQException {
+ private int writeHeaderAndAnnotations(DeliveryAnnotations
deliveryAnnotationsToEncode) {
Header header = AMQPMessageBrokerAccessor.getCurrentHeader(message);
if (header != null) {
TLSEncode.getEncoder().writeObject(header);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index a2e4273..1eacecd 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -74,19 +74,6 @@ public class LargeBody {
this.storageManager = storageManager;
}
- public ByteBuffer map() throws Exception {
- ensureFileExists(true);
- if (!file.isOpen()) {
- file.open();
- }
- return file.map(0, file.size());
- }
-
- public LargeBody(long messageID, JournalStorageManager storageManager) {
- this(null, storageManager);
- this.messageID = messageID;
- }
-
public void setMessage(LargeServerMessage message) {
this.message = message;