This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new d27d61f  ARTEMIS-2706 Use FrameSize to decide when to flush large messages
     new 17d6590  This closes #3079
d27d61f is described below

commit d27d61f223fe88fd01f8d98415ddadb75605d374
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 13 20:29:57 2020 -0400

    ARTEMIS-2706 Use FrameSize to decide when to flush large messages
---
 .../protocol/amqp/proton/ProtonServerSenderContext.java | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 24d0bcb..4810519 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       void resume() {
          connection.runNow(this::deliver);
       }
-      private static final int BUFFER_LENGTH = 1024;
-
       void deliver() {
+
+         int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit();
+
          // Let the Message decide how to present the message bytes
          LargeBodyReader context = message.getLargeBodyReader();
          try {
@@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
                // TODO: it would be nice to use pooled buffer here,
                //       however I would need a version of ReadableBuffer for Netty
-               ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH);
+               ByteBuffer buf = ByteBuffer.allocate(frameSize);
 
                for (; position < bodySize; ) {
                   if (!connection.flowControl(this::resume)) {
@@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   buf.clear();
                   int size = context.readInto(buf);
 
-                  sender.send(buf.array(), 0, size);
-
-                  connection.instantFlush();
+                  sender.send(new ReadableBuffer.ByteBufferReader(buf));
 
                   position += size;
+
+                  if (position < bodySize) {
+                     connection.instantFlush();
+                  }
                }
             } finally {
                context.close();
@@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                sender.advance();
             }
 
-            connection.flush();
+            connection.instantFlush();
 
             synchronized (creditsLock) {
                pending.decrementAndGet();

Reply via email to