This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new d27d61f ARTEMIS-2706 Use FrameSize to decide when to flush large messages
new 17d6590 This closes #3079
d27d61f is described below
commit d27d61f223fe88fd01f8d98415ddadb75605d374
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 13 20:29:57 2020 -0400
ARTEMIS-2706 Use FrameSize to decide when to flush large messages
---
.../protocol/amqp/proton/ProtonServerSenderContext.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 24d0bcb..4810519 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
void resume() {
connection.runNow(this::deliver);
}
- private static final int BUFFER_LENGTH = 1024;
-
void deliver() {
+
+ int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit();
+
// Let the Message decide how to present the message bytes
LargeBodyReader context = message.getLargeBodyReader();
try {
@@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// TODO: it would be nice to use pooled buffer here,
// however I would need a version of ReadableBuffer for Netty
- ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH);
+ ByteBuffer buf = ByteBuffer.allocate(frameSize);
for (; position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
@@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
buf.clear();
int size = context.readInto(buf);
- sender.send(buf.array(), 0, size);
-
- connection.instantFlush();
+ sender.send(new ReadableBuffer.ByteBufferReader(buf));
position += size;
+
+ if (position < bodySize) {
+ connection.instantFlush();
+ }
}
} finally {
context.close();
@@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.advance();
}
- connection.flush();
+ connection.instantFlush();
synchronized (creditsLock) {
pending.decrementAndGet();