This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 50fae08b09 ARTEMIS-4657 support better correlation ID compat b/w JMS
clients
50fae08b09 is described below
commit 50fae08b09a76e200ef107d06cc867231f644ccd
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Feb 27 11:25:51 2024 -0600
ARTEMIS-4657 support better correlation ID compat b/w JMS clients
---
.../activemq/artemis/reader/MessageUtil.java | 1 +
.../artemis/protocol/amqp/broker/AMQPMessage.java | 4 +-
.../amqp/converter/AMQPMessageIdHelper.java | 7 +-
.../protocol/amqp/converter/AmqpCoreConverter.java | 2 +-
.../protocol/amqp/converter/CoreAmqpConverter.java | 2 +
.../converter/message/AMQPMessageIdHelperTest.java | 62 ++++---
.../openwire/OpenWireMessageConverter.java | 19 ++-
.../openwire/OpenWireMessageConverterTest.java | 55 ++++++
.../artemis/core/server/ActiveMQServerLogger.java | 3 +
.../jms/multiprotocol/JMSCorrelationIDTest.java | 184 +++++++++++++++++++++
10 files changed, 306 insertions(+), 33 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 4e4d3e5928..780085f4fd 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 018d0a27e6..e10ae2b9e2 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.SimpleType;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -80,7 +81,6 @@ import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
@@ -1562,7 +1562,7 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
return getAMQPUserID();
case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
if (properties != null && properties.getCorrelationId() != null) {
- return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+ return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());
}
return null;
default:
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
index baf08aeefb..06f97f1d3a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
@@ -113,7 +113,7 @@ public class AMQPMessageIdHelper {
}
}
- public String toCorrelationIdString(Object idObject) {
+ public Object toCorrelationIdStringOrBytes(Object idObject) {
if (idObject instanceof String) {
final String stringId = (String) idObject;
@@ -130,6 +130,11 @@ public class AMQPMessageIdHelper {
// It has "ID:" prefix and doesn't have encoding prefix, use it
as-is.
return stringId;
}
+ } else if (idObject instanceof Binary) {
+ ByteBuffer dup = ((Binary) idObject).asByteBuffer();
+ byte[] bytes = new byte[dup.remaining()];
+ dup.get(bytes);
+ return bytes;
} else {
// Not a string, convert it
return convertToIdString(idObject);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 24f84474c0..416c88fb6a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -378,7 +378,7 @@ public class AmqpCoreConverter {
if (correlationID != null) {
try {
-
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
+
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(correlationID));
} catch (IllegalArgumentException e) {
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 2d1a5d4bd8..0fe67e2da2 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -159,6 +159,8 @@ public class CoreAmqpConverter {
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setCorrelationId(correlationID);
}
+ } else if (correlationID instanceof byte[]) {
+ properties.setCorrelationId(new Binary(((byte[])correlationID)));
} else {
properties.setCorrelationId(correlationID);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
index bd7192bb93..166418077a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -320,22 +321,22 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns null if given null
*/
@Test
public void testToCorrelationIdStringWithNull() {
- assertNull("null string should have been returned",
messageIdHelper.toCorrelationIdString(null));
+ assertNull("null string should have been returned",
messageIdHelper.toCorrelationIdStringOrBytes(null));
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
throws
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} throws
* an IAE if given an unexpected object type.
*/
@Test
public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() {
try {
- messageIdHelper.toCorrelationIdString(new Object());
+ messageIdHelper.toCorrelationIdStringOrBytes(new Object());
fail("expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
@@ -343,13 +344,19 @@ public class AMQPMessageIdHelperTest {
}
private void doToCorrelationIDTestImpl(Object idObject, String expected) {
- String idString = messageIdHelper.toCorrelationIdString(idObject);
+ String idString = (String)
messageIdHelper.toCorrelationIdStringOrBytes(idObject);
assertNotNull("null string should not have been returned", idString);
assertEquals("expected id string was not returned", expected, idString);
}
+ private void doToCorrelationIDBytesTestImpl(Object idObject, byte[]
expected) {
+ byte[] idBytes = (byte[])
messageIdHelper.toCorrelationIdStringOrBytes(idObject);
+ assertNotNull("null byte[] should not have been returned", idBytes);
+ assertArrayEquals("expected id byte[] was not returned", expected,
idBytes);
+ }
+
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns the given basic string unchanged when it has the "ID:" prefix
(but
* no others).
*/
@@ -361,7 +368,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns the given basic string unchanged when it lacks the "ID:" prefix
* (and any others)
*/
@@ -373,7 +380,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@@ -385,7 +392,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@@ -397,7 +404,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@@ -409,7 +416,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@@ -421,7 +428,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@@ -433,7 +440,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an AMQP encoded UUID when given a UUID
object.
*/
@Test
@@ -445,7 +452,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an AMQP encoded ulong when given a
* UnsignedLong object.
*/
@@ -458,22 +465,27 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
- * returns a string indicating an AMQP encoded binary when given a Binary
- * object.
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
+ * returns a byte[] when given a Binary object.
*/
@Test
- public void testToCorrelationIdStringWithBinary() {
+ public void testToCorrelationIdByteArrayWithBinary() {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte)
0xFF};
Binary binary = new Binary(bytes);
- String expected = AMQPMessageIdHelper.JMS_ID_PREFIX +
AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
+ doToCorrelationIDBytesTestImpl(binary, bytes);
+ }
+
+ @Test
+ public void testToCorrelationIdByteArrayWithBinaryWithOffset() {
+ byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte)
0xFF};
+ Binary binary = new Binary(bytes, 2, 2);
- doToCorrelationIDTestImpl(binary, expected);
+ doToCorrelationIDBytesTestImpl(binary, new byte[] {(byte) 0x09, (byte)
0xFF});
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding
prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
@@ -487,7 +499,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding
prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
@@ -501,7 +513,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding
prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
@@ -515,7 +527,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding
prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
@@ -529,7 +541,7 @@ public class AMQPMessageIdHelperTest {
}
/**
- * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+ * Test that {@link
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding
prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index e6d69757d3..617f076af9 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -28,6 +28,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
@@ -159,7 +162,7 @@ public final class OpenWireMessageConverter {
coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID,
messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
-
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY,
new SimpleString(corrId));
+ coreMessage.setCorrelationID(corrId);
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
@@ -590,9 +593,15 @@ public final class OpenWireMessageConverter {
}
amqMsg.setCommandId(commandId);
- final SimpleString corrId = getObjectProperty(coreMessage,
SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY);
- if (corrId != null) {
- amqMsg.setCorrelationId(corrId.toString());
+ final Object correlationID = coreMessage.getCorrelationID();
+ if (correlationID instanceof String || correlationID instanceof
SimpleString) {
+ amqMsg.setCorrelationId(correlationID.toString());
+ } else if (correlationID instanceof byte[]) {
+ try {
+
amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap((byte[])
correlationID)).toString());
+ } catch (MalformedInputException e) {
+
ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage());
+ }
}
final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class,
OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
@@ -944,6 +953,8 @@ public final class OpenWireMessageConverter {
}
if
(!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) &&
(keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
continue;
+ } else if (s.equals(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY)) {
+ continue;
}
final Object prop = coreMessage.getObjectProperty(s);
try {
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index 0580f5cad0..0624bc35f4 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
+import java.nio.charset.StandardCharsets;
+
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -182,6 +185,58 @@ public class OpenWireMessageConverterTest {
assertEquals(PRODUCER_ID,
messageDispatch.getMessage().getProducerId().toString());
}
+ @Test
+ public void testStringCorrelationId() throws Exception {
+ final String CORRELATION_ID = RandomUtil.randomString();
+
+ ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+ coreMessage.setCorrelationID(CORRELATION_ID);
+ MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
+ assertEquals(CORRELATION_ID,
messageDispatch.getMessage().getCorrelationId());
+ }
+
+ @Test
+ public void testBytesCorrelationId() throws Exception {
+ final byte[] CORRELATION_ID =
RandomUtil.randomString().getBytes(StandardCharsets.UTF_8);
+
+ ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+ coreMessage.setCorrelationID(CORRELATION_ID);
+ MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
+ assertEquals(new String(CORRELATION_ID, StandardCharsets.UTF_8),
messageDispatch.getMessage().getCorrelationId());
+ }
+
+ @Test
+ public void testInvalidUtf8BytesCorrelationId() throws Exception {
+ final byte[] CORRELATION_ID = new byte[]{1, (byte)0xFF, (byte)0xFF};
+
+ ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+ coreMessage.setCorrelationID(CORRELATION_ID);
+ MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
+ assertNull(messageDispatch.getMessage().getCorrelationId());
+ }
+
+ @Test
+ public void testLegacyCorrelationId() throws Exception {
+ final String CORRELATION_ID = RandomUtil.randomString();
+
+ ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY,
new SimpleString(CORRELATION_ID));
+ MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
+ assertEquals(CORRELATION_ID,
messageDispatch.getMessage().getCorrelationId());
+ }
+
@Test
public void testMessageId() throws Exception {
final String MESSAGE_ID = "ID:123:456:789";
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index d3320883e2..2fdad78462 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1608,4 +1608,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224135, value = "nodeID {} is closing. Topology update
ignored", level = LogMessage.Level.INFO)
void nodeLeavingCluster(String nodeID);
+
+ @LogMessage(id = 224136, value = "Skipping correlation ID when converting
message to OpenWire since byte[] value is not valid UTF-8: {}", level =
LogMessage.Level.WARN)
+ void unableToDecodeCorrelationId(String message);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
new file mode 100644
index 0000000000..31250693aa
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/*
+ * JMS supports setting the correlation ID as a String or a byte[]. However,
OpenWire only supports correlation ID as
+ * a String. When it is set as a byte[] the OpenWire JMS client just converts
it to a UTF-8 encoded String, and
+ * therefore when it sends a JMS message with a correlation ID the broker
can't tell if the value was set as a String
+ * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the
incoming OpenWire value as a String. This
+ * doesn't cause any problems if the consumer is also OpenWire, but if the
consumer is Core or AMQP (which both
+ * differentiate between String and binary values) then retrieving the
correlation ID as a byte[] will fail and nothing
+ * can be done about it aside from updating the OpenWire protocol.
+ *
+ * Therefore, all the tests which involve the OpenWire JMS client using
Message.setJMSCorrelationIDAsBytes() on a
+ * message sent to a different JMS implementation are ignored. The tests are
ignored rather than being completely
+ * removed to make clear this was an explicit decision not to test & support
this use-case.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+ private void testCorrelationIDAsBytesSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ byte[] bytes = new byte[0xf + 1];
+ for (int i = 0; i <= 0xf; i++) {
+ bytes[i] = (byte) i;
+ }
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationIDAsBytes(bytes);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ @Ignore
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ @Ignore
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createCoreConnection());
+ }
+
+ private void testCorrelationIDAsStringSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ final String correlationId = RandomUtil.randomString();
+
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationID(correlationId);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createCoreConnection());
+ }
+}
\ No newline at end of file