gemmellr commented on code in PR #4833:
URL: https://github.com/apache/activemq-artemis/pull/4833#discussion_r1505615830


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -1562,7 +1562,7 @@ public final Object getObjectProperty(String key) {
             return getAMQPUserID();
          case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
             if (properties != null && properties.getCorrelationId() != null) {
-               return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+               return AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());

Review Comment:
   So this is actually going to potentially break some existing usage. Was it 
decided that's OK? And was it also decided not to offer a way to restore the 
prior behaviour?



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java:
##########
@@ -159,7 +160,9 @@ public static org.apache.activemq.artemis.api.core.Message inbound(final Message
       coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
       final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
-         coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
+         // this mimics what the OpenWire JMS client will do when it writes the correlation ID before sending
+         byte[] bytes = corrId.getBytes(StandardCharsets.UTF_8);
+         coreMessage.setCorrelationID(bytes);

Review Comment:
   This also seems like it is going to break a bunch of things. Stuff that got 
a String before will now get bytes/Binary instead, even though a String is 
almost certainly what was sent originally.
   
   E.g. try sending a String CorrelationID from the OpenWire JMS client and 
retrieving a String CorrelationID from the AMQP JMS client. Before, it would see 
exactly what the original client sent, as a String. Now it will return an 
encoded binary hex, since it will actually receive a Binary correlationID 
instead of a String one?
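   
   To make the concern concrete, here is a minimal sketch of the type change. 
It is not broker or client code: the class name, the method names, and the 
plain hex rendering are assumptions made for illustration only, and a real 
client may add prefixes or format binary IDs differently.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: models how a String correlation ID can come back
// changed once it has been stored as bytes. All names are hypothetical.
final class CorrelationIdTypeChange {

   // Prior behaviour: the value stays a String end to end.
   static Object storeAsString(String corrId) {
      return corrId;
   }

   // Behaviour in this hunk: the String is stored as its UTF-8 bytes.
   static Object storeAsBytes(String corrId) {
      return corrId.getBytes(StandardCharsets.UTF_8);
   }

   // Assumed consumer-side rendering: Strings pass through unchanged,
   // binary values are rendered as upper-case hex.
   static String asJmsString(Object stored) {
      if (stored instanceof String) {
         return (String) stored;
      }
      byte[] bytes = (byte[]) stored;
      StringBuilder hex = new StringBuilder(bytes.length * 2);
      for (byte b : bytes) {
         hex.append(String.format("%02X", b));
      }
      return hex.toString();
   }

   public static void main(String[] args) {
      String sent = "abc";
      System.out.println(asJmsString(storeAsString(sent)));
      System.out.println(asJmsString(storeAsBytes(sent)));
   }
}
```

   The first call hands back the String that was sent. The second hands back 
a rendering of the bytes, which no longer equals the original value.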



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java:
##########
@@ -130,6 +130,8 @@ public String toCorrelationIdString(Object idObject) {
             // It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
             return stringId;
          }
+      } else if (idObject instanceof Binary) {
+         return ((Binary)idObject).getArray();

Review Comment:
   Strictly speaking, it's possible the array isn't just the id. The Binary 
should be checked to confirm that it doesn't have an array offset and that its 
length is the same as the array's.
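   
   A minimal sketch of that check follows. So that the example compiles on 
its own, `Binary` here is a small stand-in exposing only `getArray()`, 
`getArrayOffset()` and `getLength()`, which is an assumption about the shape of 
the real proton-j class. `BinaryIdBytes.toIdBytes` is a hypothetical helper 
name, not something in the PR.

```java
import java.util.Arrays;

// Stand-in for the proton-j Binary type, reduced to the three accessors
// this sketch needs. It is not the real class.
final class Binary {
   private final byte[] data;
   private final int offset;
   private final int length;

   Binary(byte[] data, int offset, int length) {
      this.data = data;
      this.offset = offset;
      this.length = length;
   }

   byte[] getArray() {
      return data;
   }

   int getArrayOffset() {
      return offset;
   }

   int getLength() {
      return length;
   }
}

// Hypothetical helper: returns exactly the bytes the Binary represents.
final class BinaryIdBytes {

   static byte[] toIdBytes(Binary binary) {
      byte[] array = binary.getArray();
      int offset = binary.getArrayOffset();
      int length = binary.getLength();

      // The backing array can be returned directly only when the Binary
      // covers all of it.
      if (offset == 0 && length == array.length) {
         return array;
      }

      // Otherwise copy out just the slice that holds the id.
      return Arrays.copyOfRange(array, offset, offset + length);
   }
}
```

   Returning the backing array directly avoids a copy in the common case, at 
the cost of sharing the array with the Binary. Always copying would be the 
more defensive choice.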



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test that correlation ID is handled as expected between JMS clients.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+   private void testCorrelationIDAsBytesSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
+      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++) {
+         bytes[i] = (byte) i;
+      }
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationIDAsBytes(bytes);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createCoreConnection());
+   }
+
+   private void testCorrelationIDAsStringSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
+      final String correlationId = RandomUtil.randomString();
+
+      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationID(correlationId);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createOpenWireConnection());
+   }
+
+   /*
+    * JMS supports setting the correlation ID as a String or a byte[]. However, OpenWire only supports correlation ID as
+    * a String. When it is set as a byte[] the OpenWire JMS client just converts it to a UTF-8 encoded String, and
+    * therefore when it sends a JMS message with a correlation ID the broker can't tell if the value was set as a String
+    * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the value as a byte[]. This doesn't cause any
+    * problems if the consumer is also OpenWire, but if the consumer is core or AMQP (which both differentiate between
+    * String and binary values) then retrieving the correlation ID as a String (i.e. via Message.getJMSCorrelationID())
+    * will fail.
+    *
+    * JMS means for the correlation ID as a byte[] to be used for "native" clients which makes it a good candidate for
+    * interoperability between other protocols like MQTT 5 which *only* supports correlation ID as byte[].
+    */
+   @Ignore
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createConnection());
+   }

Review Comment:
   Seems that you actually hit the interop issues I commented on from looking 
at the code.
   
   Why is the broker 'hard coded to byte[]' for OpenWire when this comment 
explicitly notes it effectively only does String? Why isn't it hard coded to 
using String, like it was before?
   
   If MQTT only supports byte[] then it seems like it is the MQTT stuff that 
should be jumping through hoops such as converting to/from UTF-8 bytes, not 
the OpenWire bits, especially as doing it this way breaks the typical, existing 
OpenWire<->AMQP/Core interop.
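   
   A rough sketch of what converting at the MQTT boundary could look like. 
The class and method names are hypothetical and do not exist in the broker. 
Falling back to raw bytes when the data is not valid UTF-8 is an assumption of 
this sketch, not something the PR or the MQTT code does today.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical bridge: correlation IDs stay Strings inside the broker and
// are converted only where they cross into or out of MQTT.
final class MqttCorrelationIdBridge {

   // Outbound to MQTT: encode the String ID as UTF-8 bytes.
   static byte[] toMqttCorrelationData(String correlationId) {
      return correlationId.getBytes(StandardCharsets.UTF_8);
   }

   // Inbound from MQTT: return a String when the bytes are well-formed
   // UTF-8, otherwise return the original byte[] (assumed fallback).
   static Object fromMqttCorrelationData(byte[] data) {
      try {
         return StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPORT)
            .onUnmappableCharacter(CodingErrorAction.REPORT)
            .decode(ByteBuffer.wrap(data))
            .toString();
      } catch (CharacterCodingException e) {
         // Not text: keep it as binary rather than corrupting it.
         return data;
      }
   }
}
```

   With something along these lines the OpenWire path could keep storing a 
String as before, and only MQTT producers and consumers would pay for the 
conversion.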



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
