QPID-8182: Ensure messageId fidelity during conversion where possible (cherry picked from commit 2696d7a542a8927d48b43ce48a1d8104d7b8dc4c)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f4f9859b Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f4f9859b Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f4f9859b Branch: refs/heads/7.0.x Commit: f4f9859b92d3314bf54454e5192d9a8d53b4ae86 Parents: 40df817 Author: Keith Wall <kw...@apache.org> Authored: Wed May 9 11:54:59 2018 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Fri May 11 17:30:23 2018 +0100 ---------------------------------------------------------------------- .../MessageConverter_1_0_to_v0_10.java | 10 +++- .../PropertyConverter_1_0_to_0_10Test.java | 28 +++++++++ .../v0_8_v1_0/MessageConverter_0_8_to_1_0.java | 22 ++++++- .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 61 ++++++++++---------- .../PropertyConverter_0_8_to_1_0Test.java | 27 +++++++++ .../SimpleConversionTest.java | 26 +++------ 6 files changed, 120 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 50a9484..3f6837b 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -366,13 +366,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } else if (messageId instanceof String) { + String messageIdString = (String) messageId; try { - return UUID.fromString(((String) messageId)); + if (messageIdString.startsWith("ID:")) + { + messageIdString = messageIdString.substring(3); + } + return UUID.fromString(messageIdString); } catch (IllegalArgumentException e) { - return UUID.nameUUIDFromBytes(((String) messageId).getBytes(UTF_8)); + return UUID.nameUUIDFromBytes(messageIdString.getBytes(UTF_8)); } } else if (messageId instanceof Binary) @@ -413,6 +418,7 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 { UUID uuid = (UUID)correlationIdObject; correlationId = longToBytes(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + // KW: perhaps this would be more useful as the bytes of the UUID expressed as a string? } else if (correlationIdObject instanceof UnsignedLong) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java index fea4a40..f024f87 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java @@ -565,6 +565,34 @@ public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase assertEquals("Unexpected messageId", messageId, messageProperties.getMessageId()); } + public void testMessageIdStringifiedUUIDConversion() + { + final UUID messageId = UUID.randomUUID(); + Properties properties = new Properties(); + properties.setMessageId(messageId.toString()); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", messageId, messageProperties.getMessageId()); + } + + public void testMessageIdPrefixedStringifiedUUIDConversion() + { + final UUID messageId = UUID.randomUUID(); + Properties properties = new Properties(); + properties.setMessageId("ID:" + messageId.toString()); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", messageId, messageProperties.getMessageId()); + } + public void testMessageIdUnsignedLongConversion() { final UnsignedLong messageId = UnsignedLong.valueOf(-1); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java index 6839993..27b31b8 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Date; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.converter.MessageConversionException; @@ -94,7 +95,7 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess final String correlationIdAsString = contentHeader.getCorrelationIdAsString(); if (Arrays.equals(correlationIdAsBytes, correlationIdAsString.getBytes(StandardCharsets.UTF_8))) { - props.setCorrelationId(correlationIdAsString); + props.setCorrelationId(convertMessageId(correlationId)); } else { @@ -105,7 +106,7 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess final AMQShortString messageId = contentHeader.getMessageId(); if(messageId != null) { - props.setMessageId(messageId.toString()); + props.setMessageId(convertMessageId(messageId)); } if (contentHeader.getReplyTo() != null) @@ -196,6 +197,23 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess bodySection.getEncodedSize()); } + private Object convertMessageId(final AMQShortString messageId) + { + try + { + String messageIdAsString = messageId.toString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + return UUID.fromString(messageIdAsString); + } + catch (IllegalArgumentException e) + { + return messageId.toString(); + } + } + private String convertReplyTo(final AMQShortString replyTo) { String convertedReplyTo; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 0576b33..f9a2b08 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -308,28 +308,38 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ private AMQShortString getMessageIdAsShortString(final Message_1_0 serverMsg) { - Object messageId = getMessageId(serverMsg); try { - if (messageId instanceof Binary) - { - return AMQShortString.createAMQShortString(((Binary) messageId).getArray()); - } - else if (messageId instanceof byte[]) - { - return AMQShortString.createAMQShortString(((byte[]) messageId)); - } - else - { - return AMQShortString.valueOf(messageId); - } + Object messageId = getMessageId(serverMsg); + return covertMessageIdTo08MessageId(messageId); } catch (IllegalArgumentException e) { - // pass + return null; } - return null; + } + private AMQShortString covertMessageIdTo08MessageId(final Object messageId) + { + if (messageId == null) + { + return null; + } + + final AMQShortString result; + if (messageId instanceof Binary) + { + result = AMQShortString.createAMQShortString(((Binary) messageId).getArray()); + } + else if (messageId instanceof byte[]) + { + result = AMQShortString.createAMQShortString((byte[]) messageId); + } + else + { + result = AMQShortString.createAMQShortString(String.valueOf(messageId)); + } + return result; } private AMQShortString getReplyTo(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace) @@ -382,28 +392,17 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ private AMQShortString getCorrelationIdAsShortString(final Message_1_0 serverMsg) { - Object correlationIdObject = getCorrelationId(serverMsg); - final AMQShortString correlationId; try { - if (correlationIdObject instanceof Binary) - { - correlationId = AMQShortString.createAMQShortString(((Binary) correlationIdObject).getArray()); - } - else if (correlationIdObject instanceof byte[]) - { - correlationId = AMQShortString.createAMQShortString(((byte[]) correlationIdObject)); - } - else - { - correlationId = AMQShortString.valueOf(correlationIdObject); - } + Object correlationIdObject = getCorrelationId(serverMsg); + return covertMessageIdTo08MessageId(correlationIdObject); } catch (IllegalArgumentException e) { - throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'correlation-id' failed.", e); + throw new MessageConversionException( + "Could not convert message from 1.0 to 0-8 because conversion of 'correlation-id' failed.", + e); } - return correlationId; } private AMQShortString convertToShortStringForProperty(String propertyName, String s) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java index f9443e0..cae59f0 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.model.NamedAddressSpace; @@ -180,6 +181,19 @@ public class PropertyConverter_0_8_to_1_0Test extends QpidTestCase assertEquals("Unexpected correlationId", correlationId, properties.getCorrelationId()); } + public void testCorrelationUuidIdConversion() + { + BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); + UUID correlationId = UUID.randomUUID(); + basicContentHeaderProperties.setCorrelationId(AMQShortString.valueOf("ID:" + correlationId.toString())); + AMQMessage message = createTestMessage(basicContentHeaderProperties); + + final Message_1_0 convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + Properties properties = convertedMessage.getPropertiesSection().getValue(); + assertEquals("Unexpected correlationId", correlationId, properties.getCorrelationId()); + } + public void testReplyToConversionWhenBindingURLFormatIsUsed() { BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); @@ -282,6 +296,19 @@ public class PropertyConverter_0_8_to_1_0Test extends QpidTestCase assertEquals("Unexpected messageId", messageId, properties.getMessageId()); } + public void testMessageUUiddConversion() + { + BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); + final UUID messageId = UUID.randomUUID(); + basicContentHeaderProperties.setMessageId("ID:" + messageId.toString()); + AMQMessage message = createTestMessage(basicContentHeaderProperties); + + final Message_1_0 convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + Properties properties = convertedMessage.getPropertiesSection().getValue(); + assertEquals("Unexpected messageId", messageId, properties.getMessageId()); + } + public void testTimestampConversion() { BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f4f9859b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java ---------------------------------------------------------------------- diff --git a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java index 2385cd2..34c11dd 100644 --- a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java +++ b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java @@ -41,7 +41,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -216,10 +215,11 @@ public class SimpleConversionTest extends EndToEndConversionTestBase ClientMessage publishedMessage = clientResults.get(0); ClientMessage subscriberMessage = clientResults.get(1); - // TODO: On the wire the AMQP 1.0 client receives a string containing a message with - // message-id-string contain a ID: prefixed UUID. Would be better if the conversion layer sent a message-id-uuid - // as this would offer most compatibility and miminise the exposure of the ID prefix. - assertThat(subscriberMessage.getJMSMessageID(), equalTo(publishedMessage.getJMSMessageID())); + String publishedJmsMessageID = publishedMessage.getJMSMessageID(); + assertThat(publishedJmsMessageID, startsWith("ID:")); + String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length()); + String expectedSubscriberJmsMessageID = String.format("ID:AMQP_UUID:%s", barePublishedJmsMessageID); + assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID)); } @Test @@ -274,7 +274,6 @@ public class SimpleConversionTest extends EndToEndConversionTestBase } @Test - @Ignore("Currently subscriber receives the correct message id but without the ID prefix") public void providerAssignedMessageId_UuidMode_10_09() throws Exception { assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion()) @@ -291,17 +290,11 @@ public class SimpleConversionTest extends EndToEndConversionTestBase final String publishedJmsMessageID = publishedMessage.getJMSMessageID(); assertThat(publishedJmsMessageID, startsWith("ID:AMQP_UUID:")); String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_UUID:".length()); - String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID); + String expectedSubscriberJmsMessageID = String.format("%s", barePublishedJmsMessageID); assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID)); - - - // TODO: On the wire the AMQP 0-x client receives a message id containing the a stringified UUID without prefix. - // This is inconsistent - in all other cases the client receives message ids prefixed. Would be - // better if the conversion layer sent a synthesized the ID prefix. } @Test - @Ignore("Currently subscriber receives the correct message id but without the ID prefix") public void providerAssignedMessageId_UuidStringMode_10_09() throws Exception { assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion()) @@ -317,10 +310,9 @@ public class SimpleConversionTest extends EndToEndConversionTestBase final String publishedJmsMessageID = publishedMessage.getJMSMessageID(); assertThat(publishedJmsMessageID, startsWith("ID:AMQP_NO_PREFIX:")); String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:AMQP_NO_PREFIX:".length()); - String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID); + String expectedSubscriberJmsMessageID = String.format("%s", barePublishedJmsMessageID); assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID)); - // TODO ditto above } @Test @@ -401,7 +393,6 @@ public class SimpleConversionTest extends EndToEndConversionTestBase } @Test - @Ignore("Currently subscriber receives a UUID that differs from the one sent") public void providerAssignedMessageId_PrefixedUuidStringMode_10_010() throws Exception { assumeTrue(EnumSet.of(Protocol.AMQP_1_0).contains(getPublisherProtocolVersion()) @@ -419,9 +410,6 @@ public class SimpleConversionTest extends EndToEndConversionTestBase String barePublishedJmsMessageID = publishedJmsMessageID.substring("ID:".length()); String expectedSubscriberJmsMessageID = String.format("ID:%s", barePublishedJmsMessageID); assertThat(subscriberMessage.getJMSMessageID(), equalTo(expectedSubscriberJmsMessageID)); - - // TODO correct conversion layer so that a string that contains a ID prefixed UUID is converted - // as a UUID. } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org