This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 54418df ARTEMIS-3529 Fixing test and tweaks on properties
54418df is described below
commit 54418dfcafb09786af49db6c3ac65db95e5303a6
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Oct 18 10:47:12 2021 -0400
ARTEMIS-3529 Fixing test and tweaks on properties
---
.../artemis/core/message/impl/CoreMessage.java | 4 +-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 61 +++++++++++++---------
.../SimpleStreamingLargeMessageTest.java | 2 +-
3 files changed, 40 insertions(+), 27 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 981f55b..3d4438e 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -1344,7 +1344,7 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
rc.put(CompositeDataConstants.TYPE, m.getType());
if (!m.isLargeMessage()) {
- ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
+ ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit
? bodyCopy.readableBytes() : valueSizeLimit + 1];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes,
valueSizeLimit));
@@ -1370,7 +1370,7 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
} else {
- SimpleString text =
m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
+ SimpleString text =
m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ?
JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
}
} else {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 54aa270..91a3adb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -842,34 +842,45 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
value = ((Binary)value).getArray();
}
value = JsonUtil.truncate(value, valueSizeLimit);
- map.put(name.toString(), value);
+ map.put("applicationProperties." + name, value);
}
TypedProperties extraProperties = getExtraProperties();
if (extraProperties != null) {
extraProperties.forEach((s, o) -> {
- map.put(s.toString(), JsonUtil.truncate(o.toString(),
valueSizeLimit));
+ map.put("extraProperties." + s.toString(),
JsonUtil.truncate(o.toString(), valueSizeLimit));
});
}
- if (!isLargeMessage()) {
- addAnnotationsAsProperties(map, messageAnnotations);
- }
+
+ addAnnotationsAsProperties(map, messageAnnotations);
if (properties != null) {
if (properties.getContentType() != null) {
- map.put("properties.getContentType()",
properties.getContentType().toString());
+ map.put("properties.contentType",
properties.getContentType().toString());
}
if (properties.getContentEncoding() != null) {
- map.put("properties.getContentEncoding()",
properties.getContentEncoding().toString());
+ map.put("properties.contentEncoding",
properties.getContentEncoding().toString());
}
if (properties.getGroupId() != null) {
- map.put("properties.getGroupID()", properties.getGroupId());
+ map.put("properties.groupID", properties.getGroupId());
}
if (properties.getGroupSequence() != null) {
- map.put("properties.getGroupSequence()",
properties.getGroupSequence().intValue());
+ map.put("properties.groupSequence",
properties.getGroupSequence().intValue());
}
if (properties.getReplyToGroupId() != null) {
- map.put("properties.getReplyToGroupId()",
properties.getReplyToGroupId());
+ map.put("properties.replyToGroupId",
properties.getReplyToGroupId());
+ }
+ if (properties.getCreationTime() != null) {
+ map.put("properties.creationTime",
properties.getCreationTime().getTime());
+ }
+ if (properties.getAbsoluteExpiryTime() != null) {
+ map.put("properties.absoluteExpiryTime",
properties.getCreationTime().getTime());
+ }
+ if (properties.getTo() != null) {
+ map.put("properties.to", properties.getTo());
+ }
+ if (properties.getSubject() != null) {
+ map.put("properties.subject", properties.getSubject());
}
}
@@ -883,18 +894,17 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null)
{
long deliveryTime = ((Number) entry.getValue()).longValue();
- map.put("annotation x-opt-delivery-time", deliveryTime);
+ map.put("message-annotation.x-opt-delivery-time", deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue()
!= null) {
long delay = ((Number) entry.getValue()).longValue();
- if (delay > 0) {
- map.put("annotation x-opt-delivery-delay",
System.currentTimeMillis() + delay);
- }
+ map.put("message-annotation.x-opt-delivery-delay", delay);
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) &&
entry.getValue() != null) {
- map.put("annotation X_OPT_INGRESS_TIME", ((Number)
entry.getValue()).longValue());
+ map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number)
entry.getValue()).longValue());
} else {
try {
- map.put("annotation " + key, entry.getValue());
+ map.put("message-annotation." + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
+ logger.warn(e.getMessage(), e);
}
}
}
@@ -1807,12 +1817,12 @@ public abstract class AMQPMessage extends
RefCountMessage implements org.apache.
@Override
public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit,
int delivery) throws OpenDataException {
- Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
-
if (!m.isLargeMessage()) {
m.ensureScanning();
}
+ Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
Properties properties = m.getCurrentProperties();
byte type = getType(m, properties);
@@ -1822,12 +1832,11 @@ public abstract class AMQPMessage extends
RefCountMessage implements org.apache.
if (m.isLargeMessage()) {
rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
} else {
- if (m.getBody() instanceof AmqpValue) {
- Object amqpValue = ((AmqpValue) m.getBody()).getValue();
-
- rc.put(CompositeDataConstants.TEXT_BODY,
JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+ Object amqpValue;
+ if (m.getBody() instanceof AmqpValue && (amqpValue = ((AmqpValue)
m.getBody()).getValue()) != null) {
+ rc.put(CompositeDataConstants.TEXT_BODY,
JsonUtil.truncateString(String.valueOf(amqpValue), valueSizeLimit));
} else {
- rc.put(CompositeDataConstants.TEXT_BODY,
JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+ rc.put(CompositeDataConstants.TEXT_BODY,
JsonUtil.truncateString(String.valueOf(m.getBody()), valueSizeLimit));
}
}
@@ -1840,10 +1849,14 @@ public abstract class AMQPMessage extends
RefCountMessage implements org.apache.
}
byte type = BYTES_TYPE;
+ if (m.getBody() == null) {
+ return DEFAULT_TYPE;
+ }
+
final Symbol contentType = properties != null ?
properties.getContentType() : null;
final String contentTypeString = contentType != null ?
contentType.toString() : null;
- if (m.getBody() instanceof Data) {
+ if (m.getBody() instanceof Data && contentType != null) {
if
(contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
type = OBJECT_TYPE;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
index 3ab129b..2af5653 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
@@ -359,7 +359,7 @@ public class SimpleStreamingLargeMessageTest extends
AmqpClientTestSupport {
assertEquals(1, browseResult.length);
if ((boolean) browseResult[0].get("largeMessage")) {
- assertTrue(browseResult[0].containsKey("BodyPreview"));
+ assertTrue(browseResult[0].containsKey("text"));
}
connection = client.createConnection();