This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
new afc50ee [AMQ-8412] Client-side management of max frame size
afc50ee is described below
commit afc50ee7275b5417d5768db84a9fe9054d526e7a
Author: Matt Pavlovich <[email protected]>
AuthorDate: Mon Jan 24 13:10:09 2022 -0600
[AMQ-8412] Client-side management of max frame size
- Store maxFrameSize on client-side
- Check messages on outbound send to send an actionable exception to the
caller
- Add a reason code for max message size exceeded (client-side)
(cherry picked from commit 48d0fb11716d3490b1d76f7e8860b61324d4e82f)
---
.../java/org/apache/activemq/ActiveMQConnection.java | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 5d11fb6..2be8203 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@@ -192,6 +193,7 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private final AtomicInteger protocolVersion = new
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+ private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
private final long timeCreated;
private final ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource;
@@ -1386,7 +1388,12 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
if (isClosed()) {
throw new ConnectionClosedException();
} else {
-
+ if(command.isMessage()) {
+ int tmpMsgSize = Message.class.cast(command).getSize();
+ if(maxFrameSize.get() < tmpMsgSize) {
+ throw new JMSException("Message size: " + tmpMsgSize + "
exceeds maximum allowed by broker: " + maxFrameSize.get(), "41300");
+ }
+ }
try {
Response response = (Response)(timeout > 0
? this.transport.request(command, timeout)
@@ -1920,6 +1927,17 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
protected void onWireFormatInfo(WireFormatInfo info) {
protocolVersion.set(info.getVersion());
+
+ long tmpMaxFrameSize = 0;
+ try {
+ tmpMaxFrameSize = info.getMaxFrameSize();
+ } catch (IOException e) {
+ // unmarshal error on property map
+ }
+
+ if(tmpMaxFrameSize > 0) {
+ maxFrameSize.set(tmpMaxFrameSize);
+ }
}
/**