[
https://issues.apache.org/jira/browse/ARTEMIS-3449?focusedWorklogId=644466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-644466
]
ASF GitHub Bot logged work on ARTEMIS-3449:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/Aug/21 15:27
Start Date: 31/Aug/21 15:27
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r698558253
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
+ // refresh internal NIO buffer: the previous one is no
longer valid
+ nioBuffer = tmpFrameBuf.internalNioBuffer(0,
writtenBytes + frameSize);
+ bufPosition = nioBuffer.position();
Review comment:
I think the old approach of performing an explicit send with the
oversized initial content (as it did in the
retryInitialPacketWithExpandableBuffer method), before then continuing as
normal with the regular process for 'beyond position 0' payload, seems simpler
and better.
Since it is already 'replacing' the nioBuffer variable here with the
enlarged one, it might as well do the above, then recreate the actual
ByteBuf at the appropriate size and continue the loop to send the rest
(assuming there is remaining content). That would skip all this complexity
around tracking what's written and changing indexes, and would ensure the
remainder of the payload follows a consistent framing behaviour (see the
related comment above) as intended.
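To make the suggestion concrete, here is a minimal sketch of the control flow, using plain java.nio buffers rather than the Netty and proton types in the PR. The class InitialSendSketch and the helper sendSizes are hypothetical names made up for illustration, and a "send" is simulated by recording its size. It assumes the initial content is encoded up front and that frameSize is positive.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class InitialSendSketch {

    // Hypothetical helper: returns the size of each simulated send.
    static List<Integer> sendSizes(byte[] initialContent, byte[] body, int frameSize) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        List<Integer> sends = new ArrayList<>();
        ByteBuffer bodyReader = ByteBuffer.wrap(body);
        // Fixed-size buffer reused for every regular send.
        ByteBuffer frame = ByteBuffer.allocate(frameSize);

        if (initialContent.length > frameSize) {
            // Corner case: the re-encoded initial content does not fit,
            // so send it on its own and keep the frame buffer untouched.
            sends.add(initialContent.length);
        } else {
            // Common case: the initial content shares the first frame
            // with the start of the body.
            frame.put(initialContent);
        }

        // Regular path: read data, send data, always with the same buffer size.
        do {
            int n = Math.min(frame.remaining(), bodyReader.remaining());
            bodyReader.get(frame.array(), frame.position(), n);
            frame.position(frame.position() + n);
            frame.flip();
            if (frame.hasRemaining()) {
                sends.add(frame.remaining());
            }
            frame.clear();
        } while (bodyReader.hasRemaining());
        return sends;
    }

    public static void main(String[] args) {
        // Oversized initial content: one explicit send, then frame-sized sends.
        System.out.println(sendSizes(new byte[10], new byte[25], 8)); // prints [10, 8, 8, 8, 1]
        // Initial content that fits: everything stays frame-sized.
        System.out.println(sendSizes(new byte[3], new byte[13], 8));  // prints [8, 8]
    }
}
```

All the special handling stays inside the single position-0 branch, and the loop body never needs to know whether the first send was oversized.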
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
Review comment:
Superfluous comment, might as well delete it.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
I don't think it should do this. The point of the original behaviour was
to try and use a fixed size buffer for [most of] the sends and thus limit the
eventually-sent data to similarly frame-sized (well, slightly smaller
than the max permitted) chunks at a time. This was to try and generate a
more consistent transfer framing behaviour on the wire and avoid continually
generating large frames followed by smaller/tiny followup frames for payload
'overshoots'.
This change seems like it will specifically undo that by making the buffer
perpetually the wrong size, so that every subsequent write for the message uses
an off-sized buffer, leading to oddball and less efficient transfer framing
behaviour.
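The framing concern can be illustrated with a deliberately simplified model. It assumes each write is split into frames independently of the next write, which may not match exactly how the transport batches output. FramingSketch and framesFor are hypothetical names, and the sizes are arbitrary illustration values, not measurements.

```java
import java.util.ArrayList;
import java.util.List;

public class FramingSketch {

    // Hypothetical model: each write is cut into frames of at most
    // maxFramePayload bytes, and any remainder becomes its own small frame.
    static List<Integer> framesFor(int[] writeSizes, int maxFramePayload) {
        if (maxFramePayload <= 0) {
            throw new IllegalArgumentException("maxFramePayload must be positive");
        }
        List<Integer> frames = new ArrayList<>();
        for (int write : writeSizes) {
            int remaining = write;
            while (remaining > 0) {
                int frame = Math.min(remaining, maxFramePayload);
                frames.add(frame);
                remaining -= frame;
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        // Buffer left enlarged after the first send: every write overshoots.
        System.out.println(framesFor(new int[] {12, 12, 12}, 8));      // prints [8, 4, 8, 4, 8, 4]
        // Frame-sized buffer: same 36 bytes, consistent framing.
        System.out.println(framesFor(new int[] {8, 8, 8, 8, 4}, 8));   // prints [8, 8, 8, 8, 4]
    }
}
```

Under this model the same payload takes six frames with the enlarged buffer and five with the frame-sized one, and only the final frame is short in the second case.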
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,52 +586,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
+ tmpFrameBuf.clear();
+ int headerSize = 0;
+ int readLimit = frameSize;
+ if (position == 0) {
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ headerSize = tmpFrameBuf.writerIndex();
+ if (headerSize <= frameSize) {
+ readLimit = frameSize - headerSize;
Review comment:
This still feels unnecessarily complicated. It complicates the whole
remainder of the method, for a corner case that's actually quite unlikely to
even arise. The variable is also confusingly named, since it is absolutely not
the 'header size'; it's only non-header stuff that can ever get it into this
situation.
Same comment as on yesterday's prior version: why not just send the generated
first payload at this point (as the old version did when the initial buffer
wasn't big enough, in retryInitialPacketWithExpandableBuffer()), then replace
the buffer with an appropriately initial-sized one and continue the loop
(assuming there is more to be sent)? That would consolidate all the complexity
inside this sub-part of the 'if position=0' handling, and leave the rest of the
handling as the most trivial 'read data, send data' it can be.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
##########
@@ -146,16 +147,6 @@ private AMQPLargeMessage(final AMQPLargeMessage copy,
setMessageID(newID);
}
- public void openLargeMessage() throws Exception {
Review comment:
The 'getData()' method appears to depend on this method having created
"parsingData". Though I admit this method does seem unused, getData() seems
to be used a lot. I thus wonder why this seems unused.
Issue Time Tracking
-------------------
Worklog Id: (was: 644466)
Time Spent: 6h (was: 5h 50m)
> Speedup AMQP large message streaming
> ------------------------------------
>
> Key: ARTEMIS-3449
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3449
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
> Time Spent: 6h
> Remaining Estimate: 0h
>
> AMQP uses unpooled heap ByteBuffer(s) to stream AMQP large messages.
> Given that the underlying NIO sequential file can use either FileChannel or
> RandomAccessFile (depending on whether the ByteBuffer used is direct or heap
> based), both approaches would benefit from using Netty pooled direct buffers,
> avoiding the additional copies performed by RandomAccessFile and reducing GC too.