This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new ae1ccf0 ARTEMIS-2228 Large Messages over Management
new 1e65b29 This closes #2498
ae1ccf0 is described below
commit ae1ccf034abb3b9fe6f84032a68b9c2816c4a7c6
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jan 14 16:11:29 2019 -0500
ARTEMIS-2228 Large Messages over Management
---
.../core/management/impl/AbstractControl.java | 68 ++++-
.../core/management/impl/AddressControlImpl.java | 60 +----
.../impl/ManagementRemotingConnection.java | 277 +++++++++++++++++++++
.../core/management/impl/QueueControlImpl.java | 61 +----
.../impl/journal/DummyOperationContext.java | 2 +-
.../impl/journal/LargeServerMessageImpl.java | 26 ++
.../core/server/impl/ServerSessionImpl.java | 25 +-
.../management/impl/ManagementServiceImpl.java | 4 +-
.../management/LargeMessageOverManagementTest.java | 148 +++++++++++
9 files changed, 542 insertions(+), 129 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
index 021a6ce..a6d531a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AbstractControl.java
@@ -21,8 +21,19 @@ import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.DummyOperationContext;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
public abstract class AbstractControl extends StandardMBean {
@@ -77,8 +88,61 @@ public abstract class AbstractControl extends StandardMBean {
return new MBeanInfo(info.getClassName(), info.getDescription(),
fillMBeanAttributeInfo(), info.getConstructors(), fillMBeanOperationInfo(),
info.getNotifications());
}
- // Private -------------------------------------------------------
+ protected String sendMessage(SimpleString address,
+ ActiveMQServer server,
+ Map<String, String> headers,
+ int type,
+ String body,
+ boolean durable,
+ String user,
+ String password,
+ Long...queueID) throws Exception {
+ ManagementRemotingConnection fakeConnection = new
ManagementRemotingConnection();
+ ServerSession serverSession = server.createSession("management::" +
UUIDGenerator.getInstance().generateStringUUID(), user, password,
+ Integer.MAX_VALUE,
fakeConnection,
+ true, true, false,
+ false,
address.toString(), fakeConnection.callback,
+ false, new
DummyOperationContext(), Collections.emptyMap());
+ try {
+ CoreMessage message = new CoreMessage(storageManager.generateID(),
50);
+ if (headers != null) {
+ for (String header : headers.keySet()) {
+ message.putStringProperty(new SimpleString(header), new
SimpleString(headers.get(header)));
+ }
+ }
+ message.setType((byte) type);
+ message.setDurable(durable);
+ message.setTimestamp(System.currentTimeMillis());
+ if (body != null) {
+ if (type == Message.TEXT_TYPE) {
+ message.getBodyBuffer().writeNullableSimpleString(new
SimpleString(body));
+ } else {
+ message.getBodyBuffer().writeBytes(Base64.decode(body));
+ }
+ }
+
+ message.setAddress(address);
+
+ // if a queueID is used, we set the routeToIDs property
+ // to one or many specific queues
+ if (queueID != null && queueID.length > 0) {
+ ByteBuffer buffer = ByteBuffer.allocate(8 * queueID.length);
+ for (Long q : queueID) {
+ buffer.putLong(q);
+ }
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
+ }
- // Inner classes -------------------------------------------------
+ // There's no point on direct delivery using the management thread,
use false here
+ serverSession.send(message, false);
+ return "" + message.getMessageID();
+ } finally {
+ try {
+ serverSession.close(false);
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ // Inner classes------------------------------------------------
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index b24c370..5ef91c3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -26,30 +26,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
public class AddressControlImpl extends AbstractControl implements
AddressControl {
@@ -60,7 +53,7 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
private AddressInfo addressInfo;
- private final PostOffice postOffice;
+ private final ActiveMQServer server;
private final PagingManager pagingManager;
@@ -75,15 +68,15 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
// Constructors --------------------------------------------------
public AddressControlImpl(AddressInfo addressInfo,
- final PostOffice postOffice,
+ final ActiveMQServer server,
final PagingManager pagingManager,
final StorageManager storageManager,
final HierarchicalRepository<Set<Role>>
securityRepository,
final SecurityStore securityStore,
final ManagementService managementService)throws
Exception {
super(AddressControl.class, storageManager);
+ this.server = server;
this.addressInfo = addressInfo;
- this.postOffice = postOffice;
this.pagingManager = pagingManager;
this.securityRepository = securityRepository;
this.securityStore = securityStore;
@@ -130,7 +123,7 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
public String[] getQueueNames() throws Exception {
clearIO();
try {
- Bindings bindings =
postOffice.getBindingsForAddress(addressInfo.getName());
+ Bindings bindings =
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
@@ -149,7 +142,7 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
public String[] getBindingNames() throws Exception {
clearIO();
try {
- Bindings bindings =
postOffice.getBindingsForAddress(addressInfo.getName());
+ Bindings bindings =
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
@@ -234,7 +227,7 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
clearIO();
long totalMsgs = 0;
try {
- Bindings bindings =
postOffice.getBindingsForAddress(addressInfo.getName());
+ Bindings bindings =
server.getPostOffice().getBindingsForAddress(addressInfo.getName());
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding)
binding).getQueue().getMessageCount();
@@ -302,46 +295,13 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
final String user,
final String password) throws Exception {
try {
- securityStore.check(addressInfo.getName(), CheckType.SEND, new
SecurityAuth() {
- @Override
- public String getUsername() {
- return user;
- }
-
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- public RemotingConnection getRemotingConnection() {
- return null;
- }
- });
- CoreMessage message = new CoreMessage(storageManager.generateID(),
50);
- if (headers != null) {
- for (String header : headers.keySet()) {
- message.putStringProperty(new SimpleString(header), new
SimpleString(headers.get(header)));
- }
- }
- message.setType((byte) type);
- message.setDurable(durable);
- message.setTimestamp(System.currentTimeMillis());
- if (body != null) {
- if (type == Message.TEXT_TYPE) {
- message.getBodyBuffer().writeNullableSimpleString(new
SimpleString(body));
- } else {
- message.getBodyBuffer().writeBytes(Base64.decode(body));
- }
- }
- message.setAddress(addressInfo.getName());
- postOffice.route(message, true);
- return "" + message.getMessageID();
- } catch (ActiveMQException e) {
+ return sendMessage(addressInfo.getName(), server, headers, type,
body, durable, user, password);
+ } catch (Exception e) {
throw new IllegalStateException(e.getMessage());
}
}
+
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
new file mode 100644
index 0000000..7e760c1
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.management.impl;
+
+import javax.security.auth.Subject;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class ManagementRemotingConnection implements RemotingConnection {
+
+ @Override
+ public Object getID() {
+ return null;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return 0;
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return "Management";
+ }
+
+ @Override
+ public void scheduledFlush() {
+
+ }
+
+ @Override
+ public void addFailureListener(FailureListener listener) {
+
+ }
+
+ @Override
+ public boolean removeFailureListener(FailureListener listener) {
+ return false;
+ }
+
+ @Override
+ public void addCloseListener(CloseListener listener) {
+
+ }
+
+ @Override
+ public boolean removeCloseListener(CloseListener listener) {
+ return false;
+ }
+
+ @Override
+ public List<CloseListener> removeCloseListeners() {
+ return null;
+ }
+
+ @Override
+ public void setCloseListeners(List<CloseListener> listeners) {
+
+ }
+
+ @Override
+ public List<FailureListener> getFailureListeners() {
+ return null;
+ }
+
+ @Override
+ public List<FailureListener> removeFailureListeners() {
+ return null;
+ }
+
+ @Override
+ public void setFailureListeners(List<FailureListener> listeners) {
+
+ }
+
+ @Override
+ public ActiveMQBuffer createTransportBuffer(int size) {
+ return null;
+ }
+
+ @Override
+ public void fail(ActiveMQException me) {
+
+ }
+
+ @Override
+ public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public Connection getTransportConnection() {
+ return null;
+ }
+
+ @Override
+ public boolean isClient() {
+ return false;
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return false;
+ }
+
+ @Override
+ public void disconnect(boolean criticalError) {
+
+ }
+
+ @Override
+ public void disconnect(String scaleDownNodeID, boolean criticalError) {
+
+ }
+
+ @Override
+ public boolean checkDataReceived() {
+ return false;
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener callback) {
+ return false;
+ }
+
+ @Override
+ public void killMessage(SimpleString nodeID) {
+
+ }
+
+ @Override
+ public boolean isSupportReconnect() {
+ return false;
+ }
+
+ @Override
+ public boolean isSupportsFlowControl() {
+ return false;
+ }
+
+ @Override
+ public Subject getSubject() {
+ return null;
+ }
+
+ @Override
+ public String getProtocolName() {
+ return null;
+ }
+
+ @Override
+ public void setClientID(String cID) {
+
+ }
+
+ @Override
+ public String getClientID() {
+ return null;
+ }
+
+ @Override
+ public String getTransportLocalAddress() {
+ return "Manaement";
+ }
+
+ @Override
+ public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
+
+ }
+
+ public SessionCallback callback = new SessionCallback() {
+ @Override
+ public boolean hasCredits(ServerConsumer consumerID) {
+ return false;
+ }
+
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ @Override
+ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer,
MessageReference ref, boolean failed) {
+ return false;
+ }
+
+ @Override
+ public void sendProducerCreditsMessage(int credits, SimpleString
address) {
+
+ }
+
+ @Override
+ public void sendProducerCreditsFailMessage(int credits, SimpleString
address) {
+
+ }
+
+ @Override
+ public int sendMessage(MessageReference ref, Message message,
ServerConsumer consumerID, int deliveryCount) {
+ return 0;
+ }
+
+ @Override
+ public int sendLargeMessage(MessageReference reference,
+ Message message,
+ ServerConsumer consumerID,
+ long bodySize,
+ int deliveryCount) {
+ return 0;
+ }
+
+ @Override
+ public int sendLargeMessageContinuation(ServerConsumer consumerID,
+ byte[] body,
+ boolean continues,
+ boolean requiresResponse) {
+ return 0;
+ }
+
+ @Override
+ public void closed() {
+
+ }
+
+ @Override
+ public void disconnect(ServerConsumer consumerId, SimpleString
queueName) {
+
+ }
+
+ @Override
+ public boolean isWritable(ReadyListener callback, Object
protocolContext) {
+ return false;
+ }
+
+ @Override
+ public void browserFinished(ServerConsumer consumer) {
+
+ }
+ };
+}
+
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index d62978f..b0d5910 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -22,7 +22,6 @@ import javax.json.JsonObjectBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -39,24 +38,19 @@ import
org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import
org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@@ -72,7 +66,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
private final String address;
- private final PostOffice postOffice;
+ private final ActiveMQServer server;
private final StorageManager storageManager;
private final SecurityStore securityStore;
@@ -111,14 +105,14 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
public QueueControlImpl(final Queue queue,
final String address,
- final PostOffice postOffice,
+ final ActiveMQServer server,
final StorageManager storageManager,
final SecurityStore securityStore,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository) throws Exception {
super(QueueControl.class, storageManager);
this.queue = queue;
this.address = address;
- this.postOffice = postOffice;
+ this.server = server;
this.storageManager = storageManager;
this.securityStore = securityStore;
this.addressSettingsRepository = addressSettingsRepository;
@@ -896,7 +890,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
clearIO();
try {
- Binding binding = postOffice.getBinding(new
SimpleString(otherQueueName));
+ Binding binding = server.getPostOffice().getBinding(new
SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@@ -925,7 +919,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
try {
Filter filter = FilterImpl.createFilter(filterStr);
- Binding binding = postOffice.getBinding(new
SimpleString(otherQueueName));
+ Binding binding = server.getPostOffice().getBinding(new
SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@@ -969,45 +963,8 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
final String user,
final String password) throws Exception {
try {
- securityStore.check(queue.getAddress(), queue.getName(),
CheckType.SEND, new SecurityAuth() {
- @Override
- public String getUsername() {
- return user;
- }
-
- @Override
- public String getPassword() {
- return password;
- }
-
- @Override
- public RemotingConnection getRemotingConnection() {
- return null;
- }
- });
- CoreMessage message = new CoreMessage(storageManager.generateID(),
50);
- if (headers != null) {
- for (String header : headers.keySet()) {
- message.putStringProperty(new SimpleString(header), new
SimpleString(headers.get(header)));
- }
- }
- message.setType((byte) type);
- message.setDurable(durable);
- message.setTimestamp(System.currentTimeMillis());
- if (body != null) {
- if (type == Message.TEXT_TYPE) {
- message.getBodyBuffer().writeNullableSimpleString(new
SimpleString(body));
- } else {
- message.getBodyBuffer().writeBytes(Base64.decode(body));
- }
- }
- message.setAddress(queue.getAddress());
- ByteBuffer buffer = ByteBuffer.allocate(8);
- buffer.putLong(queue.getID());
- message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
- postOffice.route(message, true);
- return "" + message.getMessageID();
- } catch (ActiveMQException e) {
+ return sendMessage(queue.getAddress(), server, headers, type, body,
durable, user, password, queue.getID());
+ } catch (Exception e) {
throw new IllegalStateException(e.getMessage());
}
}
@@ -1419,7 +1376,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
// Private -------------------------------------------------------
private void checkStarted() {
- if (!postOffice.isStarted()) {
+ if (!server.getPostOffice().isStarted()) {
throw new IllegalStateException("Broker is not started. Queue can not
be managed yet");
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
index 6fd95ff..f4883c8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -19,7 +19,7 @@ package
org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
-final class DummyOperationContext implements OperationContext {
+public final class DummyOperationContext implements OperationContext {
private static DummyOperationContext instance = new DummyOperationContext();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 110070c..2824ff7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -24,11 +24,13 @@ import
org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -39,6 +41,30 @@ import io.netty.buffer.Unpooled;
public final class LargeServerMessageImpl extends CoreMessage implements
LargeServerMessage {
+ /** This will check if a regular message needs to be converted as large
message */
+ public static Message checkLargeMessage(Message message, StorageManager
storageManager) throws Exception {
+ if (message.isLargeMessage()) {
+ return message; // nothing to be done on this case
+ }
+
+ if (message.getEncodeSize() > storageManager.getMaxRecordSize()) {
+ return asLargeMessage(message, storageManager);
+ } else {
+ return message;
+ }
+ }
+
+ private static Message asLargeMessage(Message message, StorageManager
storageManager) throws Exception {
+ ICoreMessage coreMessage = message.toCore();
+ LargeServerMessage lsm =
storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
+ ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+ final int readableBytes = buffer.readableBytes();
+ lsm.addBytes(buffer);
+ lsm.releaseResources();
+ lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
+ return lsm;
+ }
+
// Constants -----------------------------------------------------
private static final Logger logger =
Logger.getLogger(LargeServerMessageImpl.class);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 11b096b..8f905a2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -35,12 +35,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -55,6 +53,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -1465,18 +1463,6 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
}
-
- private LargeServerMessage messageToLargeMessage(Message message) throws
Exception {
- ICoreMessage coreMessage = message.toCore();
- LargeServerMessage lsm =
getStorageManager().createLargeMessage(storageManager.generateID(),
coreMessage);
- ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
- final int readableBytes = buffer.readableBytes();
- lsm.addBytes(buffer);
- lsm.releaseResources();
- lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
- return lsm;
- }
-
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
@@ -1487,17 +1473,12 @@ public class ServerSessionImpl implements
ServerSession, FailureListener {
@Override
public synchronized RoutingStatus send(Transaction tx,
- Message msg,
+ Message messageParameter,
final boolean direct,
boolean noAutoCreateQueue,
RoutingContext routingContext)
throws Exception {
- final Message message;
- if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) &&
!msg.isLargeMessage()) {
- message = messageToLargeMessage(msg);
- } else {
- message = msg;
- }
+ final Message message =
LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx,
message, direct, noAutoCreateQueue));
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index ecb8d55..25354da 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -216,7 +216,7 @@ public class ManagementServiceImpl implements
ManagementService {
@Override
public void registerAddress(AddressInfo addressInfo) throws Exception {
ObjectName objectName =
objectNameBuilder.getAddressObjectName(addressInfo.getName());
- AddressControlImpl addressControl = new AddressControlImpl(addressInfo,
postOffice, pagingManager, storageManager, securityRepository, securityStore,
this);
+ AddressControlImpl addressControl = new AddressControlImpl(addressInfo,
messagingServer, pagingManager, storageManager, securityRepository,
securityStore, this);
registerInJMX(objectName, addressControl);
@@ -246,7 +246,7 @@ public class ManagementServiceImpl implements
ManagementService {
return;
}
- QueueControlImpl queueControl = new QueueControlImpl(queue,
addressInfo.getName().toString(), postOffice, storageManager, securityStore,
addressSettingsRepository);
+ QueueControlImpl queueControl = new QueueControlImpl(queue,
addressInfo.getName().toString(), messagingServer, storageManager,
securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new
MessageCounter(queue.getName().toString(), null, queue, false,
queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
new file mode 100644
index 0000000..45f14da
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/LargeMessageOverManagementTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.management;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class LargeMessageOverManagementTest extends ManagementTestBase {
+
+ private ClientSession session;
+ private ServerLocator locator;
+ private ClientSessionFactory sf;
+ private ActiveMQServer server;
+
+ protected AddressControl createManagementControl(final SimpleString
address) throws Exception {
+ return ManagementControlHelper.createAddressControl(address,
mbeanServer);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ Configuration config = createBasicConfig();
+
+
+ TransportConfiguration acceptorConfig =
createTransportConfiguration(false, true, generateParams(0, false));
+ config.addAcceptorConfiguration(acceptorConfig);
+ server = createServer(true, config);
+ server.setMBeanServer(mbeanServer);
+ server.start();
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
+ sf = createSessionFactory(locator);
+ session = sf.createSession(false, true, false);
+ session.start();
+ addClientSession(session);
+ }
+
+ @Test
+ public void testSendOverSizeMessageOverQueueControl() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString emptyqueue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, RoutingType.MULTICAST, queue, null, true);
+ session.createQueue(address, RoutingType.MULTICAST, emptyqueue, null,
true);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ int bodySize = (int) server.getStorageManager().getMaxRecordSize() + 100;
+ byte[] bigData = createBytesData(bodySize);
+
+ queueControl.sendMessage(new HashMap<String, String>(),
Message.BYTES_TYPE, Base64.encodeBytes(bigData), true, "myUser", "myPassword");
+
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(bigData.length, message.getBodySize());
+ Assert.assertTrue(message.isLargeMessage());
+
+
+
+ byte[] bytesRead = new byte[bigData.length];
+ message.getBodyBuffer().readBytes(bytesRead);
+
+ for (int i = 0; i < bytesRead.length; i++) {
+ Assert.assertEquals(bytesRead[i], bigData[i]);
+ }
+
+
+ consumer.close();
+
+ // this is an extra check,
+ consumer = session.createConsumer(emptyqueue);
+ Assert.assertNull(consumer.receiveImmediate());
+
+ }
+
+ @Test
+ public void testSendOverSizeMessageOverAddressControl() throws Exception {
+
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ session.createQueue(address, RoutingType.ANYCAST, address);
+
+ int bodySize = server.getConfiguration().getJournalBufferSize_AIO();
+ byte[] bigData = createBytesData(bodySize);
+ addressControl.sendMessage(null, Message.BYTES_TYPE,
Base64.encodeBytes(bigData), false, null, null);
+
+ ClientConsumer consumer = session.createConsumer(address);
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(bigData.length, message.getBodySize());
+ Assert.assertTrue(message.isLargeMessage());
+
+ byte[] bytesRead = new byte[bigData.length];
+ message.getBodyBuffer().readBytes(bytesRead);
+
+ for (int i = 0; i < bytesRead.length; i++) {
+ Assert.assertEquals(bytesRead[i], bigData[i]);
+ }
+
+ }
+
+ byte[] createBytesData(int nbytes) {
+ byte[] result = new byte[nbytes];
+ for (int i = 0; i < nbytes; i++) {
+ result[i] = RandomUtil.randomByte();
+ }
+ return result;
+ }
+}