This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ea6b98cfb7c6b38d7ed2f82d703649a1bcd2f75c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Oct 18 10:47:12 2021 -0400

    ARTEMIS-3529 Fixing test and tweaks on properties
    
    (cherry picked from commit 54418dfcafb09786af49db6c3ac65db95e5303a6)
---
 .../artemis/core/message/impl/CoreMessage.java     |  4 +-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 61 +++++++++++++---------
 .../SimpleStreamingLargeMessageTest.java           |  2 +-
 3 files changed, 40 insertions(+), 27 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 981f55b..3d4438e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -1344,7 +1344,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
          Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
          rc.put(CompositeDataConstants.TYPE, m.getType());
          if (!m.isLargeMessage()) {
-            ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
+            ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
             byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit 
? bodyCopy.readableBytes() : valueSizeLimit + 1];
             bodyCopy.readBytes(bytes);
             rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, 
valueSizeLimit));
@@ -1370,7 +1370,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
             if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
                rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
             } else {
-               SimpleString text = 
m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
+               SimpleString text = 
m.getReadOnlyBodyBuffer().readNullableSimpleString();
                rc.put(CompositeDataConstants.TEXT_BODY, text != null ? 
JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
             }
          } else {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 54aa270..91a3adb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -842,34 +842,45 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
             value = ((Binary)value).getArray();
          }
          value = JsonUtil.truncate(value, valueSizeLimit);
-         map.put(name.toString(), value);
+         map.put("applicationProperties." + name, value);
       }
 
       TypedProperties extraProperties = getExtraProperties();
       if (extraProperties != null) {
          extraProperties.forEach((s, o) -> {
-            map.put(s.toString(), JsonUtil.truncate(o.toString(), 
valueSizeLimit));
+            map.put("extraProperties." + s.toString(), 
JsonUtil.truncate(o.toString(), valueSizeLimit));
          });
       }
-      if (!isLargeMessage()) {
-         addAnnotationsAsProperties(map, messageAnnotations);
-      }
+
+      addAnnotationsAsProperties(map, messageAnnotations);
 
       if (properties != null) {
          if (properties.getContentType() != null) {
-            map.put("properties.getContentType()", 
properties.getContentType().toString());
+            map.put("properties.contentType", 
properties.getContentType().toString());
          }
          if (properties.getContentEncoding() != null) {
-            map.put("properties.getContentEncoding()", 
properties.getContentEncoding().toString());
+            map.put("properties.contentEncoding", 
properties.getContentEncoding().toString());
          }
          if (properties.getGroupId() != null) {
-            map.put("properties.getGroupID()", properties.getGroupId());
+            map.put("properties.groupID", properties.getGroupId());
          }
          if (properties.getGroupSequence() != null) {
-            map.put("properties.getGroupSequence()", 
properties.getGroupSequence().intValue());
+            map.put("properties.groupSequence", 
properties.getGroupSequence().intValue());
          }
          if (properties.getReplyToGroupId() != null) {
-            map.put("properties.getReplyToGroupId()", 
properties.getReplyToGroupId());
+            map.put("properties.replyToGroupId", 
properties.getReplyToGroupId());
+         }
+         if (properties.getCreationTime() != null) {
+            map.put("properties.creationTime", 
properties.getCreationTime().getTime());
+         }
+         if (properties.getAbsoluteExpiryTime() != null) {
+            map.put("properties.absoluteExpiryTime", 
properties.getCreationTime().getTime());
+         }
+         if (properties.getTo() != null) {
+            map.put("properties.to", properties.getTo());
+         }
+         if (properties.getSubject() != null) {
+            map.put("properties.subject", properties.getSubject());
          }
       }
 
@@ -883,18 +894,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
             String key = entry.getKey().toString();
             if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) 
{
                long deliveryTime = ((Number) entry.getValue()).longValue();
-               map.put("annotation x-opt-delivery-time", deliveryTime);
+               map.put("message-annotation.x-opt-delivery-time", deliveryTime);
             } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() 
!= null) {
                long delay = ((Number) entry.getValue()).longValue();
-               if (delay > 0) {
-                  map.put("annotation x-opt-delivery-delay", 
System.currentTimeMillis() + delay);
-               }
+               map.put("message-annotation.x-opt-delivery-delay", delay);
             } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && 
entry.getValue() != null) {
-               map.put("annotation X_OPT_INGRESS_TIME", ((Number) 
entry.getValue()).longValue());
+               map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number) 
entry.getValue()).longValue());
             } else {
                try {
-                  map.put("annotation " + key, entry.getValue());
+                  map.put("message-annotation." + key, entry.getValue());
                } catch (ActiveMQPropertyConversionException e) {
+                  logger.warn(e.getMessage(), e);
                }
             }
          }
@@ -1807,12 +1817,12 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
 
       @Override
       public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, 
int delivery) throws OpenDataException {
-         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
-
          if (!m.isLargeMessage()) {
             m.ensureScanning();
          }
 
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
          Properties properties = m.getCurrentProperties();
 
          byte type = getType(m, properties);
@@ -1822,12 +1832,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
          if (m.isLargeMessage())  {
             rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
          } else {
-            if (m.getBody() instanceof AmqpValue) {
-               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
-
-               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            Object amqpValue;
+            if (m.getBody() instanceof AmqpValue && (amqpValue = ((AmqpValue) 
m.getBody()).getValue()) != null) {
+               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString(String.valueOf(amqpValue), valueSizeLimit));
             } else {
-               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+               rc.put(CompositeDataConstants.TEXT_BODY, 
JsonUtil.truncateString(String.valueOf(m.getBody()), valueSizeLimit));
             }
          }
 
@@ -1840,10 +1849,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
          }
          byte type = BYTES_TYPE;
 
+         if (m.getBody() == null) {
+            return DEFAULT_TYPE;
+         }
+
          final Symbol contentType = properties != null ? 
properties.getContentType() : null;
          final String contentTypeString = contentType != null ? 
contentType.toString() : null;
 
-         if (m.getBody() instanceof Data) {
+         if (m.getBody() instanceof Data && contentType != null) {
 
             if 
(contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
                type = OBJECT_TYPE;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
index 3ab129b..2af5653 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
@@ -359,7 +359,7 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
          assertEquals(1, browseResult.length);
 
          if ((boolean) browseResult[0].get("largeMessage")) {
-            assertTrue(browseResult[0].containsKey("BodyPreview"));
+            assertTrue(browseResult[0].containsKey("text"));
          }
 
          connection = client.createConnection();

Reply via email to