This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
     new afc50ee  [AMQ-8412] Client-side management of max frame size
afc50ee is described below

commit afc50ee7275b5417d5768db84a9fe9054d526e7a
Author: Matt Pavlovich <[email protected]>
AuthorDate: Mon Jan 24 13:10:09 2022 -0600

    [AMQ-8412] Client-side management of max frame size
    
     - Store maxFrameSize on client-side
     - Check messages on outbound send to send an actionable exception to the 
caller
     - Add a reason code for max message size exceeded (client-side)
    
    (cherry picked from commit 48d0fb11716d3490b1d76f7e8860b61324d4e82f)
---
 .../java/org/apache/activemq/ActiveMQConnection.java | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 5d11fb6..2be8203 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
@@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
     // Assume that protocol is the latest. Change to the actual protocol
     // version when a WireFormatInfo is received.
     private final AtomicInteger protocolVersion = new 
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+    private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
     private final long timeCreated;
     private final ConnectionAudit connectionAudit = new ConnectionAudit();
     private DestinationSource destinationSource;
@@ -1386,7 +1388,12 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         if (isClosed()) {
             throw new ConnectionClosedException();
         } else {
-
+            if(command.isMessage()) {
+                int tmpMsgSize = Message.class.cast(command).getSize();
+                if(maxFrameSize.get() < tmpMsgSize) {
+                    throw new JMSException("Message size: " +  tmpMsgSize + " 
exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
+                }
+            }
             try {
                 Response response = (Response)(timeout > 0
                         ? this.transport.request(command, timeout)
@@ -1920,6 +1927,17 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
 
     protected void onWireFormatInfo(WireFormatInfo info) {
         protocolVersion.set(info.getVersion());
+
+        long tmpMaxFrameSize = 0;
+        try {
+            tmpMaxFrameSize = info.getMaxFrameSize();
+        } catch (IOException e) {
+            // unmarshal error on property map
+        }
+
+        if(tmpMaxFrameSize > 0) {
+            maxFrameSize.set(tmpMaxFrameSize);
+        }
     }
 
     /**

Reply via email to