This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ffe15b4123311130b62dff2f8256245bf6a03e43
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Oct 14 13:25:56 2021 -0400

    ARTEMIS-3461 Generalize MBean Support on Messages and avoid conversion to 
core on AMQP Messages on console browsing
    
    Done in collaboration with Erwin Dondorp through 
https://github.com/apache/activemq-artemis/pull/3794/
    
    (cherry picked from commit a833d95c1fb7fd5ab5a5220a64df80d06815da47)
---
 .../apache/activemq/artemis/api/core/JsonUtil.java |  13 +-
 .../apache/activemq/artemis/api/core/Message.java  |   6 +
 .../artemis/core/message/impl/CoreMessage.java     |  95 +++++++
 .../message}/openmbean/CompositeDataConstants.java |   2 +-
 .../message/openmbean/MessageOpenTypeFactory.java  | 224 +++++++++++++++
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 164 +++++++++++
 .../core/management/impl/QueueControlImpl.java     |  11 +-
 .../impl/openmbean/CompositeDataConstants.java     |  63 +----
 .../management/impl/openmbean/OpenTypeSupport.java | 305 ---------------------
 .../impl/openmbean/OpenTypeSupportTest.java        |   3 +-
 .../integration/management/QueueControlTest.java   | 131 ++++++++-
 11 files changed, 645 insertions(+), 372 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
index 58cac51..69dfa8a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
@@ -325,14 +325,19 @@ public final class JsonUtil {
    private JsonUtil() {
    }
 
+   public static String truncateString(final String str, final int 
valueSizeLimit) {
+      if (str.length() > valueSizeLimit) {
+         return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, 
valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" 
more").toString();
+      } else {
+         return str;
+      }
+   }
+
    public static Object truncate(final Object value, final int valueSizeLimit) 
{
       Object result = value;
       if (valueSizeLimit >= 0) {
          if (String.class.equals(value.getClass())) {
-            String str = (String) value;
-            if (str.length() > valueSizeLimit) {
-               result = new StringBuilder(valueSizeLimit + 
32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() 
- valueSizeLimit).append(" more").toString();
-            }
+            result = truncateString((String)value, valueSizeLimit);
          } else if (value.getClass().isArray()) {
             if (byte[].class.equals(value.getClass())) {
                if (((byte[]) value).length > valueSizeLimit) {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 5525fee..bef0fa4 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.api.core;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
@@ -769,6 +771,10 @@ public interface Message {
    /** This should make you convert your message into Core format. */
    ICoreMessage toCore();
 
+   default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) 
throws OpenDataException {
+      return null;
+   }
+
    /** This should make you convert your message into Core format. */
    ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 1129f55..1d5acda 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -17,8 +17,15 @@
 
 package org.apache.activemq.artemis.core.message.impl;
 
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.SimpleType;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.zip.DataFormatException;
@@ -33,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -40,6 +48,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import 
org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
+import 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import 
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -1216,6 +1226,7 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
       return this;
    }
 
+
    @Override
    public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       return this;
@@ -1290,4 +1301,88 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
 
       return body;
    }
+
+
+   // 
*******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory TEXT_FACTORY = new 
TextMessageOpenTypeFactory();
+   private static MessageOpenTypeFactory BYTES_FACTORY = new 
BytesMessageOpenTypeFactory();
+
+
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) 
throws OpenDataException {
+      CompositeType ct;
+      Map<String, Object> fields;
+      byte type = getType();
+      switch (type) {
+         case Message.TEXT_TYPE:
+            ct = TEXT_FACTORY.getCompositeType();
+            fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+         default:
+            ct = BYTES_FACTORY.getCompositeType();
+            fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+            break;
+      }
+      return new CompositeDataSupport(ct, fields);
+
+   }
+
+   static class BytesMessageOpenTypeFactory extends 
MessageOpenTypeFactory<CoreMessage> {
+      protected ArrayType body;
+
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         body = new ArrayType(SimpleType.BYTE, true);
+         addItem(CompositeDataConstants.TYPE, 
CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.BODY, 
CompositeDataConstants.BODY_DESCRIPTION, body);
+      }
+
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, 
int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
+            byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit 
? bodyCopy.readableBytes() : valueSizeLimit + 1];
+            bodyCopy.readBytes(bytes);
+            rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, 
valueSizeLimit));
+         } else {
+            rc.put(CompositeDataConstants.BODY, new byte[0]);
+         }
+         return rc;
+      }
+   }
+
+   static class TextMessageOpenTypeFactory extends 
MessageOpenTypeFactory<CoreMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TYPE, 
CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.TEXT_BODY, 
CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+      }
+
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, 
int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
+               rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
+            } else {
+               SimpleString text = 
m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
+               rc.put(CompositeDataConstants.TEXT_BODY, text != null ? 
JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
+            }
+         } else {
+            rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
+         }
+         return rc;
+      }
+   }
+
+   // Composite Data implementation
+   // 
*******************************************************************************************************************************
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
similarity index 97%
copy from 
artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
copy to 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
index a8cde03..84fbbfd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.core.management.impl.openmbean;
+package org.apache.activemq.artemis.core.message.openmbean;
 
 public interface CompositeDataConstants {
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
new file mode 100644
index 0000000..0028051
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.openmbean;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.jboss.logging.Logger;
+
+public class MessageOpenTypeFactory<M extends Message> {
+
+   private static final Logger logger = 
Logger.getLogger(MessageOpenTypeFactory.class);
+
+   public MessageOpenTypeFactory() {
+      try {
+         init();
+         compositeType = createCompositeType();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+
+   private CompositeType compositeType;
+   private final List<String> itemNamesList = new ArrayList<>();
+   private final List<String> itemDescriptionsList = new ArrayList<>();
+   private final List<OpenType> itemTypesList = new ArrayList<>();
+
+   protected TabularType stringPropertyTabularType;
+   protected TabularType booleanPropertyTabularType;
+   protected TabularType bytePropertyTabularType;
+   protected TabularType shortPropertyTabularType;
+   protected TabularType intPropertyTabularType;
+   protected TabularType longPropertyTabularType;
+   protected TabularType floatPropertyTabularType;
+   protected TabularType doublePropertyTabularType;
+   protected Object[][] typedPropertyFields;
+
+   protected String getTypeName() {
+      return Message.class.getName();
+   }
+
+   public CompositeType getCompositeType() throws OpenDataException {
+      return compositeType;
+   }
+
+   protected void init() throws OpenDataException {
+
+      addItem(CompositeDataConstants.ADDRESS, 
CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.MESSAGE_ID, 
CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.USER_ID, 
CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.DURABLE, 
CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.EXPIRATION, 
CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
+      addItem(CompositeDataConstants.PRIORITY, 
CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
+      addItem(CompositeDataConstants.REDELIVERED, 
CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.TIMESTAMP, 
CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
+      addItem(CompositeDataConstants.LARGE_MESSAGE, 
CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.PERSISTENT_SIZE, 
CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
+
+      addItem(CompositeDataConstants.PROPERTIES, 
CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
+
+      // now lets expose the type safe properties
+      stringPropertyTabularType = createTabularType(String.class, 
SimpleType.STRING);
+      booleanPropertyTabularType = createTabularType(Boolean.class, 
SimpleType.BOOLEAN);
+      bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
+      shortPropertyTabularType = createTabularType(Short.class, 
SimpleType.SHORT);
+      intPropertyTabularType = createTabularType(Integer.class, 
SimpleType.INTEGER);
+      longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
+      floatPropertyTabularType = createTabularType(Float.class, 
SimpleType.FLOAT);
+      doublePropertyTabularType = createTabularType(Double.class, 
SimpleType.DOUBLE);
+
+      addItem(CompositeDataConstants.STRING_PROPERTIES, 
CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, 
stringPropertyTabularType);
+      addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, 
CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, 
booleanPropertyTabularType);
+      addItem(CompositeDataConstants.BYTE_PROPERTIES, 
CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
+      addItem(CompositeDataConstants.SHORT_PROPERTIES, 
CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
+      addItem(CompositeDataConstants.INT_PROPERTIES, 
CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
+      addItem(CompositeDataConstants.LONG_PROPERTIES, 
CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
+      addItem(CompositeDataConstants.FLOAT_PROPERTIES, 
CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
+      addItem(CompositeDataConstants.DOUBLE_PROPERTIES, 
CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, 
doublePropertyTabularType);
+
+      typedPropertyFields = new Object[][] {
+         {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, 
String.class},
+         {CompositeDataConstants.BOOLEAN_PROPERTIES, 
booleanPropertyTabularType, Boolean.class},
+         {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, 
Byte.class},
+         {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, 
Short.class},
+         {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, 
Integer.class},
+         {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, 
Long.class},
+         {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, 
Float.class},
+         {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, 
Double.class}
+      };
+
+   }
+
+   public Map<String, Object> getFields(M m, int valueSizeLimit, int 
deliveryCount) throws OpenDataException {
+      Map<String, Object> rc = new HashMap<>();
+      rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
+      if (m.getUserID() != null) {
+         rc.put(CompositeDataConstants.USER_ID, "ID:" + 
m.getUserID().toString());
+      } else {
+         rc.put(CompositeDataConstants.USER_ID, "");
+      }
+      rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : 
m.getAddress().toString());
+      rc.put(CompositeDataConstants.DURABLE, m.isDurable());
+      rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
+      rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
+      rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
+      rc.put(CompositeDataConstants.REDELIVERED, deliveryCount > 1);
+      rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
+      try {
+         rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize());
+      } catch (final ActiveMQException e1) {
+         rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
+      }
+
+      Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
+
+      rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + 
propertyMap, valueSizeLimit));
+
+      // only populate if there are some values
+      TabularDataSupport tabularData;
+      for (Object[] typedPropertyInfo : typedPropertyFields) {
+         tabularData = null;
+         try {
+            tabularData = createTabularData(propertyMap, (TabularType) 
typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
+         } catch (Exception ignored) {
+         }
+         if (tabularData != null && !tabularData.isEmpty()) {
+            rc.put((String) typedPropertyInfo[0], tabularData);
+         } else {
+            rc.put((String) typedPropertyInfo[0], null);
+         }
+      }
+      return rc;
+   }
+
+   protected String toString(Object value) {
+      if (value == null) {
+         return null;
+      }
+      return value.toString();
+   }
+
+   protected CompositeType createCompositeType() throws OpenDataException {
+      String[] itemNames = itemNamesList.toArray(new 
String[itemNamesList.size()]);
+      String[] itemDescriptions = itemDescriptionsList.toArray(new 
String[itemDescriptionsList.size()]);
+      OpenType[] itemTypes = itemTypesList.toArray(new 
OpenType[itemTypesList.size()]);
+      return new CompositeType(getTypeName(), getDescription(), itemNames, 
itemDescriptions, itemTypes);
+   }
+
+   protected String getDescription() {
+      return getTypeName();
+   }
+
+   protected <T> TabularType createTabularType(Class<T> type, OpenType 
openType) throws OpenDataException {
+      String typeName = "java.util.Map<java.lang.String, " + type.getName() + 
">";
+      String[] keyValue = new String[]{"key", "value"};
+      OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
+      CompositeType rowType = new CompositeType(typeName, typeName, keyValue, 
keyValue, openTypes);
+      return new TabularType(typeName, typeName, rowType, new String[]{"key"});
+   }
+
+   protected TabularDataSupport createTabularData(Map<String, Object> entries,
+                                                  TabularType type,
+                                                  Class valueType) throws 
IOException, OpenDataException {
+      TabularDataSupport answer = new TabularDataSupport(type);
+
+      for (String key : entries.keySet()) {
+         Object value = entries.get(key);
+         if (valueType.isInstance(value)) {
+            CompositeDataSupport compositeData = createTabularRowValue(type, 
key, value);
+            answer.put(compositeData);
+         } else if (valueType == String.class && value instanceof 
SimpleString) {
+            CompositeDataSupport compositeData = createTabularRowValue(type, 
key, value.toString());
+            answer.put(compositeData);
+         }
+      }
+      return answer;
+   }
+
+   protected CompositeDataSupport createTabularRowValue(TabularType type,
+                                                        String key,
+                                                        Object value) throws 
OpenDataException {
+      Map<String, Object> fields = new HashMap<>();
+      fields.put("key", key);
+      fields.put("value", value);
+      return new CompositeDataSupport(type.getRowType(), fields);
+   }
+
+   protected void addItem(String name, String description, OpenType type) {
+      itemNamesList.add(name);
+      itemDescriptionsList.add(description);
+      itemTypesList.add(type);
+   }
+}
+
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 3926fa5..0609a76 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.SimpleType;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -34,6 +40,8 @@ import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import 
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
@@ -72,6 +80,8 @@ import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
+
 /**
  * See <a 
href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format";>AMQP
 v1.0 message format</a>
  * <pre>
@@ -834,9 +844,64 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
          value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
+
+      TypedProperties extraProperties = getExtraProperties();
+      if (extraProperties != null) {
+         extraProperties.forEach((s, o) -> {
+            map.put(s.toString(), JsonUtil.truncate(o.toString(), 
valueSizeLimit));
+         });
+      }
+      if (!isLargeMessage()) {
+         addAnnotationsAsProperties(map, messageAnnotations);
+      }
+
+      if (properties != null) {
+         if (properties.getContentType() != null) {
+            map.put("properties.getContentType()", 
properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            map.put("properties.getContentEncoding()", 
properties.getContentEncoding().toString());
+         }
+         if (properties.getGroupId() != null) {
+            map.put("properties.getGroupID()", properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            map.put("properties.getGroupSequence()", 
properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            map.put("properties.getReplyToGroupId()", 
properties.getReplyToGroupId());
+         }
+      }
+
       return map;
    }
 
+
+   protected static void addAnnotationsAsProperties(Map map, 
MessageAnnotations annotations) {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) 
{
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               map.put("annotation x-opt-delivery-time", deliveryTime);
+            } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() 
!= null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  map.put("annotation x-opt-delivery-delay", 
System.currentTimeMillis() + delay);
+               }
+            } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && 
entry.getValue() != null) {
+               map.put("annotation X_OPT_INGRESS_TIME", ((Number) 
entry.getValue()).longValue());
+            } else {
+               try {
+                  map.put("annotation " + key, entry.getValue());
+               } catch (ActiveMQPropertyConversionException e) {
+               }
+            }
+         }
+      }
+   }
+
+
    @Override
    public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       try {
@@ -1726,4 +1791,103 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // 
*******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new 
AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends 
MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, 
CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, 
CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, 
int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+            }
+         }
+
+         return rc;
+      }
+
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? 
properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? 
contentType.toString() : null;
+
+         if (m.getBody() instanceof Data) {
+
+            if 
(contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+               type = OBJECT_TYPE;
+            } else if 
(contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+               type = BYTES_TYPE;
+            } else {
+               Charset charset = 
getCharsetForTextualContent(contentTypeString);
+               if (StandardCharsets.UTF_8.equals(charset)) {
+                  type = TEXT_TYPE;
+               }
+            }
+         } else if (m.getBody() instanceof AmqpSequence) {
+            type = STREAM_TYPE;
+         } else if (m.getBody() instanceof AmqpValue) {
+            Object value = ((AmqpValue) m.getBody()).getValue();
+
+            if (value instanceof String) {
+               type = TEXT_TYPE;
+            } else if (value instanceof Binary) {
+
+               if 
(contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+                  type = OBJECT_TYPE;
+               } else {
+                  type = BYTES_TYPE;
+               }
+            } else if (value instanceof List) {
+               type = STREAM_TYPE;
+            } else if (value instanceof Map) {
+               type = MAP_TYPE;
+            }
+         }
+         return type;
+      }
+   }
+
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) 
throws OpenDataException {
+      Map<String, Object> fields;
+      fields = AMQP_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+      return new CompositeDataSupport(AMQP_FACTORY.getCompositeType(), fields);
+   }
+
+   // Composite Data implementation
+   // 
*******************************************************************************************************************************
+
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5643a78..fbb4d78 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,6 @@ import 
org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import 
org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -61,9 +60,12 @@ import 
org.apache.activemq.artemis.selector.filter.Filterable;
 import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.jboss.logging.Logger;
 
 public class QueueControlImpl extends AbstractControl implements QueueControl {
 
+   private static final Logger logger = 
Logger.getLogger(QueueControlImpl.class);
+
    public static final int FLUSH_LIMIT = 500;
 
    // Constants -----------------------------------------------------
@@ -1583,7 +1585,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
                      if (index >= start) {
-                        c.add(OpenTypeSupport.convert(ref, 
attributeSizeLimit));
+                        
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, 
ref.getDeliveryCount()));
                      }
                      //we only increase the index if we add a message, 
otherwise we could stop before we get to a filtered message
                      index++;
@@ -1600,7 +1602,8 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
             }
             return rc;
          }
-      } catch (ActiveMQException e) {
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
          if (AuditLogger.isResourceLoggingEnabled()) {
             AuditLogger.browseMessagesFailure(queue.getName().toString());
          }
@@ -1635,7 +1638,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                while (iterator.hasNext() && currentPageSize++ < limit) {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
+                     
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, 
ref.getDeliveryCount()));
 
                   }
                }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
index a8cde03..2970c0c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
@@ -1,12 +1,12 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,56 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.core.management.impl.openmbean;
-
-public interface CompositeDataConstants {
-
-   String ADDRESS = "address";
-   String MESSAGE_ID = "messageID";
-   String USER_ID = "userID";
-   String TYPE = "type";
-   String DURABLE = "durable";
-   String EXPIRATION = "expiration";
-   String PRIORITY = "priority";
-   String REDELIVERED = "redelivered";
-   String TIMESTAMP = "timestamp";
-   String BODY = "BodyPreview";
-   String TEXT_BODY = "text";
-   String LARGE_MESSAGE = "largeMessage";
-   String PERSISTENT_SIZE = "persistentSize";
-   String PROPERTIES = "PropertiesText";
 
-   String ADDRESS_DESCRIPTION = "The Address";
-   String MESSAGE_ID_DESCRIPTION = "The message ID";
-   String USER_ID_DESCRIPTION = "The user ID";
-   String TYPE_DESCRIPTION = "The message type";
-   String DURABLE_DESCRIPTION = "Is the message durable";
-   String EXPIRATION_DESCRIPTION = "The message expiration";
-   String PRIORITY_DESCRIPTION = "The message priority";
-   String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
-   String TIMESTAMP_DESCRIPTION = "The message timestamp";
-   String BODY_DESCRIPTION = "The message body";
-   String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large 
message";
-   String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on 
disk";
-   String PROPERTIES_DESCRIPTION = "The properties text";
-
-   // User properties
-   String STRING_PROPERTIES = "StringProperties";
-   String BOOLEAN_PROPERTIES = "BooleanProperties";
-   String BYTE_PROPERTIES = "ByteProperties";
-   String SHORT_PROPERTIES = "ShortProperties";
-   String INT_PROPERTIES = "IntProperties";
-   String LONG_PROPERTIES = "LongProperties";
-   String FLOAT_PROPERTIES = "FloatProperties";
-   String DOUBLE_PROPERTIES = "DoubleProperties";
+package org.apache.activemq.artemis.core.management.impl.openmbean;
 
-   String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
-   String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
-   String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
-   String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
-   String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
-   String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
-   String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
-   String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
+/**
+ * @deprecated use {@link org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants} instead; this interface declares nothing of its own and only inherits that interface's members.
+ */
+@Deprecated
+public interface CompositeDataConstants extends 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants {
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
deleted file mode 100644
index 21e9437..0000000
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.management.impl.openmbean;
-
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.JsonUtil;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.MessageReference;
-
-public final class OpenTypeSupport {
-
-   private static MessageOpenTypeFactory TEXT_FACTORY = new 
TextMessageOpenTypeFactory();
-   private static MessageOpenTypeFactory BYTES_FACTORY = new 
BytesMessageOpenTypeFactory();
-
-   private OpenTypeSupport() {
-   }
-
-   public static CompositeData convert(MessageReference ref, int 
valueSizeLimit) throws OpenDataException {
-      CompositeType ct;
-
-      ICoreMessage message = ref.getMessage().toCore();
-
-      Map<String, Object> fields;
-      byte type = message.getType();
-
-      switch(type) {
-         case Message.TEXT_TYPE:
-            ct = TEXT_FACTORY.getCompositeType();
-            fields = TEXT_FACTORY.getFields(ref, valueSizeLimit);
-            break;
-         default:
-            ct = BYTES_FACTORY.getCompositeType();
-            fields = BYTES_FACTORY.getFields(ref, valueSizeLimit);
-            break;
-      }
-      return new CompositeDataSupport(ct, fields);
-   }
-
-   static class MessageOpenTypeFactory {
-
-      private CompositeType compositeType;
-      private final List<String> itemNamesList = new ArrayList<>();
-      private final List<String> itemDescriptionsList = new ArrayList<>();
-      private final List<OpenType> itemTypesList = new ArrayList<>();
-
-      protected TabularType stringPropertyTabularType;
-      protected TabularType booleanPropertyTabularType;
-      protected TabularType bytePropertyTabularType;
-      protected TabularType shortPropertyTabularType;
-      protected TabularType intPropertyTabularType;
-      protected TabularType longPropertyTabularType;
-      protected TabularType floatPropertyTabularType;
-      protected TabularType doublePropertyTabularType;
-      protected Object[][] typedPropertyFields;
-
-      protected String getTypeName() {
-         return Message.class.getName();
-      }
-
-      public CompositeType getCompositeType() throws OpenDataException {
-         if (compositeType == null) {
-            init();
-            compositeType = createCompositeType();
-         }
-         return compositeType;
-      }
-
-      protected void init() throws OpenDataException {
-
-         addItem(CompositeDataConstants.ADDRESS, 
CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
-         addItem(CompositeDataConstants.MESSAGE_ID, 
CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
-         addItem(CompositeDataConstants.USER_ID, 
CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
-         addItem(CompositeDataConstants.TYPE, 
CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
-         addItem(CompositeDataConstants.DURABLE, 
CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
-         addItem(CompositeDataConstants.EXPIRATION, 
CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
-         addItem(CompositeDataConstants.PRIORITY, 
CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
-         addItem(CompositeDataConstants.REDELIVERED, 
CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
-         addItem(CompositeDataConstants.TIMESTAMP, 
CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
-         addItem(CompositeDataConstants.LARGE_MESSAGE, 
CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
-         addItem(CompositeDataConstants.PERSISTENT_SIZE, 
CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
-
-         addItem(CompositeDataConstants.PROPERTIES, 
CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
-
-         // now lets expose the type safe properties
-         stringPropertyTabularType = createTabularType(String.class, 
SimpleType.STRING);
-         booleanPropertyTabularType = createTabularType(Boolean.class, 
SimpleType.BOOLEAN);
-         bytePropertyTabularType = createTabularType(Byte.class, 
SimpleType.BYTE);
-         shortPropertyTabularType = createTabularType(Short.class, 
SimpleType.SHORT);
-         intPropertyTabularType = createTabularType(Integer.class, 
SimpleType.INTEGER);
-         longPropertyTabularType = createTabularType(Long.class, 
SimpleType.LONG);
-         floatPropertyTabularType = createTabularType(Float.class, 
SimpleType.FLOAT);
-         doublePropertyTabularType = createTabularType(Double.class, 
SimpleType.DOUBLE);
-
-         addItem(CompositeDataConstants.STRING_PROPERTIES, 
CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, 
stringPropertyTabularType);
-         addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, 
CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, 
booleanPropertyTabularType);
-         addItem(CompositeDataConstants.BYTE_PROPERTIES, 
CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
-         addItem(CompositeDataConstants.SHORT_PROPERTIES, 
CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
-         addItem(CompositeDataConstants.INT_PROPERTIES, 
CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
-         addItem(CompositeDataConstants.LONG_PROPERTIES, 
CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
-         addItem(CompositeDataConstants.FLOAT_PROPERTIES, 
CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
-         addItem(CompositeDataConstants.DOUBLE_PROPERTIES, 
CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, 
doublePropertyTabularType);
-
-         typedPropertyFields = new Object[][] {
-            {CompositeDataConstants.STRING_PROPERTIES, 
stringPropertyTabularType, String.class},
-            {CompositeDataConstants.BOOLEAN_PROPERTIES, 
booleanPropertyTabularType, Boolean.class},
-            {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, 
Byte.class},
-            {CompositeDataConstants.SHORT_PROPERTIES, 
shortPropertyTabularType, Short.class},
-            {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, 
Integer.class},
-            {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, 
Long.class},
-            {CompositeDataConstants.FLOAT_PROPERTIES, 
floatPropertyTabularType, Float.class},
-            {CompositeDataConstants.DOUBLE_PROPERTIES, 
doublePropertyTabularType, Double.class}
-         };
-
-      }
-
-      public Map<String, Object> getFields(MessageReference ref, int 
valueSizeLimit) throws OpenDataException {
-         Map<String, Object> rc = new HashMap<>();
-         ICoreMessage m = ref.getMessage().toCore();
-         rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
-         if (m.getUserID() != null) {
-            rc.put(CompositeDataConstants.USER_ID, "ID:" + 
m.getUserID().toString());
-         } else {
-            rc.put(CompositeDataConstants.USER_ID, "");
-         }
-         rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : 
m.getAddress().toString());
-         rc.put(CompositeDataConstants.TYPE, m.getType());
-         rc.put(CompositeDataConstants.DURABLE, m.isDurable());
-         rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
-         rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
-         rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
-         rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 
1);
-         rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
-         try {
-            rc.put(CompositeDataConstants.PERSISTENT_SIZE, 
m.getPersistentSize());
-         } catch (final ActiveMQException e1) {
-            rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
-         }
-
-         Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
-
-         rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + 
propertyMap, valueSizeLimit));
-
-         // only populate if there are some values
-         TabularDataSupport tabularData;
-         for (Object[] typedPropertyInfo : typedPropertyFields) {
-            tabularData = null;
-            try {
-               tabularData = createTabularData(propertyMap, (TabularType) 
typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
-            } catch (Exception ignored) {
-            }
-            if (tabularData != null && !tabularData.isEmpty()) {
-               rc.put((String) typedPropertyInfo[0], tabularData);
-            } else {
-               rc.put((String) typedPropertyInfo[0], null);
-            }
-         }
-         return rc;
-      }
-
-      protected String toString(Object value) {
-         if (value == null) {
-            return null;
-         }
-         return value.toString();
-      }
-
-      protected CompositeType createCompositeType() throws OpenDataException {
-         String[] itemNames = itemNamesList.toArray(new 
String[itemNamesList.size()]);
-         String[] itemDescriptions = itemDescriptionsList.toArray(new 
String[itemDescriptionsList.size()]);
-         OpenType[] itemTypes = itemTypesList.toArray(new 
OpenType[itemTypesList.size()]);
-         return new CompositeType(getTypeName(), getDescription(), itemNames, 
itemDescriptions, itemTypes);
-      }
-
-      protected String getDescription() {
-         return getTypeName();
-      }
-
-      protected <T> TabularType createTabularType(Class<T> type, OpenType 
openType) throws OpenDataException {
-         String typeName = "java.util.Map<java.lang.String, " + type.getName() 
+ ">";
-         String[] keyValue = new String[]{"key", "value"};
-         OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
-         CompositeType rowType = new CompositeType(typeName, typeName, 
keyValue, keyValue, openTypes);
-         return new TabularType(typeName, typeName, rowType, new 
String[]{"key"});
-      }
-
-      protected TabularDataSupport createTabularData(Map<String, Object> 
entries,
-                                                     TabularType type,
-                                                     Class valueType) throws 
IOException, OpenDataException {
-         TabularDataSupport answer = new TabularDataSupport(type);
-
-         for (String key : entries.keySet()) {
-            Object value = entries.get(key);
-            if (valueType.isInstance(value)) {
-               CompositeDataSupport compositeData = 
createTabularRowValue(type, key, value);
-               answer.put(compositeData);
-            } else if (valueType == String.class && value instanceof 
SimpleString) {
-               CompositeDataSupport compositeData = 
createTabularRowValue(type, key, value.toString());
-               answer.put(compositeData);
-            }
-         }
-         return answer;
-      }
-
-      protected CompositeDataSupport createTabularRowValue(TabularType type,
-                                                           String key,
-                                                           Object value) 
throws OpenDataException {
-         Map<String, Object> fields = new HashMap<>();
-         fields.put("key", key);
-         fields.put("value", value);
-         return new CompositeDataSupport(type.getRowType(), fields);
-      }
-
-      protected void addItem(String name, String description, OpenType type) {
-         itemNamesList.add(name);
-         itemDescriptionsList.add(description);
-         itemTypesList.add(type);
-      }
-   }
-
-
-   static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory {
-      protected ArrayType body;
-
-      @Override
-      protected void init() throws OpenDataException {
-         super.init();
-         body = new ArrayType(SimpleType.BYTE, true);
-         addItem(CompositeDataConstants.BODY, 
CompositeDataConstants.BODY_DESCRIPTION, body);
-      }
-
-      @Override
-      public Map<String, Object> getFields(MessageReference ref, int 
valueSizeLimit) throws OpenDataException {
-         Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
-         ICoreMessage m = ref.getMessage().toCore();
-         if (!m.isLargeMessage()) {
-            ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
-            byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit 
? bodyCopy.readableBytes() : valueSizeLimit + 1];
-            bodyCopy.readBytes(bytes);
-            rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, 
valueSizeLimit));
-         } else {
-            rc.put(CompositeDataConstants.BODY, new byte[0]);
-         }
-         return rc;
-      }
-   }
-
-   static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
-      protected SimpleType text;
-
-      @Override
-      protected void init() throws OpenDataException {
-         super.init();
-         addItem(CompositeDataConstants.TEXT_BODY, 
CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
-      }
-
-      @Override
-      public Map<String, Object> getFields(MessageReference ref, int 
valueSizeLimit) throws OpenDataException {
-         Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
-         ICoreMessage m = ref.getMessage().toCore();
-         if (!m.isLargeMessage()) {
-            if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
-               rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
-            } else {
-               SimpleString text = 
m.getReadOnlyBodyBuffer().readNullableSimpleString();
-               rc.put(CompositeDataConstants.TEXT_BODY, text != null ? 
JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
-            }
-         } else {
-            rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
-         }
-         return rc;
-      }
-   }
-}
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
index 1839fe5..877237d 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
@@ -20,7 +20,6 @@ package 
org.apache.activemq.artemis.core.management.impl.openmbean;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.reader.TextMessageUtil;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,7 +38,7 @@ public class OpenTypeSupportTest {
 
       TextMessageUtil.writeBodyText(coreMessage.getBodyBuffer(), 
SimpleString.toSimpleString(bodyText));
 
-      CompositeData cd = OpenTypeSupport.convert(new 
MessageReferenceImpl(coreMessage, null), 256);
+      CompositeData cd = coreMessage.toCompositeData(256, 1);
 
       Assert.assertEquals(bodyText, cd.get("text"));
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index d45bcff..66dbcff 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.json.JsonArray;
 import javax.json.JsonObject;
 import javax.management.Notification;
@@ -70,10 +76,12 @@ import 
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import 
org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -86,8 +94,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static 
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
-import static 
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES;
+import static 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.BODY;
+import static 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES;
 
 @RunWith(value = Parameterized.class)
 public class QueueControlTest extends ManagementTestBase {
@@ -3447,6 +3455,123 @@ public class QueueControlTest extends 
ManagementTestBase {
       Assert.assertEquals(new String(body), "theBody");
    }
 
+
+   @Test
+   public void testSendMessageWithAMQP() throws Exception {
+      SimpleString address = new 
SimpleString("address_testSendMessageWithAMQP");
+      SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP");
+
+      server.addAddressInfo(new 
AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server.locateQueue(queue) != null && 
server.getAddressInfo(address) != null);
+
+      QueueControl queueControl = createManagementControl(address, queue, 
RoutingType.ANYCAST);
+
+      { // scoped block: send one persistent text message using the "amqp" connection factory
+         ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", 
"tcp://localhost:61616");
+         try (Connection connection = factory.createConnection("myUser", 
"myPassword")) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer =  
session.createProducer(session.createQueue(address.toString()));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            TextMessage message = session.createTextMessage("theAMQPBody");
+            message.setStringProperty("protocolUsed", "amqp");
+            producer.send(message);
+         }
+      }
+
+      { // scoped block: send one persistent text message using the "core" connection factory
+         ConnectionFactory factory = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+         try (Connection connection = factory.createConnection("myUser", 
"myPassword")) {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer =  
session.createProducer(session.createQueue(address.toString()));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            TextMessage message = session.createTextMessage("theCoreBody");
+            message.setStringProperty("protocolUsed", "core");
+            producer.send(message);
+         }
+      }
+
+      Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
+
+      // browse through the management control; the AMQP message was sent first, so it is expected at index 0
+      CompositeData[] browse = queueControl.browse(null);
+
+      Assert.assertEquals(2, browse.length);
+
+      String body = (String) browse[0].get("text");
+
+      Assert.assertNotNull(body);
+
+      Assert.assertEquals("theAMQPBody", body);
+
+      body = (String) browse[1].get("text");
+
+      Assert.assertNotNull(body);
+
+      Assert.assertEquals("theCoreBody", body);
+
+   }
+
+
+   @Test
+   public void testSendMessageWithAMQPLarge() throws Exception {
+      // Browsing must expose a non-null "text" item for an oversized text body sent over AMQP and over core.
+      SimpleString address = new SimpleString("address_testSendMessageWithAMQPLarge");
+      SimpleString queue = new SimpleString("queue_testSendMessageWithAMQPLarge");
+
+      server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null);
+
+      QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
+
+      StringBuilder largeBody = new StringBuilder(); // 100 * 1024 "*-" pairs = 204,800 characters; presumably above the large-message threshold (TODO confirm)
+      for (int i = 0; i < 100 * 1024; i++) {
+         largeBody.append("*-");
+      }
+
+      { // scoped block: send the oversized text message using the "amqp" connection factory
+         ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
+         try (Connection connection = factory.createConnection("myUser", "myPassword")) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            TextMessage message = session.createTextMessage(largeBody.toString());
+            message.setStringProperty("protocolUsed", "amqp");
+            producer.send(message);
+         }
+      }
+
+      { // scoped block: send the same oversized text message using the "core" connection factory
+         ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+         try (Connection connection = factory.createConnection("myUser", "myPassword")) {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            TextMessage message = session.createTextMessage(largeBody.toString());
+            message.setStringProperty("protocolUsed", "core");
+            producer.send(message);
+         }
+      }
+
+      Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
+
+      // browse through the management control; only the presence of a "text" item is checked, not its content
+      CompositeData[] browse = queueControl.browse(null);
+
+      Assert.assertEquals(2, browse.length);
+
+      String body = (String) browse[0].get("text");
+
+      Assert.assertNotNull(body);
+
+      body = (String) browse[1].get("text");
+
+      Assert.assertNotNull(body);
+   }
+
    @Test
    public void testSendMessageWithMessageId() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
@@ -3763,7 +3888,7 @@ public class QueueControlTest extends ManagementTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      Configuration conf = 
createDefaultInVMConfig().setJMXManagementEnabled(true);
+      Configuration conf = 
createDefaultConfig(true).setJMXManagementEnabled(true);
       server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, 
true));
 
       server.start();

Reply via email to