[
https://issues.apache.org/jira/browse/ARTEMIS-4657?focusedWorklogId=907349&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907349
]
ASF GitHub Bot logged work on ARTEMIS-4657:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 28/Feb/24 13:42
Start Date: 28/Feb/24 13:42
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4833:
URL: https://github.com/apache/activemq-artemis/pull/4833#discussion_r1505615830
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -1562,7 +1562,7 @@ public final Object getObjectProperty(String key) {
return getAMQPUserID();
case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
if (properties != null && properties.getCorrelationId() != null) {
- return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+ return
AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());
Review Comment:
So this is actually going to potentially break some exiting usage. Was it
decided thats ok? Plus not to offer ability to restore the prior behaviour?
##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java:
##########
@@ -159,7 +160,9 @@ public static org.apache.activemq.artemis.api.core.Message
inbound(final Message
coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID,
messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
-
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY,
new SimpleString(corrId));
+ // this mimics what the OpenWire JMS client will do when it writes
the correlation ID before sending
+ byte[] bytes = corrId.getBytes(StandardCharsets.UTF_8);
+ coreMessage.setCorrelationID(bytes);
Review Comment:
This also seems like it is going to break a bunch of things. Stuff that got
a String before, will now get bytes/Binary instead. Even though a String is
almost certainly what was sent originally.
E.g try sending a String CorrelationID from the OpenWire JMS client and
retrieving a String CorrelationID from the AMQP JMS client. Before it would see
exactly what the original client sent, as a String. Now it will now return an
encoded binary hex since it will actually receive a Binary correlationID
instead of a String one?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java:
##########
@@ -130,6 +130,8 @@ public String toCorrelationIdString(Object idObject) {
// It has "ID:" prefix and doesn't have encoding prefix, use it
as-is.
return stringId;
}
+ } else if (idObject instanceof Binary) {
+ return ((Binary)idObject).getArray();
Review Comment:
Strictly speaking, its possible the array isnt just the id...the Binary
should be checked that it doesnt have an array offset and is the same length as
the array.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test that correlation ID is handled as expected between JMS clients.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+ private void testCorrelationIDAsBytesSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ byte[] bytes = new byte[0xf + 1];
+ for (int i = 0; i <= 0xf; i++) {
+ bytes[i] = (byte) i;
+ }
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationIDAsBytes(bytes);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws
Throwable {
+ testCorrelationIDAsBytesSendReceive(createOpenWireConnection(),
createCoreConnection());
+ }
+
+ private void testCorrelationIDAsStringSendReceive(Connection
producerConnection, Connection consumerConnection) throws Throwable {
+ final String correlationId = RandomUtil.randomString();
+
+ Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setJMSCorrelationID(correlationId);
+ producer.send(message);
+ producer.close();
+
+ Session sessionConsumer = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+ final MessageConsumer consumer =
sessionConsumer.createConsumer(consumerQueue);
+
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull("Could not receive message on consumer", m);
+
+ Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createCoreConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createCoreConnection(),
createOpenWireConnection());
+ }
+
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire()
throws Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createOpenWireConnection());
+ }
+
+ /*
+ * JMS supports setting the correlation ID as a String or a byte[].
However, OpenWire only supports correlation ID as
+ * a String. When it is set as a byte[] the OpenWire JMS client just
converts it to a UTF-8 encoded String, and
+ * therefore when it sends a JMS message with a correlation ID the broker
can't tell if the value was set as a String
+ * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the
value as a byte[]. This doesn't cause any
+ * problems if the consumer is also OpenWire, but if the consumer is core
or AMQP (which both differentiate between
+ * String and binary values) then retrieving the correlation ID as a String
(i.e. via Message.getJMSCorrelationID())
+ * will fail.
+ *
+ * JMS means for the correlation ID as a byte[] to be used for "native"
clients which makes it a good candidate for
+ * interoperability between other protocols like MQTT 5 which *only*
supports correlation ID as byte[].
+ */
+ @Ignore
+ @Test(timeout = 60000)
+ public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws
Throwable {
+ testCorrelationIDAsStringSendReceive(createOpenWireConnection(),
createConnection());
+ }
Review Comment:
Seems that you actually hit the interop issues I commented on from looking
at the code.
Why is the broker 'hard coded to byte[]' for Openwire when this comment
explicitly notes it effectively only does String? Why isnt it hard coded to
using String...like it was before?
If MQTT only supports byte[] then it seem like it is the MQTT stuff that
should be jumping through hoops such as converting to/from a UTF-8 bytes, not
the Openwire bits, especially as doing it this way breaks the typical+existing
Openwire<->AMQP/Core interop.
Issue Time Tracking
-------------------
Worklog Id: (was: 907349)
Time Spent: 0.5h (was: 20m)
> Support correlation ID compatibility between JMS clients
> --------------------------------------------------------
>
> Key: ARTEMIS-4657
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4657
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Justin Bertram
> Assignee: Justin Bertram
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently there are some use-cases with both {{String}} and {{byte[]}} values
> of JMS correlation ID that don't work between Core, OpenWire, and AMQP. We
> should support as many as possible.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)