[
https://issues.apache.org/jira/browse/ARTEMIS-3449?focusedWorklogId=644217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-644217
]
ASF GitHub Bot logged work on ARTEMIS-3449:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/Aug/21 15:00
Start Date: 31/Aug/21 15:00
Worklog Time Spent: 10m
Work Description: franz1981 commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r698583729
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
This has to be done because of how netty works with internalNioBuffer:
it forces Netty buffer to be materialized from EMPTY singleton to a proper one.
I still need to be sure that's still valid TBH.
> This change seems like it will specifically undo that by making the buffer
perpetually the wrong size and make every subsequent write for the message use
an off-sized buffer leading to oddball and less efficient transfer framing
behaviour.
I'm not sure about this really: this change is going to send `frameSize` (or
less, depending by the remaining data in the large file) data and, if
https://issues.apache.org/jira/browse/ARTEMIS-3026 is going to happen, just a
single initial bigger one which length is `encoded header + frameSize`. The
subsequent frames will be `frameSize`-sized as ever.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
This has to be done because of how netty works with internalNioBuffer:
it forces Netty buffer to be materialized from EMPTY singleton to a proper one.
I still need to be sure that's still valid TBH.
> This change seems like it will specifically undo that by making the buffer
perpetually the wrong size and make every subsequent write for the message use
an off-sized buffer leading to oddball and less efficient transfer framing
behaviour.
I'm not sure about this really: this change is going to send `frameSize` (or
less, depending by the remaining data in the large file) data and, if
https://issues.apache.org/jira/browse/ARTEMIS-3026 is going to happen, just the
single initial one `encoded header + frameSize` length, while subsequent ones
`frameSize`-sized as expected.
It looks to me very similar to the original code behaviour although without
the exceptional code path in.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
+ // refresh internal NIO buffer: the previous one is no
longer valid
+ nioBuffer = tmpFrameBuf.internalNioBuffer(0,
writtenBytes + frameSize);
+ bufPosition = nioBuffer.position();
Review comment:
Let me see if I can simplify the code path here to make it simpler then,
I see that looks too tricky and can be easily changed causing bugs
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
+ // refresh internal NIO buffer: the previous one is no
longer valid
+ nioBuffer = tmpFrameBuf.internalNioBuffer(0,
writtenBytes + frameSize);
+ bufPosition = nioBuffer.position();
Review comment:
Good points, let me see if I can simplify the code path here to make it
simpler then, I see that looks too tricky and can be easily changed causing bugs
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
This has to be done because of how netty works with internalNioBuffer:
it forces Netty buffer to be materialized from EMPTY singleton to a proper one.
I still need to be sure that's still valid TBH.
> This change seems like it will specifically undo that by making the buffer
perpetually the wrong size and make every subsequent write for the message use
an off-sized buffer leading to oddball and less efficient transfer framing
behaviour.
I'm not sure about this really: this change is going to send `frameSize` (or
less, depending by the remaining data in the large file) data and, if
https://issues.apache.org/jira/browse/ARTEMIS-3026 is going to happen, just the
single initial one `encoded header + frameSize` long, while subsequent ones
will be 'frameSize`long as expected.
It looks to me very similar to the original code behaviour although without
the exceptional code path in.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
mmmm now that I read my own code I see that's not doing exactly what I
was expecting to do, let me turn this into draft so I can fix it and better
match the existing logic
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
Let's try again: I'm now simplified the code path, hope now it looks a
bit more simpler to follow, with less checks
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
Let's try again: I've simplified the code path, making it more concise
then the original one and retaining the same exact behavior: hope too now it
looks a bit more simpler to follow, with less checks
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
Let's try again: I've simplified the code path, making it more concise
then the original one and retaining the same exact behavior: hope too now it
looks a bit more simpler to follow, with less checks
Original code:
- `position == 0`: no header overflow -> `sent size := (header size + read
file) <= frameSize` attempt
- `position == 0`: header overflow -> `sent size := (header size + read
file) <= unbounded`, with `read file <= frameSize`
- `position != 0`: `sent size := read file <= frameSize`
The new version is 1:1 with this one, but need to track `headerSize` (by
reading the Netty buffer writer index) in order to extract the right slice of
(internal) NIO buffer to perform the file read, but the same code is used to
make more explicit the above logic re frame sizing too.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
Let's try again: I've simplified the code path, making it more concise
then the original one and retaining the same exact behavior: hope too now it
looks a bit more simpler to follow, with less checks
Original code:
- `position == 0`: no header overflow -> `sent size := (header size + read
file) <= frameSize`
- `position == 0`: header overflow -> `sent size := (header size + read
file) <= unbounded`, with `read file <= frameSize`
- `position != 0`: `sent size := read file <= frameSize`
The new version is 1:1 with this one, but need to track `headerSize` (by
reading the Netty buffer writer index) in order to extract the right slice of
(internal) NIO buffer to perform the file read, but the same code is used to
make more explicit the above logic re frame sizing too.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
LargeBodyReader context = message.getLargeBodyReader();
try {
context.open();
+ final ByteBuf tmpFrameBuf =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+ final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
try {
+
context.position(position);
long bodySize = context.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate(frameSize);
+ // materialize it so we can use its internal NIO buffer
+ tmpFrameBuf.ensureWritable(frameSize);
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
}
- buf.clear();
- int size = 0;
-
- try {
- if (position == 0) {
- replaceInitialHeader(deliveryAnnotationsToEncode,
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
- }
- size = context.readInto(buf);
-
- sender.send(new ReadableBuffer.ByteBufferReader(buf));
- position += size;
- } catch (java.nio.BufferOverflowException overflowException)
{
- if (position == 0) {
- if (log.isDebugEnabled()) {
- log.debug("Delivery of message failed with an
overFlowException, retrying again with expandable buffer");
- }
- // on the very first packet, if the initial header was
replaced with a much bigger header (re-encoding)
- // we could recover the situation with a retry using
an expandable buffer.
- // this is tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
- size =
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context,
buf);
- } else {
- // if this is not the position 0, something is going on
- // we just forward the exception as this is not
supposed to happen
- throw overflowException;
+ // using internalNioBuffer to save creating a new ByteBuffer
duplicate/slice/view in the loop
+ ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0,
frameSize);
+ int bufPosition = nioBuffer.position();
+ tmpFrameBuf.clear();
+ final int writtenBytes;
+ if (position == 0) {
+ // no need to cache NettyWritable: position should be 0
just once per large message file
+ replaceInitialHeader(deliveryAnnotationsToEncode,
context, new NettyWritable(tmpFrameBuf));
+ writtenBytes = tmpFrameBuf.writerIndex();
+ // tested on
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+ // tmpFrameBuf can grow over the initial capacity
+ if (nioBuffer.remaining() < writtenBytes) {
+ // ensure reading at least frameSize from the file
+ tmpFrameBuf.ensureWritable(frameSize);
Review comment:
Let's try again: I've simplified the code path, making it more concise
then the original one and retaining the same exact behavior: hope too now it
looks a bit more simpler to follow, with less checks
Original code:
- `position == 0`: no header overflow -> `sent size := (header size + read
file) <= frameSize`
- `position == 0`: header overflow -> `sent size := (header size + read
file) <= unbounded`, with `read file <= frameSize`
- `position != 0`: `sent size := read file <= frameSize`
The new version is 1:1 with this one, but need to track `headerSize` (by
reading the Netty buffer writer index) in order to extract the right slice of
(internal) NIO buffer to perform the file read: luckily the same code is used
to make more explicit the above logic re frame sizing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 644217)
Time Spent: 5.5h (was: 5h 20m)
> Speedup AMQP large message streaming
> ------------------------------------
>
> Key: ARTEMIS-3449
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3449
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> AMQP is using unpooled heap ByteBuffer(s) to stream AMQP large messages:
> given that the underline NIO sequential file can both use FileChannel or
> RandomAccessFile (depending if the ByteBuffer used is direct/heap based),
> both approaches would benefit from using Netty pooled direct buffers and save
> additional copies (performed by RandomAccessFile) to happen, reducing GC too.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)