This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new ae1ccf0  ARTEMIS-2228 Large Messages over Management
     new 1e65b29  This closes #2498
ae1ccf0 is described below

commit ae1ccf034abb3b9fe6f84032a68b9c2816c4a7c6
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jan 14 16:11:29 2019 -0500

    ARTEMIS-2228 Large Messages over Management
---
 .../core/management/impl/AbstractControl.java      |  68 ++++-
 .../core/management/impl/AddressControlImpl.java   |  60 +----
 .../impl/ManagementRemotingConnection.java         | 277 +++++++++++++++++++++
 .../core/management/impl/QueueControlImpl.java     |  61 +----
 .../impl/journal/DummyOperationContext.java        |   2 +-
 .../impl/journal/LargeServerMessageImpl.java       |  26 ++
 .../core/server/impl/ServerSessionImpl.java        |  25 +-
 .../management/impl/ManagementServiceImpl.java     |   4 +-
 .../management/LargeMessageOverManagementTest.java | 148 +++++++++++
 9 files changed, 542 insertions(+), 129 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index 021a6ce..a6d531a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -21,8 +21,19 @@ import javax.management.MBeanInfo;
 import javax.management.MBeanOperationInfo;
 import javax.management.NotCompliantMBeanException;
 import javax.management.StandardMBean;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
 
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.DummyOperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 
 public abstract class AbstractControl extends StandardMBean {
 
@@ -77,8 +88,61 @@ public abstract class AbstractControl extends StandardMBean {
       return new MBeanInfo(info.getClassName(), info.getDescription(), 
fillMBeanAttributeInfo(), info.getConstructors(), fillMBeanOperationInfo(), 
info.getNotifications());
    }
 
-   // Private -------------------------------------------------------
+   protected String sendMessage(SimpleString address,
+                                ActiveMQServer server,
+                                Map<String, String> headers,
+                                int type,
+                                String body,
+                                boolean durable,
+                                String user,
+                                String password,
+                                Long...queueID) throws Exception {
+      ManagementRemotingConnection fakeConnection = new 
ManagementRemotingConnection();
+      ServerSession serverSession = server.createSession("management::" + 
UUIDGenerator.getInstance().generateStringUUID(), user, password,
+                                                         Integer.MAX_VALUE, 
fakeConnection,
+                                                         true, true, false,
+                                                         false, 
address.toString(), fakeConnection.callback,
+                                                         false, new 
DummyOperationContext(), Collections.emptyMap());
+      try {
+         CoreMessage message = new CoreMessage(storageManager.generateID(), 
50);
+         if (headers != null) {
+            for (String header : headers.keySet()) {
+               message.putStringProperty(new SimpleString(header), new 
SimpleString(headers.get(header)));
+            }
+         }
+         message.setType((byte) type);
+         message.setDurable(durable);
+         message.setTimestamp(System.currentTimeMillis());
+         if (body != null) {
+            if (type == Message.TEXT_TYPE) {
+               message.getBodyBuffer().writeNullableSimpleString(new 
SimpleString(body));
+            } else {
+               message.getBodyBuffer().writeBytes(Base64.decode(body));
+            }
+         }
+
+         message.setAddress(address);
+
+         // if a queueID is used, we set the routeToIDs property
+         // to one or many specific queues
+         if (queueID != null && queueID.length > 0) {
+            ByteBuffer buffer = ByteBuffer.allocate(8 * queueID.length);
+            for (Long q : queueID) {
+               buffer.putLong(q);
+            }
+            message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
+         }
 
-   // Inner classes -------------------------------------------------
+         // There's no point on direct delivery using the management thread, 
use false here
+         serverSession.send(message, false);
+         return "" + message.getMessageID();
+      } finally {
+         try {
+            serverSession.close(false);
+         } catch (Exception ignored) {
+         }
+      }
+   }
+   // Inner classes------------------------------------------------
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index b24c370..5ef91c3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -26,30 +26,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.JsonLoader;
 
 public class AddressControlImpl extends AbstractControl implements 
AddressControl {
@@ -60,7 +53,7 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
 
    private AddressInfo addressInfo;
 
-   private final PostOffice postOffice;
+   private final ActiveMQServer server;
 
    private final PagingManager pagingManager;
 
@@ -75,15 +68,15 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
    // Constructors --------------------------------------------------
 
    public AddressControlImpl(AddressInfo addressInfo,
-                             final PostOffice postOffice,
+                             final ActiveMQServer server,
                              final PagingManager pagingManager,
                              final StorageManager storageManager,
                              final HierarchicalRepository<Set<Role>> 
securityRepository,
                              final SecurityStore securityStore,
                              final ManagementService managementService)throws 
Exception {
       super(AddressControl.class, storageManager);
+      this.server = server;
       this.addressInfo = addressInfo;
-      this.postOffice = postOffice;
       this.pagingManager = pagingManager;
       this.securityRepository = securityRepository;
       this.securityStore = securityStore;
@@ -130,7 +123,7 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
    public String[] getQueueNames() throws Exception {
       clearIO();
       try {
-         Bindings bindings = 
postOffice.getBindingsForAddress(addressInfo.getName());
+         Bindings bindings = 
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
          List<String> queueNames = new ArrayList<>();
          for (Binding binding : bindings.getBindings()) {
             if (binding instanceof QueueBinding) {
@@ -149,7 +142,7 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
    public String[] getBindingNames() throws Exception {
       clearIO();
       try {
-         Bindings bindings = 
postOffice.getBindingsForAddress(addressInfo.getName());
+         Bindings bindings = 
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
          String[] bindingNames = new String[bindings.getBindings().size()];
          int i = 0;
          for (Binding binding : bindings.getBindings()) {
@@ -234,7 +227,7 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
       clearIO();
       long totalMsgs = 0;
       try {
-         Bindings bindings = 
postOffice.getBindingsForAddress(addressInfo.getName());
+         Bindings bindings = 
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
          for (Binding binding : bindings.getBindings()) {
             if (binding instanceof QueueBinding) {
                totalMsgs += ((QueueBinding) 
binding).getQueue().getMessageCount();
@@ -302,46 +295,13 @@ public class AddressControlImpl extends AbstractControl 
implements AddressContro
                              final String user,
                              final String password) throws Exception {
       try {
-         securityStore.check(addressInfo.getName(), CheckType.SEND, new 
SecurityAuth() {
-            @Override
-            public String getUsername() {
-               return user;
-            }
-
-            @Override
-            public String getPassword() {
-               return password;
-            }
-
-            @Override
-            public RemotingConnection getRemotingConnection() {
-               return null;
-            }
-         });
-         CoreMessage message = new CoreMessage(storageManager.generateID(), 
50);
-         if (headers != null) {
-            for (String header : headers.keySet()) {
-               message.putStringProperty(new SimpleString(header), new 
SimpleString(headers.get(header)));
-            }
-         }
-         message.setType((byte) type);
-         message.setDurable(durable);
-         message.setTimestamp(System.currentTimeMillis());
-         if (body != null) {
-            if (type == Message.TEXT_TYPE) {
-               message.getBodyBuffer().writeNullableSimpleString(new 
SimpleString(body));
-            } else {
-               message.getBodyBuffer().writeBytes(Base64.decode(body));
-            }
-         }
-         message.setAddress(addressInfo.getName());
-         postOffice.route(message, true);
-         return "" + message.getMessageID();
-      } catch (ActiveMQException e) {
+         return sendMessage(addressInfo.getName(), server, headers, type, 
body, durable, user, password);
+      } catch (Exception e) {
          throw new IllegalStateException(e.getMessage());
       }
    }
 
+
    @Override
    protected MBeanOperationInfo[] fillMBeanOperationInfo() {
       return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
new file mode 100644
index 0000000..7e760c1
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.management.impl;
+
+import javax.security.auth.Subject;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class ManagementRemotingConnection implements RemotingConnection {
+
+   @Override
+   public Object getID() {
+      return null;
+   }
+
+   @Override
+   public long getCreationTime() {
+      return 0;
+   }
+
+   @Override
+   public String getRemoteAddress() {
+      return "Management";
+   }
+
+   @Override
+   public void scheduledFlush() {
+
+   }
+
+   @Override
+   public void addFailureListener(FailureListener listener) {
+
+   }
+
+   @Override
+   public boolean removeFailureListener(FailureListener listener) {
+      return false;
+   }
+
+   @Override
+   public void addCloseListener(CloseListener listener) {
+
+   }
+
+   @Override
+   public boolean removeCloseListener(CloseListener listener) {
+      return false;
+   }
+
+   @Override
+   public List<CloseListener> removeCloseListeners() {
+      return null;
+   }
+
+   @Override
+   public void setCloseListeners(List<CloseListener> listeners) {
+
+   }
+
+   @Override
+   public List<FailureListener> getFailureListeners() {
+      return null;
+   }
+
+   @Override
+   public List<FailureListener> removeFailureListeners() {
+      return null;
+   }
+
+   @Override
+   public void setFailureListeners(List<FailureListener> listeners) {
+
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(int size) {
+      return null;
+   }
+
+   @Override
+   public void fail(ActiveMQException me) {
+
+   }
+
+   @Override
+   public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
+
+   }
+
+   @Override
+   public void destroy() {
+
+   }
+
+   @Override
+   public Connection getTransportConnection() {
+      return null;
+   }
+
+   @Override
+   public boolean isClient() {
+      return false;
+   }
+
+   @Override
+   public boolean isDestroyed() {
+      return false;
+   }
+
+   @Override
+   public void disconnect(boolean criticalError) {
+
+   }
+
+   @Override
+   public void disconnect(String scaleDownNodeID, boolean criticalError) {
+
+   }
+
+   @Override
+   public boolean checkDataReceived() {
+      return false;
+   }
+
+   @Override
+   public void flush() {
+
+   }
+
+   @Override
+   public boolean isWritable(ReadyListener callback) {
+      return false;
+   }
+
+   @Override
+   public void killMessage(SimpleString nodeID) {
+
+   }
+
+   @Override
+   public boolean isSupportReconnect() {
+      return false;
+   }
+
+   @Override
+   public boolean isSupportsFlowControl() {
+      return false;
+   }
+
+   @Override
+   public Subject getSubject() {
+      return null;
+   }
+
+   @Override
+   public String getProtocolName() {
+      return null;
+   }
+
+   @Override
+   public void setClientID(String cID) {
+
+   }
+
+   @Override
+   public String getClientID() {
+      return null;
+   }
+
+   @Override
+   public String getTransportLocalAddress() {
+      return "Manaement";
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+
+   }
+
+   public SessionCallback callback = new SessionCallback() {
+      @Override
+      public boolean hasCredits(ServerConsumer consumerID) {
+         return false;
+      }
+
+      @Override
+      public void afterDelivery() throws Exception {
+
+      }
+
+      @Override
+      public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, 
MessageReference ref, boolean failed) {
+         return false;
+      }
+
+      @Override
+      public void sendProducerCreditsMessage(int credits, SimpleString 
address) {
+
+      }
+
+      @Override
+      public void sendProducerCreditsFailMessage(int credits, SimpleString 
address) {
+
+      }
+
+      @Override
+      public int sendMessage(MessageReference ref, Message message, 
ServerConsumer consumerID, int deliveryCount) {
+         return 0;
+      }
+
+      @Override
+      public int sendLargeMessage(MessageReference reference,
+                                  Message message,
+                                  ServerConsumer consumerID,
+                                  long bodySize,
+                                  int deliveryCount) {
+         return 0;
+      }
+
+      @Override
+      public int sendLargeMessageContinuation(ServerConsumer consumerID,
+                                              byte[] body,
+                                              boolean continues,
+                                              boolean requiresResponse) {
+         return 0;
+      }
+
+      @Override
+      public void closed() {
+
+      }
+
+      @Override
+      public void disconnect(ServerConsumer consumerId, SimpleString 
queueName) {
+
+      }
+
+      @Override
+      public boolean isWritable(ReadyListener callback, Object 
protocolContext) {
+         return false;
+      }
+
+      @Override
+      public void browserFinished(ServerConsumer consumer) {
+
+      }
+   };
+}
+
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index d62978f..b0d5910 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -22,7 +22,6 @@ import javax.json.JsonObjectBuilder;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanOperationInfo;
 import javax.management.openmbean.CompositeData;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -39,24 +38,19 @@ import 
org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import 
org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 
@@ -72,7 +66,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
 
    private final String address;
 
-   private final PostOffice postOffice;
+   private final ActiveMQServer server;
 
    private final StorageManager storageManager;
    private final SecurityStore securityStore;
@@ -111,14 +105,14 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
 
    public QueueControlImpl(final Queue queue,
                            final String address,
-                           final PostOffice postOffice,
+                           final ActiveMQServer server,
                            final StorageManager storageManager,
                            final SecurityStore securityStore,
                            final HierarchicalRepository<AddressSettings> 
addressSettingsRepository) throws Exception {
       super(QueueControl.class, storageManager);
       this.queue = queue;
       this.address = address;
-      this.postOffice = postOffice;
+      this.server = server;
       this.storageManager = storageManager;
       this.securityStore = securityStore;
       this.addressSettingsRepository = addressSettingsRepository;
@@ -896,7 +890,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
 
       clearIO();
       try {
-         Binding binding = postOffice.getBinding(new 
SimpleString(otherQueueName));
+         Binding binding = server.getPostOffice().getBinding(new 
SimpleString(otherQueueName));
 
          if (binding == null) {
             throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@@ -925,7 +919,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       try {
          Filter filter = FilterImpl.createFilter(filterStr);
 
-         Binding binding = postOffice.getBinding(new 
SimpleString(otherQueueName));
+         Binding binding = server.getPostOffice().getBinding(new 
SimpleString(otherQueueName));
 
          if (binding == null) {
             throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@@ -969,45 +963,8 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                              final String user,
                              final String password) throws Exception {
       try {
-         securityStore.check(queue.getAddress(), queue.getName(), 
CheckType.SEND, new SecurityAuth() {
-            @Override
-            public String getUsername() {
-               return user;
-            }
-
-            @Override
-            public String getPassword() {
-               return password;
-            }
-
-            @Override
-            public RemotingConnection getRemotingConnection() {
-               return null;
-            }
-         });
-         CoreMessage message = new CoreMessage(storageManager.generateID(), 
50);
-         if (headers != null) {
-            for (String header : headers.keySet()) {
-               message.putStringProperty(new SimpleString(header), new 
SimpleString(headers.get(header)));
-            }
-         }
-         message.setType((byte) type);
-         message.setDurable(durable);
-         message.setTimestamp(System.currentTimeMillis());
-         if (body != null) {
-            if (type == Message.TEXT_TYPE) {
-               message.getBodyBuffer().writeNullableSimpleString(new 
SimpleString(body));
-            } else {
-               message.getBodyBuffer().writeBytes(Base64.decode(body));
-            }
-         }
-         message.setAddress(queue.getAddress());
-         ByteBuffer buffer = ByteBuffer.allocate(8);
-         buffer.putLong(queue.getID());
-         message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
-         postOffice.route(message, true);
-         return "" + message.getMessageID();
-      } catch (ActiveMQException e) {
+         return sendMessage(queue.getAddress(), server, headers, type, body, 
durable, user, password, queue.getID());
+      } catch (Exception e) {
          throw new IllegalStateException(e.getMessage());
       }
    }
@@ -1419,7 +1376,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
    // Private -------------------------------------------------------
 
    private void checkStarted() {
-      if (!postOffice.isStarted()) {
+      if (!server.getPostOffice().isStarted()) {
          throw new IllegalStateException("Broker is not started. Queue can not 
be managed yet");
       }
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
index 6fd95ff..f4883c8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -19,7 +19,7 @@ package 
org.apache.activemq.artemis.core.persistence.impl.journal;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 
-final class DummyOperationContext implements OperationContext {
+public final class DummyOperationContext implements OperationContext {
 
    private static DummyOperationContext instance = new DummyOperationContext();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 110070c..2824ff7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -24,11 +24,13 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -39,6 +41,30 @@ import io.netty.buffer.Unpooled;
 
 public final class LargeServerMessageImpl extends CoreMessage implements 
LargeServerMessage {
 
+   /** This will check if a regular message needs to be converted as large 
message */
+   public static Message checkLargeMessage(Message message, StorageManager 
storageManager) throws Exception {
+      if (message.isLargeMessage()) {
+         return message; // nothing to be done on this case
+      }
+
+      if (message.getEncodeSize() > storageManager.getMaxRecordSize()) {
+         return asLargeMessage(message, storageManager);
+      } else {
+         return message;
+      }
+   }
+
+   private static Message asLargeMessage(Message message, StorageManager 
storageManager) throws Exception {
+      ICoreMessage coreMessage = message.toCore();
+      LargeServerMessage lsm = 
storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
+      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+      final int readableBytes = buffer.readableBytes();
+      lsm.addBytes(buffer);
+      lsm.releaseResources();
+      lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
+      return lsm;
+   }
+
    // Constants -----------------------------------------------------
    private static final Logger logger = 
Logger.getLogger(LargeServerMessageImpl.class);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 11b096b..8f905a2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -35,12 +35,10 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.Closeable;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -55,6 +53,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -1465,18 +1463,6 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
    }
 
-
-   private LargeServerMessage messageToLargeMessage(Message message) throws 
Exception {
-      ICoreMessage coreMessage = message.toCore();
-      LargeServerMessage lsm = 
getStorageManager().createLargeMessage(storageManager.generateID(), 
coreMessage);
-      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
-      final int readableBytes = buffer.readableBytes();
-      lsm.addBytes(buffer);
-      lsm.releaseResources();
-      lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
-      return lsm;
-   }
-
    @Override
    public synchronized RoutingStatus send(Transaction tx,
                                           Message msg,
@@ -1487,17 +1473,12 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
 
    @Override
    public synchronized RoutingStatus send(Transaction tx,
-                                          Message msg,
+                                          Message messageParameter,
                                           final boolean direct,
                                           boolean noAutoCreateQueue,
                                           RoutingContext routingContext) 
throws Exception {
 
-      final Message message;
-      if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && 
!msg.isLargeMessage()) {
-         message = messageToLargeMessage(msg);
-      } else {
-         message = msg;
-      }
+      final Message message = 
LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
 
       if (server.hasBrokerMessagePlugins()) {
          server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, 
message, direct, noAutoCreateQueue));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index ecb8d55..25354da 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -216,7 +216,7 @@ public class ManagementServiceImpl implements 
ManagementService {
    @Override
    public void registerAddress(AddressInfo addressInfo) throws Exception {
       ObjectName objectName = 
objectNameBuilder.getAddressObjectName(addressInfo.getName());
-      AddressControlImpl addressControl = new AddressControlImpl(addressInfo, 
postOffice, pagingManager, storageManager, securityRepository, securityStore, 
this);
+      AddressControlImpl addressControl = new AddressControlImpl(addressInfo, 
messagingServer, pagingManager, storageManager, securityRepository, 
securityStore, this);
 
       registerInJMX(objectName, addressControl);
 
@@ -246,7 +246,7 @@ public class ManagementServiceImpl implements 
ManagementService {
          return;
       }
 
-      QueueControlImpl queueControl = new QueueControlImpl(queue, 
addressInfo.getName().toString(), postOffice, storageManager, securityStore, 
addressSettingsRepository);
+      QueueControlImpl queueControl = new QueueControlImpl(queue, 
addressInfo.getName().toString(), messagingServer, storageManager, 
securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
          MessageCounter counter = new 
MessageCounter(queue.getName().toString(), null, queue, false, 
queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
new file mode 100644
index 0000000..45f14da
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.management;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class LargeMessageOverManagementTest extends ManagementTestBase {
+
+   private ClientSession session;
+   private ServerLocator locator;
+   private ClientSessionFactory sf;
+   private ActiveMQServer server;
+
+   protected AddressControl createManagementControl(final SimpleString 
address) throws Exception {
+      return ManagementControlHelper.createAddressControl(address, 
mbeanServer);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      Configuration config = createBasicConfig();
+
+
+      TransportConfiguration acceptorConfig = 
createTransportConfiguration(false, true, generateParams(0, false));
+      config.addAcceptorConfiguration(acceptorConfig);
+      server = createServer(true, config);
+      server.setMBeanServer(mbeanServer);
+      server.start();
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
+      sf = createSessionFactory(locator);
+      session = sf.createSession(false, true, false);
+      session.start();
+      addClientSession(session);
+   }
+
+   @Test
+   public void testSendOverSizeMessageOverQueueControl() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString emptyqueue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, true);
+      session.createQueue(address, RoutingType.MULTICAST, emptyqueue, null, 
true);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      int bodySize = (int) server.getStorageManager().getMaxRecordSize() + 100;
+      byte[] bigData = createBytesData(bodySize);
+
+      queueControl.sendMessage(new HashMap<String, String>(), 
Message.BYTES_TYPE, Base64.encodeBytes(bigData), true, "myUser", "myPassword");
+
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals(bigData.length, message.getBodySize());
+      Assert.assertTrue(message.isLargeMessage());
+
+
+
+      byte[] bytesRead = new byte[bigData.length];
+      message.getBodyBuffer().readBytes(bytesRead);
+
+      for (int i = 0; i < bytesRead.length; i++) {
+         Assert.assertEquals(bytesRead[i], bigData[i]);
+      }
+
+
+      consumer.close();
+
+      // this is an extra check,
+      consumer = session.createConsumer(emptyqueue);
+      Assert.assertNull(consumer.receiveImmediate());
+
+   }
+
+   @Test
+   public void testSendOverSizeMessageOverAddressControl() throws Exception {
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, false);
+
+      AddressControl addressControl = createManagementControl(address);
+      session.createQueue(address, RoutingType.ANYCAST, address);
+
+      int bodySize = server.getConfiguration().getJournalBufferSize_AIO();
+      byte[] bigData = createBytesData(bodySize);
+      addressControl.sendMessage(null, Message.BYTES_TYPE, 
Base64.encodeBytes(bigData), false, null, null);
+
+      ClientConsumer consumer = session.createConsumer(address);
+      ClientMessage message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals(bigData.length, message.getBodySize());
+      Assert.assertTrue(message.isLargeMessage());
+
+      byte[] bytesRead = new byte[bigData.length];
+      message.getBodyBuffer().readBytes(bytesRead);
+
+      for (int i = 0; i < bytesRead.length; i++) {
+         Assert.assertEquals(bytesRead[i], bigData[i]);
+      }
+
+   }
+
+   byte[] createBytesData(int nbytes) {
+      byte[] result = new byte[nbytes];
+      for (int i = 0; i < nbytes; i++) {
+         result[i] = RandomUtil.randomByte();
+      }
+      return result;
+   }
+}

Reply via email to