diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index 021a6cef2c..5931b3ce68 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -22,7 +22,11 @@
 import javax.management.NotCompliantMBeanException;
 import javax.management.StandardMBean;
 
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 
 public abstract class AbstractControl extends StandardMBean {
 
@@ -67,6 +71,23 @@ protected void blockOnIO() {
 
    }
 
+   /**
+    * Enforces the size limit for messages sent over the management API: the
+    * message's persist size must be strictly less than the AIO journal buffer size.
+    * @param postOffice the PostOffice whose server configuration supplies the limit
+    * @param message the message to be checked
+    * @throws Exception if the message size is equal to or over the limit
+    */
+   protected void checkMessageSize(PostOffice postOffice, CoreMessage message) throws Exception {
+      final int persistSize = message.getPersistSize();
+      final ActiveMQServer broker = postOffice.getServer();
+      final int limit = broker.getConfiguration().getJournalBufferSize_AIO();
+      if (persistSize < limit) {
+         return;
+      }
+      throw ActiveMQMessageBundle.BUNDLE.rejectOverSizeMessage(limit);
+   }
+
    protected abstract MBeanOperationInfo[] fillMBeanOperationInfo();
 
    protected abstract MBeanAttributeInfo[] fillMBeanAttributeInfo();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index b24c370dac..6820ec765a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -335,6 +335,7 @@ public RemotingConnection getRemotingConnection() {
             }
          }
          message.setAddress(addressInfo.getName());
+         checkMessageSize(postOffice, message);
          postOffice.route(message, true);
          return "" + message.getMessageID();
       } catch (ActiveMQException e) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index d62978f2e6..b4f85f02a1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1005,6 +1005,7 @@ public RemotingConnection getRemotingConnection() {
          ByteBuffer buffer = ByteBuffer.allocate(8);
          buffer.putLong(queue.getID());
          message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
+         checkMessageSize(postOffice, message);
          postOffice.route(message, true);
          return "" + message.getMessageID();
       } catch (ActiveMQException e) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index fb1e96c058..aa7cc41542 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -26,6 +26,7 @@
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -164,4 +165,6 @@ RoutingStatus route(Message message,
    Set<SimpleString> getAddresses();
 
    void updateMessageLoadBalancingTypeForAddress(SimpleString  address, 
MessageLoadBalancingType messageLoadBalancingType) throws Exception;
+
+   ActiveMQServer getServer();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bf12baf32f..c2e9269f8b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1711,6 +1711,7 @@ public AddressManager getAddressManager() {
       return addressManager;
    }
 
+   @Override
    public ActiveMQServer getServer() {
       return server;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index bc86af7978..1574689b7a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -441,4 +441,7 @@ IllegalStateException invalidRoutingTypeUpdate(String 
queueName,
 
    @Message(id = 229215, value = "Cannot delete queue {0} on binding {1} - it 
has {2} messages", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQIllegalStateException cannotDeleteQueueWithMessages(SimpleString 
name, SimpleString queueName, long messageCount);
+
+   @Message(id = 229216, value = "Message rejected because size over limit: 
{0}. Increase the journal-buffer-size to allow larger messages", format = 
Message.Format.MESSAGE_FORMAT)
+   ActiveMQException rejectOverSizeMessage(int limit);
 }
diff --git a/docs/user-manual/en/management.md 
b/docs/user-manual/en/management.md
index 60ef2a184f..6e34e2b734 100644
--- a/docs/user-manual/en/management.md
+++ b/docs/user-manual/en/management.md
@@ -205,6 +205,13 @@ a given property.)
   paused, it will receive messages but will not deliver them.  When it's 
resumed,
   it'll begin delivering the queued messages, if any.
 
+#### Size limit of messages sent over QueueControl and AddressControl
+
+Both `QueueControl` and `AddressControl` provide a `sendMessage()` operation
+that allows users to send messages to the broker. The total size of each message
+must be less than the broker's journal-buffer-size (see [Persistence](persistence.md)).
+Messages that reach or exceed that size are rejected.
+
 #### Other Resources Management
 
 Apache ActiveMQ Artemis allows to start and stop its remote resources
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 69794d8bdb..1c63ba0854 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -36,10 +36,8 @@
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.RoleInfo;
-import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -53,7 +51,6 @@
 
 public class AddressControlTest extends ManagementTestBase {
 
-   private ActiveMQServer server;
    protected ClientSession session;
    private ServerLocator locator;
    private ClientSessionFactory sf;
@@ -401,10 +398,7 @@ public void testSendMessage() throws Exception {
    public void setUp() throws Exception {
       super.setUp();
 
-      Configuration config = 
createDefaultInVMConfig().setJMXManagementEnabled(true);
-      server = createServer(false, config);
-      server.setMBeanServer(mbeanServer);
-      server.start();
+      createManageableServer(false, true);
 
       locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
       sf = createSessionFactory(locator);
@@ -413,10 +407,6 @@ public void setUp() throws Exception {
       addClientSession(session);
    }
 
-   protected AddressControl createManagementControl(final SimpleString 
address) throws Exception {
-      return ManagementControlHelper.createAddressControl(address, 
mbeanServer);
-   }
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
index baaaab3a78..47e1463c49 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java
@@ -25,7 +25,11 @@
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.After;
 import org.junit.Assert;
@@ -37,6 +41,7 @@
 
    // Attributes ----------------------------------------------------
 
+   protected ActiveMQServer server;
    protected MBeanServer mbeanServer;
 
    // Static --------------------------------------------------------
@@ -79,6 +84,14 @@ public void setUp() throws Exception {
       createMBeanServer();
    }
 
+   protected void createManageableServer(boolean enablePersistence, boolean startServer) throws Exception {
+      final Configuration jmxEnabledConfig = createDefaultInVMConfig().setJMXManagementEnabled(true);
+      server = addServer(ActiveMQServers.newActiveMQServer(jmxEnabledConfig, mbeanServer, enablePersistence)); // built against this test's mbeanServer
+      if (startServer) {
+         server.start(); // pass startServer = false to leave starting to the caller
+      }
+   }
+
    protected void createMBeanServer() {
       mbeanServer = MBeanServerFactory.createMBeanServer();
    }
@@ -117,6 +130,10 @@ protected QueueControl createManagementControl(final 
SimpleString address,
       return queueControl;
    }
 
+   protected AddressControl createManagementControl(final SimpleString 
address) throws Exception {
+      return ManagementControlHelper.createAddressControl(address, 
mbeanServer); // address-control counterpart of the QueueControl overload above; uses this test's mbeanServer
+   }
+
    protected long getMessageCount(QueueControl control) throws Exception {
       control.flushExecutor();
       return control.getMessageCount();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
new file mode 100644
index 0000000000..27d58d3886
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/PersistentManagementTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.management;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class PersistentManagementTest extends ManagementTestBase {
+   // Management tests that run against a broker created with persistence enabled (see setUp).
+   private ClientSession session;
+   private ServerLocator locator;
+   private ClientSessionFactory sf;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      createManageableServer(true, true); // enablePersistence = true, startServer = true
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
+      sf = createSessionFactory(locator);
+      session = sf.createSession(false, true, false);
+      session.start();
+      addClientSession(session);
+   }
+
+   @Test
+   public void testSendOverSizeMessageOverQueueControl() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      // The body alone is as large as the AIO journal buffer, so the send is expected to hit the size limit.
+      int bodySize = server.getConfiguration().getJournalBufferSize_AIO();
+      byte[] bigData = createBytesData(bodySize);
+
+      try {
+         queueControl.sendMessage(new HashMap<String, String>(), 
Message.BYTES_TYPE, Base64.encodeBytes(bigData), true, "myUser", "myPassword");
+         fail("Expecting message being rejected.");
+      } catch (Exception e) {
+         // Reaching here means the send was rejected; any Exception is accepted, its type and message are not verified.
+      }
+   }
+
+   @Test
+   public void testSendOverSizeMessageOverAddressControl() throws Exception {
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, false);
+
+      AddressControl addressControl = createManagementControl(address);
+      session.createQueue(address, RoutingType.ANYCAST, address);
+      // The body alone is as large as the AIO journal buffer, so the send is expected to hit the size limit.
+      int bodySize = server.getConfiguration().getJournalBufferSize_AIO();
+      byte[] bigData = createBytesData(bodySize);
+
+      try {
+         addressControl.sendMessage(null, Message.BYTES_TYPE, 
Base64.encodeBytes(bigData), false, null, null);
+         fail("Expecting message being rejected.");
+      } catch (Exception e) {
+         // Reaching here means the send was rejected; any Exception is accepted, its type and message are not verified.
+      }
+   }
+
+   byte[] createBytesData(int nbytes) { // returns an array of nbytes random bytes
+      byte[] result = new byte[nbytes];
+      for (int i = 0; i < nbytes; i++) {
+         result[i] = RandomUtil.randomByte();
+      }
+      return result;
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 71780e96a1..53057a449d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -56,13 +56,10 @@
 import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -79,7 +76,6 @@
 @RunWith(value = Parameterized.class)
 public class QueueControlTest extends ManagementTestBase {
 
-   private ActiveMQServer server;
    private ClientSession session;
    private ServerLocator locator;
    private final boolean durable;
@@ -2840,10 +2836,7 @@ public void testGetScheduledCountOnRemove() throws 
Exception {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      Configuration conf = 
createDefaultInVMConfig().setJMXManagementEnabled(true);
-      server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, 
false));
-
-      server.start();
+      createManageableServer(false, true);
 
       locator = 
createInVMNonHALocator().setBlockOnNonDurableSend(true).setConsumerWindowSize(0);
       ClientSessionFactory sf = createSessionFactory(locator);
@@ -2898,4 +2891,5 @@ protected void assertMetrics(final QueueControl 
queueControl, long messageCount,
          Wait.assertEquals(0L, () -> durableSize.get().longValue(), 3 * 1000, 
100);
       }
    }
+
 }
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 4bfb7e64b8..ce7421283e 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -33,6 +33,7 @@
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -80,6 +81,11 @@ public void 
updateMessageLoadBalancingTypeForAddress(SimpleString  address, Mess
 
    }
 
+   @Override
+   public ActiveMQServer getServer() {
+      return null; // this fake exposes no server; code that dereferences getServer() (e.g. AbstractControl.checkMessageSize) would fail against it
+   }
+
    @Override
    public SimpleString getMatchingQueue(SimpleString address, RoutingType 
routingType) {
 


With regards,
Apache Git Services

Reply via email to