diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index 021a6cef2c..5931b3ce68 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -22,7 +22,11 @@
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
public abstract class AbstractControl extends StandardMBean {
@@ -67,6 +71,23 @@ protected void blockOnIO() {
}
+ /**
+ * Rejects a message submitted through the management API when its persist size
+ * ({@code message.getPersistSize()}) is greater than or equal to the configured journal buffer size.
+ * NOTE(review): the limit is always read via getJournalBufferSize_AIO(); confirm that is intended for every journal type.
+ * @param postOffice the PostOffice whose server configuration supplies the journal buffer size
+ * @param message the message whose persist size is checked
+ * @throws Exception the rejectOverSizeMessage bundle exception when the size reaches or exceeds the limit.
+ */
+ protected void checkMessageSize(PostOffice postOffice, CoreMessage message)
throws Exception {
+ int msize = message.getPersistSize();
+ ActiveMQServer server = postOffice.getServer();
+ int journalBufferSize =
server.getConfiguration().getJournalBufferSize_AIO();
+ if (msize >= journalBufferSize) {
+ throw
ActiveMQMessageBundle.BUNDLE.rejectOverSizeMessage(journalBufferSize);
+ }
+ }
+
protected abstract MBeanOperationInfo[] fillMBeanOperationInfo();
protected abstract MBeanAttributeInfo[] fillMBeanAttributeInfo();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index b24c370dac..6820ec765a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -335,6 +335,7 @@ public RemotingConnection getRemotingConnection() {
}
}
message.setAddress(addressInfo.getName());
+ checkMessageSize(postOffice, message);
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index d62978f2e6..b4f85f02a1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1005,6 +1005,7 @@ public RemotingConnection getRemotingConnection() {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
+ checkMessageSize(postOffice, message);
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index fb1e96c058..aa7cc41542 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -26,6 +26,7 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -164,4 +165,6 @@ RoutingStatus route(Message message,
Set<SimpleString> getAddresses();
void updateMessageLoadBalancingTypeForAddress(SimpleString address,
MessageLoadBalancingType messageLoadBalancingType) throws Exception;
+
+ ActiveMQServer getServer(); // accessor for an ActiveMQServer; AbstractControl.checkMessageSize reads the server configuration through it
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bf12baf32f..c2e9269f8b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1711,6 +1711,7 @@ public AddressManager getAddressManager() {
return addressManager;
}
+ @Override // getServer() is declared on the PostOffice interface
public ActiveMQServer getServer() {
return server;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index bc86af7978..1574689b7a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -441,4 +441,7 @@ IllegalStateException invalidRoutingTypeUpdate(String
queueName,
@Message(id = 229215, value = "Cannot delete queue {0} on binding {1} - it
has {2} messages", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException cannotDeleteQueueWithMessages(SimpleString
name, SimpleString queueName, long messageCount);
+
+ @Message(id = 229216, value = "Message rejected because size over limit:
{0}. Increase the journal-buffer-size to allow larger messages", format =
Message.Format.MESSAGE_FORMAT)
+ ActiveMQException rejectOverSizeMessage(int limit); // NOTE(review): {0} in the text above is presumably filled from limit; AbstractControl passes the journal buffer size
}
diff --git a/docs/user-manual/en/management.md
b/docs/user-manual/en/management.md
index 60ef2a184f..6e34e2b734 100644
--- a/docs/user-manual/en/management.md
+++ b/docs/user-manual/en/management.md
@@ -205,6 +205,13 @@ a given property.)
paused, it will receive messages but will not deliver them. When it's
resumed,
it'll begin delivering the queued messages, if any.
+#### Size limit of messages sent over QueueControl and AddressControl
+
+Both `QueueControl` and `AddressControl` provide a `sendMessage()` operation
which
+allows users to send messages to the broker. The total size of each message must
+be less than the broker's journal-buffer-size (see
[Persistence](persistence.md)).
+Messages whose size reaches or exceeds that limit are rejected.
+
#### Other Resources Management
Apache ActiveMQ Artemis allows to start and stop its remote resources
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 69794d8bdb..1c63ba0854 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -36,10 +36,8 @@
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.RoleInfo;
-import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
@@ -53,7 +51,6 @@
public class AddressControlTest extends ManagementTestBase {
- private ActiveMQServer server;
protected ClientSession session;
private ServerLocator locator;
private ClientSessionFactory sf;
@@ -401,10 +398,7 @@ public void testSendMessage() throws Exception {
public void setUp() throws Exception {
super.setUp();
- Configuration config =
createDefaultInVMConfig().setJMXManagementEnabled(true);
- server = createServer(false, config);
- server.setMBeanServer(mbeanServer);
- server.start();
+ createManageableServer(false, true);
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
sf = createSessionFactory(locator);
@@ -413,10 +407,6 @@ public void setUp() throws Exception {
addClientSession(session);
}
- protected AddressControl createManagementControl(final SimpleString
address) throws Exception {
- return ManagementControlHelper.createAddressControl(address,
mbeanServer);
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index baaaab3a78..47e1463c49 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -25,7 +25,11 @@
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;
@@ -37,6 +41,7 @@
// Attributes ----------------------------------------------------
+ protected ActiveMQServer server;
protected MBeanServer mbeanServer;
// Static --------------------------------------------------------
@@ -79,6 +84,14 @@ public void setUp() throws Exception {
createMBeanServer();
}
+ protected void createManageableServer(boolean enablePersistence, boolean
startServer) throws Exception { // assigns the shared server field; the server is started only when startServer is true
+ Configuration conf =
createDefaultInVMConfig().setJMXManagementEnabled(true);
+ server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer,
enablePersistence)); // constructed with this test's mbeanServer and passed through addServer(...)
+ if (startServer) {
+ server.start();
+ }
+ }
+
protected void createMBeanServer() {
mbeanServer = MBeanServerFactory.createMBeanServer();
}
@@ -117,6 +130,10 @@ protected QueueControl createManagementControl(final
SimpleString address,
return queueControl;
}
+ protected AddressControl createManagementControl(final SimpleString
address) throws Exception { // AddressControl overload: delegates to ManagementControlHelper with this test's mbeanServer
+ return ManagementControlHelper.createAddressControl(address,
mbeanServer);
+ }
+
protected long getMessageCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getMessageCount();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
new file mode 100644
index 0000000000..27d58d3886
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.management;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class PersistentManagementTest extends ManagementTestBase { // checks that sendMessage() on QueueControl and AddressControl rejects a message whose body equals the journal buffer size
+
+ private ClientSession session;
+ private ServerLocator locator;
+ private ClientSessionFactory sf;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ createManageableServer(true, true); // enablePersistence = true, startServer = true
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
+ sf = createSessionFactory(locator);
+ session = sf.createSession(false, true, false);
+ session.start();
+ addClientSession(session);
+ }
+
+ @Test
+ public void testSendOverSizeMessageOverQueueControl() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, RoutingType.MULTICAST, queue, null, true);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ int bodySize = server.getConfiguration().getJournalBufferSize_AIO(); // the body alone is as large as the configured buffer size
+ byte[] bigData = createBytesData(bodySize);
+
+ try {
+ queueControl.sendMessage(new HashMap<String, String>(),
Message.BYTES_TYPE, Base64.encodeBytes(bigData), true, "myUser", "myPassword");
+ fail("Expecting message being rejected.");
+ } catch (Exception e) {
+ // expected: sendMessage() threw. NOTE(review): any Exception is accepted here; the rejection cause is not asserted.
+ }
+ }
+
+ @Test
+ public void testSendOverSizeMessageOverAddressControl() throws Exception {
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ session.createQueue(address, RoutingType.ANYCAST, address);
+
+ int bodySize = server.getConfiguration().getJournalBufferSize_AIO(); // the body alone is as large as the configured buffer size
+ byte[] bigData = createBytesData(bodySize);
+
+ try {
+ addressControl.sendMessage(null, Message.BYTES_TYPE,
Base64.encodeBytes(bigData), false, null, null);
+ fail("Expecting message being rejected.");
+ } catch (Exception e) {
+ // expected: sendMessage() threw. NOTE(review): any Exception is accepted here; the rejection cause is not asserted.
+ }
+ }
+
+ byte[] createBytesData(int nbytes) { // returns an array of nbytes bytes, each obtained from RandomUtil.randomByte()
+ byte[] result = new byte[nbytes];
+ for (int i = 0; i < nbytes; i++) {
+ result[i] = RandomUtil.randomByte();
+ }
+ return result;
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 71780e96a1..53057a449d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -56,13 +56,10 @@
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -79,7 +76,6 @@
@RunWith(value = Parameterized.class)
public class QueueControlTest extends ManagementTestBase {
- private ActiveMQServer server;
private ClientSession session;
private ServerLocator locator;
private final boolean durable;
@@ -2840,10 +2836,7 @@ public void testGetScheduledCountOnRemove() throws
Exception {
@Before
public void setUp() throws Exception {
super.setUp();
- Configuration conf =
createDefaultInVMConfig().setJMXManagementEnabled(true);
- server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer,
false));
-
- server.start();
+ createManageableServer(false, true);
locator =
createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(0);
ClientSessionFactory sf = createSessionFactory(locator);
@@ -2898,4 +2891,5 @@ protected void assertMetrics(final QueueControl
queueControl, long messageCount,
Wait.assertEquals(0L, () -> durableSize.get().longValue(), 3 * 1000,
100);
}
}
+
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 4bfb7e64b8..ce7421283e 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -33,6 +33,7 @@
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -80,6 +81,11 @@ public void
updateMessageLoadBalancingTypeForAddress(SimpleString address, Mess
}
+ @Override
+ public ActiveMQServer getServer() {
+ return null; // this fake supplies no server instance; callers receive null
+ }
+
@Override
public SimpleString getMatchingQueue(SimpleString address, RoutingType
routingType) {
With regards,
Apache Git Services