This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 50fae08b09 ARTEMIS-4657 support better correlation ID compat b/w JMS 
clients
50fae08b09 is described below

commit 50fae08b09a76e200ef107d06cc867231f644ccd
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Feb 27 11:25:51 2024 -0600

    ARTEMIS-4657 support better correlation ID compat b/w JMS clients
---
 .../activemq/artemis/reader/MessageUtil.java       |   1 +
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |   4 +-
 .../amqp/converter/AMQPMessageIdHelper.java        |   7 +-
 .../protocol/amqp/converter/AmqpCoreConverter.java |   2 +-
 .../protocol/amqp/converter/CoreAmqpConverter.java |   2 +
 .../converter/message/AMQPMessageIdHelperTest.java |  62 ++++---
 .../openwire/OpenWireMessageConverter.java         |  19 ++-
 .../openwire/OpenWireMessageConverterTest.java     |  55 ++++++
 .../artemis/core/server/ActiveMQServerLogger.java  |   3 +
 .../jms/multiprotocol/JMSCorrelationIDTest.java    | 184 +++++++++++++++++++++
 10 files changed, 306 insertions(+), 33 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 4e4d3e5928..780085f4fd 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 018d0a27e6..e10ae2b9e2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.SimpleType;
+import java.lang.invoke.MethodHandles;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -80,7 +81,6 @@ import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 import static 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
 
@@ -1562,7 +1562,7 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
             return getAMQPUserID();
          case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
             if (properties != null && properties.getCorrelationId() != null) {
-               return 
AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+               return 
AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());
             }
             return null;
          default:
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
index baf08aeefb..06f97f1d3a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java
@@ -113,7 +113,7 @@ public class AMQPMessageIdHelper {
       }
    }
 
-   public String toCorrelationIdString(Object idObject) {
+   public Object toCorrelationIdStringOrBytes(Object idObject) {
       if (idObject instanceof String) {
          final String stringId = (String) idObject;
 
@@ -130,6 +130,11 @@ public class AMQPMessageIdHelper {
             // It has "ID:" prefix and doesn't have encoding prefix, use it 
as-is.
             return stringId;
          }
+      } else if (idObject instanceof Binary) {
+         ByteBuffer dup = ((Binary) idObject).asByteBuffer();
+         byte[] bytes = new byte[dup.remaining()];
+         dup.get(bytes);
+         return bytes;
       } else {
          // Not a string, convert it
          return convertToIdString(idObject);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 24f84474c0..416c88fb6a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -378,7 +378,7 @@ public class AmqpCoreConverter {
 
          if (correlationID != null) {
             try {
-               
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
+               
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(correlationID));
             } catch (IllegalArgumentException e) {
                
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
             }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 2d1a5d4bd8..0fe67e2da2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -159,6 +159,8 @@ public class CoreAmqpConverter {
             } catch (ActiveMQAMQPIllegalStateException e) {
                properties.setCorrelationId(correlationID);
             }
+         } else if (correlationID instanceof byte[]) {
+            properties.setCorrelationId(new Binary(((byte[])correlationID)));
          } else {
             properties.setCorrelationId(correlationID);
          }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
index bd7192bb93..166418077a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter.message;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -320,22 +321,22 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns null if given null
     */
    @Test
    public void testToCorrelationIdStringWithNull() {
-      assertNull("null string should have been returned", 
messageIdHelper.toCorrelationIdString(null));
+      assertNull("null string should have been returned", 
messageIdHelper.toCorrelationIdStringOrBytes(null));
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} 
throws
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} throws
     * an IAE if given an unexpected object type.
     */
    @Test
    public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() {
       try {
-         messageIdHelper.toCorrelationIdString(new Object());
+         messageIdHelper.toCorrelationIdStringOrBytes(new Object());
          fail("expected exception not thrown");
       } catch (IllegalArgumentException iae) {
          // expected
@@ -343,13 +344,19 @@ public class AMQPMessageIdHelperTest {
    }
 
    private void doToCorrelationIDTestImpl(Object idObject, String expected) {
-      String idString = messageIdHelper.toCorrelationIdString(idObject);
+      String idString = (String) 
messageIdHelper.toCorrelationIdStringOrBytes(idObject);
       assertNotNull("null string should not have been returned", idString);
       assertEquals("expected id string was not returned", expected, idString);
    }
 
+   private void doToCorrelationIDBytesTestImpl(Object idObject, byte[] 
expected) {
+      byte[] idBytes = (byte[]) 
messageIdHelper.toCorrelationIdStringOrBytes(idObject);
+      assertNotNull("null byte[] should not have been returned", idBytes);
+      assertArrayEquals("expected id byte[] was not returned", expected, 
idBytes);
+   }
+
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns the given basic string unchanged when it has the "ID:" prefix 
(but
     * no others).
     */
@@ -361,7 +368,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns the given basic string unchanged when it lacks the "ID:" prefix
     * (and any others)
     */
@@ -373,7 +380,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string unchanged when it lacks the "ID:" prefix but happens to
     * already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
     */
@@ -385,7 +392,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string unchanged when it lacks the "ID:" prefix but happens to
     * already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
     */
@@ -397,7 +404,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string unchanged when it lacks the "ID:" prefix but happens to
     * already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
     */
@@ -409,7 +416,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string unchanged when it lacks the "ID:" prefix but happens to
     * already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
     */
@@ -421,7 +428,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string unchanged when it lacks the "ID:" prefix but happens to
     * already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
     */
@@ -433,7 +440,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an AMQP encoded UUID when given a UUID 
object.
     */
    @Test
@@ -445,7 +452,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an AMQP encoded ulong when given a
     * UnsignedLong object.
     */
@@ -458,22 +465,27 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
-    * returns a string indicating an AMQP encoded binary when given a Binary
-    * object.
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
+    * returns a byte[] when given a Binary object.
     */
    @Test
-   public void testToCorrelationIdStringWithBinary() {
+   public void testToCorrelationIdByteArrayWithBinary() {
       byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 
0xFF};
       Binary binary = new Binary(bytes);
 
-      String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + 
AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
+      doToCorrelationIDBytesTestImpl(binary, bytes);
+   }
+
+   @Test
+   public void testToCorrelationIdByteArrayWithBinaryWithOffset() {
+      byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 
0xFF};
+      Binary binary = new Binary(bytes, 2, 2);
 
-      doToCorrelationIDTestImpl(binary, expected);
+      doToCorrelationIDBytesTestImpl(binary, new byte[] {(byte) 0x09, (byte) 
0xFF});
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an escaped string, when given an input string
     * that already has the "ID:" prefix, but follows it with an encoding 
prefix,
     * in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
@@ -487,7 +499,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an escaped string, when given an input string
     * that already has the "ID:" prefix, but follows it with an encoding 
prefix,
     * in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
@@ -501,7 +513,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an escaped string, when given an input string
     * that already has the "ID:" prefix, but follows it with an encoding 
prefix,
     * in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
@@ -515,7 +527,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an escaped string, when given an input string
     * that already has the "ID:" prefix, but follows it with an encoding 
prefix,
     * in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
@@ -529,7 +541,7 @@ public class AMQPMessageIdHelperTest {
    }
 
    /**
-    * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
+    * Test that {@link 
AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
     * returns a string indicating an escaped string, when given an input string
     * that already has the "ID:" prefix, but follows it with an encoding 
prefix,
     * in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index e6d69757d3..617f076af9 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -28,6 +28,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -159,7 +162,7 @@ public final class OpenWireMessageConverter {
       coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, 
messageSend.getCommandId());
       final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
-         
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, 
new SimpleString(corrId));
+         coreMessage.setCorrelationID(corrId);
       }
       final DataStructure ds = messageSend.getDataStructure();
       if (ds != null) {
@@ -590,9 +593,15 @@ public final class OpenWireMessageConverter {
       }
       amqMsg.setCommandId(commandId);
 
-      final SimpleString corrId = getObjectProperty(coreMessage, 
SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY);
-      if (corrId != null) {
-         amqMsg.setCorrelationId(corrId.toString());
+      final Object correlationID = coreMessage.getCorrelationID();
+      if (correlationID instanceof String || correlationID instanceof 
SimpleString) {
+         amqMsg.setCorrelationId(correlationID.toString());
+      } else if (correlationID instanceof byte[]) {
+         try {
+            
amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap((byte[])
 correlationID)).toString());
+         } catch (MalformedInputException e) {
+            
ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage());
+         }
       }
 
       final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, 
OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
@@ -944,6 +953,8 @@ public final class OpenWireMessageConverter {
          }
          if 
(!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && 
(keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
             continue;
+         } else if (s.equals(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY)) {
+            continue;
          }
          final Object prop = coreMessage.getObjectProperty(s);
          try {
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index 0580f5cad0..0624bc35f4 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire;
 
+import java.nio.charset.StandardCharsets;
+
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -182,6 +185,58 @@ public class OpenWireMessageConverterTest {
       assertEquals(PRODUCER_ID, 
messageDispatch.getMessage().getProducerId().toString());
    }
 
+   @Test
+   public void testStringCorrelationId() throws Exception {
+      final String CORRELATION_ID = RandomUtil.randomString();
+
+      ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+      coreMessage.setCorrelationID(CORRELATION_ID);
+      MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+      
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
+      assertEquals(CORRELATION_ID, 
messageDispatch.getMessage().getCorrelationId());
+   }
+
+   @Test
+   public void testBytesCorrelationId() throws Exception {
+      final byte[] CORRELATION_ID = 
RandomUtil.randomString().getBytes(StandardCharsets.UTF_8);
+
+      ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+      coreMessage.setCorrelationID(CORRELATION_ID);
+      MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+      
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
+      assertEquals(new String(CORRELATION_ID, StandardCharsets.UTF_8), 
messageDispatch.getMessage().getCorrelationId());
+   }
+
+   @Test
+   public void testInvalidUtf8BytesCorrelationId() throws Exception {
+      final byte[] CORRELATION_ID = new byte[]{1, (byte)0xFF, (byte)0xFF};
+
+      ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+      coreMessage.setCorrelationID(CORRELATION_ID);
+      MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+      
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
+      assertNull(messageDispatch.getMessage().getCorrelationId());
+   }
+
+   @Test
+   public void testLegacyCorrelationId() throws Exception {
+      final String CORRELATION_ID = RandomUtil.randomString();
+
+      ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+      
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, 
new SimpleString(CORRELATION_ID));
+      MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+      
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
+      assertEquals(CORRELATION_ID, 
messageDispatch.getMessage().getCorrelationId());
+   }
+
    @Test
    public void testMessageId() throws Exception {
       final String MESSAGE_ID = "ID:123:456:789";
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index d3320883e2..2fdad78462 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1608,4 +1608,7 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224135, value = "nodeID {} is closing. Topology update 
ignored", level = LogMessage.Level.INFO)
    void nodeLeavingCluster(String nodeID);
+
+   @LogMessage(id = 224136, value = "Skipping correlation ID when converting 
message to OpenWire since byte[] value is not valid UTF-8: {}", level = 
LogMessage.Level.WARN)
+   void unableToDecodeCorrelationId(String message);
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
new file mode 100644
index 0000000000..31250693aa
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/*
+ * JMS supports setting the correlation ID as a String or a byte[]. However, 
OpenWire only supports correlation ID as
+ * a String. When it is set as a byte[] the OpenWire JMS client just converts 
it to a UTF-8 encoded String, and
+ * therefore when it sends a JMS message with a correlation ID the broker 
can't tell if the value was set as a String
+ * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the 
incoming OpenWire value as a String. This
+ * doesn't cause any problems if the consumer is also OpenWire, but if the 
consumer is Core or AMQP (which both
+ * differentiate between String and binary values) then retrieving the 
correlation ID as a byte[] will fail and nothing
+ * can be done about it aside from updating the OpenWire protocol.
+ *
+ * Therefore, all the tests which involve the OpenWire JMS client using 
Message.setJMSCorrelationIDAsBytes() on a
+ * message sent to a different JMS implementation are ignored. The tests are 
ignored rather than being completely
+ * removed to make clear this was an explicit decision not to test & support 
this use-case.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+   private void testCorrelationIDAsBytesSendReceive(Connection 
producerConnection, Connection consumerConnection) throws Throwable {
+      Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++) {
+         bytes[i] = (byte) i;
+      }
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationIDAsBytes(bytes);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = 
sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), 
createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), 
createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() 
throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   @Ignore
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   @Ignore
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws 
Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), 
createCoreConnection());
+   }
+
+   private void testCorrelationIDAsStringSendReceive(Connection 
producerConnection, Connection consumerConnection) throws Throwable {
+      final String correlationId = RandomUtil.randomString();
+
+      Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationID(correlationId);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = 
sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), 
createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), 
createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() 
throws Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), 
createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), 
createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToCore() throws 
Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), 
createCoreConnection());
+   }
+}
\ No newline at end of file

Reply via email to