This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit ffe15b4123311130b62dff2f8256245bf6a03e43 Author: Clebert Suconic <[email protected]> AuthorDate: Thu Oct 14 13:25:56 2021 -0400 ARTEMIS-3461 Generalize MBean Support on Messages and avoid converstion to core on AMQP Messages on console browsing Done in collaboration with Erwin Dondorp through https://github.com/apache/activemq-artemis/pull/3794/ (cherry picked from commit a833d95c1fb7fd5ab5a5220a64df80d06815da47) --- .../apache/activemq/artemis/api/core/JsonUtil.java | 13 +- .../apache/activemq/artemis/api/core/Message.java | 6 + .../artemis/core/message/impl/CoreMessage.java | 95 +++++++ .../message}/openmbean/CompositeDataConstants.java | 2 +- .../message/openmbean/MessageOpenTypeFactory.java | 224 +++++++++++++++ .../artemis/protocol/amqp/broker/AMQPMessage.java | 164 +++++++++++ .../core/management/impl/QueueControlImpl.java | 11 +- .../impl/openmbean/CompositeDataConstants.java | 63 +---- .../management/impl/openmbean/OpenTypeSupport.java | 305 --------------------- .../impl/openmbean/OpenTypeSupportTest.java | 3 +- .../integration/management/QueueControlTest.java | 131 ++++++++- 11 files changed, 645 insertions(+), 372 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index 58cac51..69dfa8a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -325,14 +325,19 @@ public final class JsonUtil { private JsonUtil() { } + public static String truncateString(final String str, final int valueSizeLimit) { + if (str.length() > valueSizeLimit) { + return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); + } else { + return str; + } + } + public static Object truncate(final Object value, final int valueSizeLimit) { Object result = value; if (valueSizeLimit >= 0) { if (String.class.equals(value.getClass())) { - String str = (String) value; - if (str.length() > valueSizeLimit) { - result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); - } + result = truncateString((String)value, valueSizeLimit); } else if (value.getClass().isArray()) { if (byte[].class.equals(value.getClass())) { if (((byte[]) value).length > valueSizeLimit) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5525fee..bef0fa4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; @@ -769,6 +771,10 @@ public interface Message { /** This should make you convert your message into Core format. */ ICoreMessage toCore(); + default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException { + return null; + } + /** This should make you convert your message into Core format. */ ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 1129f55..1d5acda 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -17,8 +17,15 @@ package org.apache.activemq.artemis.core.message.impl; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.SimpleType; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.zip.DataFormatException; @@ -33,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; @@ -40,6 +48,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.message.LargeBodyReader; +import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -1216,6 +1226,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return this; } + @Override public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return this; @@ -1290,4 +1301,88 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return body; } + + + // ******************************************************************************************************************************* + // Composite Data implementation + + private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory(); + private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory(); + + + @Override + public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException { + CompositeType ct; + Map<String, Object> fields; + byte type = getType(); + switch (type) { + case Message.TEXT_TYPE: + ct = TEXT_FACTORY.getCompositeType(); + fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount); + break; + default: + ct = BYTES_FACTORY.getCompositeType(); + fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount); + break; + } + return new CompositeDataSupport(ct, fields); + + } + + static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> { + protected ArrayType body; + + @Override + protected void init() throws OpenDataException { + super.init(); + body = new ArrayType(SimpleType.BYTE, true); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body); + } + + @Override + public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException { + Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery); + rc.put(CompositeDataConstants.TYPE, m.getType()); + if (!m.isLargeMessage()) { + ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer(); + byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; + bodyCopy.readBytes(bytes); + rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); + } else { + rc.put(CompositeDataConstants.BODY, new byte[0]); + } + return rc; + } + } + + static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> { + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING); + } + + @Override + public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException { + Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery); + rc.put(CompositeDataConstants.TYPE, m.getType()); + if (!m.isLargeMessage()) { + if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { + rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); + } else { + SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString(); + rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : ""); + } + } else { + rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); + } + return rc; + } + } + + // Composite Data implementation + // ******************************************************************************************************************************* + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java similarity index 97% copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java copy to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java index a8cde03..84fbbfd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.management.impl.openmbean; +package org.apache.activemq.artemis.core.message.openmbean; public interface CompositeDataConstants { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java new file mode 100644 index 0000000..0028051 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.message.openmbean; + +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.jboss.logging.Logger; + +public class MessageOpenTypeFactory<M extends Message> { + + private static final Logger logger = Logger.getLogger(MessageOpenTypeFactory.class); + + public MessageOpenTypeFactory() { + try { + init(); + compositeType = createCompositeType(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + + private CompositeType compositeType; + private final List<String> itemNamesList = new ArrayList<>(); + private final List<String> itemDescriptionsList = new ArrayList<>(); + private final List<OpenType> itemTypesList = new ArrayList<>(); + + protected TabularType stringPropertyTabularType; + protected TabularType booleanPropertyTabularType; + protected TabularType bytePropertyTabularType; + protected TabularType shortPropertyTabularType; + protected TabularType intPropertyTabularType; + protected TabularType longPropertyTabularType; + protected TabularType floatPropertyTabularType; + protected TabularType doublePropertyTabularType; + protected Object[][] typedPropertyFields; + + protected String getTypeName() { + return Message.class.getName(); + } + + public CompositeType getCompositeType() throws OpenDataException { + return compositeType; + } + + protected void init() throws OpenDataException { + + addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG); + addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG); + addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG); + + addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); + + // now lets expose the type safe properties + stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); + booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); + bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); + shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); + intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); + longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); + floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); + doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); + + addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); + addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); + addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); + addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); + addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); + addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); + addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); + addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); + + typedPropertyFields = new Object[][] { + {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class}, + {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class}, + {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class}, + {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class}, + {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class}, + {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class}, + {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class}, + {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class} + }; + + } + + public Map<String, Object> getFields(M m, int valueSizeLimit, int deliveryCount) throws OpenDataException { + Map<String, Object> rc = new HashMap<>(); + rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); + if (m.getUserID() != null) { + rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); + } else { + rc.put(CompositeDataConstants.USER_ID, ""); + } + rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString()); + rc.put(CompositeDataConstants.DURABLE, m.isDurable()); + rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration()); + rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp()); + rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); + rc.put(CompositeDataConstants.REDELIVERED, deliveryCount > 1); + rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage()); + try { + rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize()); + } catch (final ActiveMQException e1) { + rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); + } + + Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit); + + rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); + + // only populate if there are some values + TabularDataSupport tabularData; + for (Object[] typedPropertyInfo : typedPropertyFields) { + tabularData = null; + try { + tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]); + } catch (Exception ignored) { + } + if (tabularData != null && !tabularData.isEmpty()) { + rc.put((String) typedPropertyInfo[0], tabularData); + } else { + rc.put((String) typedPropertyInfo[0], null); + } + } + return rc; + } + + protected String toString(Object value) { + if (value == null) { + return null; + } + return value.toString(); + } + + protected CompositeType createCompositeType() throws OpenDataException { + String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); + String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); + OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); + return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); + } + + protected String getDescription() { + return getTypeName(); + } + + protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException { + String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">"; + String[] keyValue = new String[]{"key", "value"}; + OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; + CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); + return new TabularType(typeName, typeName, rowType, new String[]{"key"}); + } + + protected TabularDataSupport createTabularData(Map<String, Object> entries, + TabularType type, + Class valueType) throws IOException, OpenDataException { + TabularDataSupport answer = new TabularDataSupport(type); + + for (String key : entries.keySet()) { + Object value = entries.get(key); + if (valueType.isInstance(value)) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value); + answer.put(compositeData); + } else if (valueType == String.class && value instanceof SimpleString) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString()); + answer.put(compositeData); + } + } + return answer; + } + + protected CompositeDataSupport createTabularRowValue(TabularType type, + String key, + Object value) throws OpenDataException { + Map<String, Object> fields = new HashMap<>(); + fields.put("key", key); + fields.put("value", value); + return new CompositeDataSupport(type.getRowType(), fields); + } + + protected void addItem(String name, String description, OpenType type) { + itemNamesList.add(name); + itemDescriptionsList.add(description); + itemTypesList.add(type); + } +} + diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 3926fa5..0609a76 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.SimpleType; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +40,8 @@ import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; @@ -72,6 +80,8 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent; + /** * See <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP v1.0 message format</a> * <pre> @@ -834,9 +844,64 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. value = JsonUtil.truncate(value, valueSizeLimit); map.put(name.toString(), value); } + + TypedProperties extraProperties = getExtraProperties(); + if (extraProperties != null) { + extraProperties.forEach((s, o) -> { + map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit)); + }); + } + if (!isLargeMessage()) { + addAnnotationsAsProperties(map, messageAnnotations); + } + + if (properties != null) { + if (properties.getContentType() != null) { + map.put("properties.getContentType()", properties.getContentType().toString()); + } + if (properties.getContentEncoding() != null) { + map.put("properties.getContentEncoding()", properties.getContentEncoding().toString()); + } + if (properties.getGroupId() != null) { + map.put("properties.getGroupID()", properties.getGroupId()); + } + if (properties.getGroupSequence() != null) { + map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue()); + } + if (properties.getReplyToGroupId() != null) { + map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId()); + } + } + return map; } + + protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) { + if (annotations != null && annotations.getValue() != null) { + for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + long deliveryTime = ((Number) entry.getValue()).longValue(); + map.put("annotation x-opt-delivery-time", deliveryTime); + } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + long delay = ((Number) entry.getValue()).longValue(); + if (delay > 0) { + map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay); + } + } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { + map.put("annotation X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); + } else { + try { + map.put("annotation " + key, entry.getValue()); + } catch (ActiveMQPropertyConversionException e) { + } + } + } + } + } + + @Override public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { try { @@ -1726,4 +1791,103 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. public void setOwner(Object object) { this.owner = object; } + + + // ******************************************************************************************************************************* + // Composite Data implementation + + private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory(); + + static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> { + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + } + + @Override + public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException { + Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery); + + if (!m.isLargeMessage()) { + m.ensureScanning(); + } + + Properties properties = m.getCurrentProperties(); + + byte type = getType(m, properties); + + rc.put(CompositeDataConstants.TYPE, type); + + if (m.isLargeMessage()) { + rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ..."); + } else { + if (m.getBody() instanceof AmqpValue) { + Object amqpValue = ((AmqpValue) m.getBody()).getValue(); + + rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit)); + } else { + rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit)); + } + } + + return rc; + } + + private byte getType(AMQPMessage m, Properties properties) { + if (m.isLargeMessage()) { + return DEFAULT_TYPE; + } + byte type = BYTES_TYPE; + + final Symbol contentType = properties != null ? properties.getContentType() : null; + final String contentTypeString = contentType != null ? contentType.toString() : null; + + if (m.getBody() instanceof Data) { + + if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) { + type = OBJECT_TYPE; + } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) { + type = BYTES_TYPE; + } else { + Charset charset = getCharsetForTextualContent(contentTypeString); + if (StandardCharsets.UTF_8.equals(charset)) { + type = TEXT_TYPE; + } + } + } else if (m.getBody() instanceof AmqpSequence) { + type = STREAM_TYPE; + } else if (m.getBody() instanceof AmqpValue) { + Object value = ((AmqpValue) m.getBody()).getValue(); + + if (value instanceof String) { + type = TEXT_TYPE; + } else if (value instanceof Binary) { + + if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) { + type = OBJECT_TYPE; + } else { + type = BYTES_TYPE; + } + } else if (value instanceof List) { + type = STREAM_TYPE; + } else if (value instanceof Map) { + type = MAP_TYPE; + } + } + return type; + } + } + + @Override + public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException { + Map<String, Object> fields; + fields = AMQP_FACTORY.getFields(this, fieldsLimit, deliveryCount); + return new CompositeDataSupport(AMQP_FACTORY.getCompositeType(), fields); + } + + // Composite Data implementation + // ******************************************************************************************************************************* + } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 5643a78..fbb4d78 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; -import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport; import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -61,9 +60,12 @@ import org.apache.activemq.artemis.selector.filter.Filterable; import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.jboss.logging.Logger; public class QueueControlImpl extends AbstractControl implements QueueControl { + private static final Logger logger = Logger.getLogger(QueueControlImpl.class); + public static final int FLUSH_LIMIT = 500; // Constants ----------------------------------------------------- @@ -1583,7 +1585,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { MessageReference ref = iterator.next(); if (thefilter == null || thefilter.match(ref.getMessage())) { if (index >= start) { - c.add(OpenTypeSupport.convert(ref, attributeSizeLimit)); + c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); } //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message index++; @@ -1600,7 +1602,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } return rc; } - } catch (ActiveMQException e) { + } catch (Exception e) { + logger.warn(e.getMessage(), e); if (AuditLogger.isResourceLoggingEnabled()) { AuditLogger.browseMessagesFailure(queue.getName().toString()); } @@ -1635,7 +1638,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { while (iterator.hasNext() && currentPageSize++ < limit) { MessageReference ref = iterator.next(); if (thefilter == null || thefilter.match(ref.getMessage())) { - c.add(OpenTypeSupport.convert(ref, attributeSizeLimit)); + c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount())); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java index a8cde03..2970c0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java @@ -1,12 +1,12 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,56 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.management.impl.openmbean; - -public interface CompositeDataConstants { - - String ADDRESS = "address"; - String MESSAGE_ID = "messageID"; - String USER_ID = "userID"; - String TYPE = "type"; - String DURABLE = "durable"; - String EXPIRATION = "expiration"; - String PRIORITY = "priority"; - String REDELIVERED = "redelivered"; - String TIMESTAMP = "timestamp"; - String BODY = "BodyPreview"; - String TEXT_BODY = "text"; - String LARGE_MESSAGE = "largeMessage"; - String PERSISTENT_SIZE = "persistentSize"; - String PROPERTIES = "PropertiesText"; - String ADDRESS_DESCRIPTION = "The Address"; - String MESSAGE_ID_DESCRIPTION = "The message ID"; - String USER_ID_DESCRIPTION = "The user ID"; - String TYPE_DESCRIPTION = "The message type"; - String DURABLE_DESCRIPTION = "Is the message durable"; - String EXPIRATION_DESCRIPTION = "The message expiration"; - String PRIORITY_DESCRIPTION = "The message priority"; - String REDELIVERED_DESCRIPTION = "Has the message been redelivered"; - String TIMESTAMP_DESCRIPTION = "The message timestamp"; - String BODY_DESCRIPTION = "The message body"; - String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large message"; - String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on disk"; - String PROPERTIES_DESCRIPTION = "The properties text"; - - // User properties - String STRING_PROPERTIES = "StringProperties"; - String BOOLEAN_PROPERTIES = "BooleanProperties"; - String BYTE_PROPERTIES = "ByteProperties"; - String SHORT_PROPERTIES = "ShortProperties"; - String INT_PROPERTIES = "IntProperties"; - String LONG_PROPERTIES = "LongProperties"; - String FLOAT_PROPERTIES = "FloatProperties"; - String DOUBLE_PROPERTIES = "DoubleProperties"; +package org.apache.activemq.artemis.core.management.impl.openmbean; - String STRING_PROPERTIES_DESCRIPTION = "User String Properties"; - String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties"; - String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties"; - String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties"; - String INT_PROPERTIES_DESCRIPTION = "User Int Properties"; - String LONG_PROPERTIES_DESCRIPTION = "User Long Properties"; - String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties"; - String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties"; +/** + * @deprecated use org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants + */ +@Deprecated +public interface CompositeDataConstants extends org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java deleted file mode 100644 index 21e9437..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.management.impl.openmbean; - -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.api.core.JsonUtil; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.MessageReference; - -public final class OpenTypeSupport { - - private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory(); - private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory(); - - private OpenTypeSupport() { - } - - public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException { - CompositeType ct; - - ICoreMessage message = ref.getMessage().toCore(); - - Map<String, Object> fields; - byte type = message.getType(); - - switch(type) { - case Message.TEXT_TYPE: - ct = TEXT_FACTORY.getCompositeType(); - fields = TEXT_FACTORY.getFields(ref, valueSizeLimit); - break; - default: - ct = BYTES_FACTORY.getCompositeType(); - fields = BYTES_FACTORY.getFields(ref, valueSizeLimit); - break; - } - return new CompositeDataSupport(ct, fields); - } - - static class MessageOpenTypeFactory { - - private CompositeType compositeType; - private final List<String> itemNamesList = new ArrayList<>(); - private final List<String> itemDescriptionsList = new ArrayList<>(); - private final List<OpenType> itemTypesList = new ArrayList<>(); - - protected TabularType stringPropertyTabularType; - protected TabularType booleanPropertyTabularType; - protected TabularType bytePropertyTabularType; - protected TabularType shortPropertyTabularType; - protected TabularType intPropertyTabularType; - protected TabularType longPropertyTabularType; - protected TabularType floatPropertyTabularType; - protected TabularType doublePropertyTabularType; - protected Object[][] typedPropertyFields; - - protected String getTypeName() { - return Message.class.getName(); - } - - public CompositeType getCompositeType() throws OpenDataException { - if (compositeType == null) { - init(); - compositeType = createCompositeType(); - } - return compositeType; - } - - protected void init() throws OpenDataException { - - addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); - addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG); - addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE); - addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG); - addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG); - - addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); - - // now lets expose the type safe properties - stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); - booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); - bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); - shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); - intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); - longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); - floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); - doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); - - addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); - addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); - addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); - addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); - addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); - addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); - addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); - addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); - - typedPropertyFields = new Object[][] { - {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class}, - {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class}, - {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class}, - {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class}, - {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class}, - {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class}, - {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class}, - {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class} - }; - - } - - public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map<String, Object> rc = new HashMap<>(); - ICoreMessage m = ref.getMessage().toCore(); - rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); - if (m.getUserID() != null) { - rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); - } else { - rc.put(CompositeDataConstants.USER_ID, ""); - } - rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString()); - rc.put(CompositeDataConstants.TYPE, m.getType()); - rc.put(CompositeDataConstants.DURABLE, m.isDurable()); - rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration()); - rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp()); - rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); - rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); - rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage()); - try { - rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize()); - } catch (final ActiveMQException e1) { - rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); - } - - Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit); - - rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); - - // only populate if there are some values - TabularDataSupport tabularData; - for (Object[] typedPropertyInfo : typedPropertyFields) { - tabularData = null; - try { - tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]); - } catch (Exception ignored) { - } - if (tabularData != null && !tabularData.isEmpty()) { - rc.put((String) typedPropertyInfo[0], tabularData); - } else { - rc.put((String) typedPropertyInfo[0], null); - } - } - return rc; - } - - protected String toString(Object value) { - if (value == null) { - return null; - } - return value.toString(); - } - - protected CompositeType createCompositeType() throws OpenDataException { - String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); - String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); - OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); - return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); - } - - protected String getDescription() { - return getTypeName(); - } - - protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException { - String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">"; - String[] keyValue = new String[]{"key", "value"}; - OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; - CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); - return new TabularType(typeName, typeName, rowType, new String[]{"key"}); - } - - protected TabularDataSupport createTabularData(Map<String, Object> entries, - TabularType type, - Class valueType) throws IOException, OpenDataException { - TabularDataSupport answer = new TabularDataSupport(type); - - for (String key : entries.keySet()) { - Object value = entries.get(key); - if (valueType.isInstance(value)) { - CompositeDataSupport compositeData = createTabularRowValue(type, key, value); - answer.put(compositeData); - } else if (valueType == String.class && value instanceof SimpleString) { - CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString()); - answer.put(compositeData); - } - } - return answer; - } - - protected CompositeDataSupport createTabularRowValue(TabularType type, - String key, - Object value) throws OpenDataException { - Map<String, Object> fields = new HashMap<>(); - fields.put("key", key); - fields.put("value", value); - return new CompositeDataSupport(type.getRowType(), fields); - } - - protected void addItem(String name, String description, OpenType type) { - itemNamesList.add(name); - itemDescriptionsList.add(description); - itemTypesList.add(type); - } - } - - - static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory { - protected ArrayType body; - - @Override - protected void init() throws OpenDataException { - super.init(); - body = new ArrayType(SimpleType.BYTE, true); - addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body); - } - - @Override - public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map<String, Object> rc = super.getFields(ref, valueSizeLimit); - ICoreMessage m = ref.getMessage().toCore(); - if (!m.isLargeMessage()) { - ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); - byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; - bodyCopy.readBytes(bytes); - rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); - } else { - rc.put(CompositeDataConstants.BODY, new byte[0]); - } - return rc; - } - } - - static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { - protected SimpleType text; - - @Override - protected void init() throws OpenDataException { - super.init(); - addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING); - } - - @Override - public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map<String, Object> rc = super.getFields(ref, valueSizeLimit); - ICoreMessage m = ref.getMessage().toCore(); - if (!m.isLargeMessage()) { - if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { - rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); - } else { - SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); - rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : ""); - } - } else { - rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); - } - return rc; - } - } -} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java index 1839fe5..877237d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java @@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.management.impl.openmbean; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.reader.TextMessageUtil; import org.junit.Assert; import org.junit.Test; @@ -39,7 +38,7 @@ public class OpenTypeSupportTest { TextMessageUtil.writeBodyText(coreMessage.getBodyBuffer(), SimpleString.toSimpleString(bodyText)); - CompositeData cd = OpenTypeSupport.convert(new MessageReferenceImpl(coreMessage, null), 256); + CompositeData cd = coreMessage.toCompositeData(256, 1); Assert.assertEquals(bodyText, cd.get("text")); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index d45bcff..66dbcff 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.json.JsonArray; import javax.json.JsonObject; import javax.management.Notification; @@ -70,10 +76,12 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; @@ -86,8 +94,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; -import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES; +import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.BODY; +import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES; @RunWith(value = Parameterized.class) public class QueueControlTest extends ManagementTestBase { @@ -3447,6 +3455,123 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(new String(body), "theBody"); } + + @Test + public void testSendMessageWithAMQP() throws Exception { + SimpleString address = new SimpleString("address_testSendMessageWithAMQP"); + SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP"); + + server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage("theAMQPBody"); + message.setStringProperty("protocolUsed", "amqp"); + producer.send(message); + } + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage("theCoreBody"); + message.setStringProperty("protocolUsed", "core"); + producer.send(message); + } + } + + Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(2, browse.length); + + String body = (String) browse[0].get("text"); + + Assert.assertNotNull(body); + + Assert.assertEquals("theAMQPBody", body); + + body = (String) browse[1].get("text"); + + Assert.assertNotNull(body); + + Assert.assertEquals("theCoreBody", body); + + } + + + @Test + public void testSendMessageWithAMQPLarge() throws Exception { + SimpleString address = new SimpleString("address_testSendMessageWithAMQP"); + SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP"); + + server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + + StringBuffer bufferLarge = new StringBuffer(); + for (int i = 0; i < 100 * 1024; i++) { + bufferLarge.append("*-"); + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage(bufferLarge.toString()); + message.setStringProperty("protocolUsed", "amqp"); + producer.send(message); + } + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage(bufferLarge.toString()); + message.setStringProperty("protocolUsed", "core"); + producer.send(message); + } + } + + Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(2, browse.length); + + String body = (String) browse[0].get("text"); + + Assert.assertNotNull(body); + + body = (String) browse[1].get("text"); + + Assert.assertNotNull(body); + + } + @Test public void testSendMessageWithMessageId() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -3763,7 +3888,7 @@ public class QueueControlTest extends ManagementTestBase { @Before public void setUp() throws Exception { super.setUp(); - Configuration conf = createDefaultInVMConfig().setJMXManagementEnabled(true); + Configuration conf = createDefaultConfig(true).setJMXManagementEnabled(true); server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, true)); server.start();
