Repository: activemq
Updated Branches:
  refs/heads/master 8031d77f9 -> 4d6f4d747


https://issues.apache.org/jira/browse/AMQ-6263

Encode the incoming messageId value into a string using type prefixes
and decode it on the way out to ensure that we preserve the original
AMQP MessageId type and value.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4d6f4d74
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4d6f4d74
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4d6f4d74

Branch: refs/heads/master
Commit: 4d6f4d74755d6c9985078766fce2eba95d9bbb6a
Parents: 8031d77
Author: Timothy Bish <[email protected]>
Authored: Tue Apr 26 18:08:11 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Apr 26 18:08:11 2016 -0400

----------------------------------------------------------------------
 .../amqp/message/AMQPMessageIdHelper.java       | 255 ++++++++++++
 .../amqp/message/InboundTransformer.java        |   8 +-
 .../message/JMSMappingOutboundTransformer.java  |   7 +-
 .../transport/amqp/client/AmqpMessage.java      |  26 ++
 .../interop/AmqpMessageIdPreservationTest.java  | 166 ++++++++
 .../amqp/message/AMQPMessageIdHelperTest.java   | 406 +++++++++++++++++++
 6 files changed, 865 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
new file mode 100644
index 0000000..dad365d
--- /dev/null
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id 
values between
+ * the AMQP types and the String values used by JMS.
+ *
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: 
message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string 
representation of these
+ * for interoperability with other AMQP clients, the following encoding can be 
used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ *
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ *
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string 
values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST 
NOT be used otherwise.
+ *
+ * <p>When provided a string for conversion which attempts to identify itself 
as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will 
be thrown.
+ *
+ */
+public class AMQPMessageIdHelper {
+
+    public static final AMQPMessageIdHelper INSTANCE = new 
AMQPMessageIdHelper();
+
+    public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+    public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+    public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+    public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+    private static final int AMQP_UUID_PREFIX_LENGTH = 
AMQP_UUID_PREFIX.length();
+    private static final int AMQP_ULONG_PREFIX_LENGTH = 
AMQP_ULONG_PREFIX.length();
+    private static final int AMQP_STRING_PREFIX_LENGTH = 
AMQP_STRING_PREFIX.length();
+    private static final int AMQP_BINARY_PREFIX_LENGTH = 
AMQP_BINARY_PREFIX.length();
+    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Takes the provided AMQP messageId style object and converts it to a 
base string.
+     * Encodes type information as a prefix where necessary to convey or 
escape the type
+     * of the provided object.
+     *
+     * @param messageId
+     *      the raw messageId object to process
+     *
+     * @return the base string to be used in creating the actual id.
+     */
+    public String toBaseMessageIdString(Object messageId) {
+        if (messageId == null) {
+            return null;
+        } else if (messageId instanceof String) {
+            String stringId = (String) messageId;
+
+            // If the given string has a type encoding prefix,
+            // we need to escape it as an encoded string (even if
+            // the existing encoding prefix was also for string)
+            if (hasTypeEncodingPrefix(stringId)) {
+                return AMQP_STRING_PREFIX + stringId;
+            } else {
+                return stringId;
+            }
+        } else if (messageId instanceof UUID) {
+            return AMQP_UUID_PREFIX + messageId.toString();
+        } else if (messageId instanceof UnsignedLong) {
+            return AMQP_ULONG_PREFIX + messageId.toString();
+        } else if (messageId instanceof Binary) {
+            ByteBuffer dup = ((Binary) messageId).asByteBuffer();
+
+            byte[] bytes = new byte[dup.remaining()];
+            dup.get(bytes);
+
+            String hex = convertBinaryToHexString(bytes);
+
+            return AMQP_BINARY_PREFIX + hex;
+        } else {
+            throw new IllegalArgumentException("Unsupported type provided: " + 
messageId.getClass());
+        }
+    }
+
+    /**
+     * Takes the provided base id string and returns the appropriate AMQP 
messageId style object.
+     * Converts the type based on any relevant encoding information found as a 
prefix.
+     *
+     * @param baseId
+     *      the object to be converted to an AMQP MessageId value.
+     *
+     * @return the AMQP messageId style object
+     *
+     * @throws AmqpProtocolException if the provided baseId String indicates 
an encoded type but can't be converted to that type.
+     */
+    public Object toIdObject(String baseId) throws AmqpProtocolException {
+        if (baseId == null) {
+            return null;
+        }
+
+        try {
+            if (hasAmqpUuidPrefix(baseId)) {
+                String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
+                return UUID.fromString(uuidString);
+            } else if (hasAmqpUlongPrefix(baseId)) {
+                String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
+                return UnsignedLong.valueOf(longString);
+            } else if (hasAmqpStringPrefix(baseId)) {
+                return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+            } else if (hasAmqpBinaryPrefix(baseId)) {
+                String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+                byte[] bytes = convertHexStringToBinary(hexString);
+                return new Binary(bytes);
+            } else {
+                // We have a string without any type prefix, transmit it as-is.
+                return baseId;
+            }
+        } catch (IllegalArgumentException e) {
+            throw new AmqpProtocolException("Unable to convert ID value");
+        }
+    }
+
+    /**
+     * Convert the provided hex-string into a binary representation where each 
byte represents
+     * two characters of the hex string.
+     *
+     * The hex characters may be upper or lower case.
+     *
+     * @param hexString
+     *      string to convert to a binary value.
+     *
+     * @return a byte array containing the binary representation
+     *
+     * @throws IllegalArgumentException if the provided String is a non-even 
length or contains
+     *                                  non-hex characters
+     */
+    public byte[] convertHexStringToBinary(String hexString) throws 
IllegalArgumentException {
+        int length = hexString.length();
+
+        // As each byte needs two characters in the hex encoding, the string 
must be an even length.
+        if (length % 2 != 0) {
+            throw new IllegalArgumentException("The provided hex String must 
be an even length, but was of length " + length + ": " + hexString);
+        }
+
+        byte[] binary = new byte[length / 2];
+
+        for (int i = 0; i < length; i += 2) {
+            char highBitsChar = hexString.charAt(i);
+            char lowBitsChar = hexString.charAt(i + 1);
+
+            int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+            int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+            binary[i / 2] = (byte) (highBits + lowBits);
+        }
+
+        return binary;
+    }
+
+    /**
+     * Convert the provided binary into a hex-string representation where each 
character
+     * represents 4 bits of the provided binary, i.e. each byte requires two 
characters.
+     *
+     * The returned hex characters are upper-case.
+     *
+     * @param bytes
+     *      the binary value to convert to a hex String instance.
+     *
+     * @return a String containing a hex representation of the bytes
+     */
+    public String convertBinaryToHexString(byte[] bytes) {
+        // Each byte is represented as 2 chars
+        StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+        for (byte b : bytes) {
+            // The byte will be expanded to int before shifting, replicating 
the
+            // sign bit, so mask everything beyond the first 4 bits afterwards
+            int highBitsInt = (b >> 4) & 0xF;
+            // We only want the first 4 bits
+            int lowBitsInt = b & 0xF;
+
+            builder.append(HEX_CHARS[highBitsInt]);
+            builder.append(HEX_CHARS[lowBitsInt]);
+        }
+
+        return builder.toString();
+    }
+
+    //----- Internal implementation 
------------------------------------------//
+
+    private boolean hasTypeEncodingPrefix(String stringId) {
+        return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
+               hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+    }
+
+    private boolean hasAmqpStringPrefix(String stringId) {
+        return stringId.startsWith(AMQP_STRING_PREFIX);
+    }
+
+    private boolean hasAmqpUlongPrefix(String stringId) {
+        return stringId.startsWith(AMQP_ULONG_PREFIX);
+    }
+
+    private boolean hasAmqpUuidPrefix(String stringId) {
+        return stringId.startsWith(AMQP_UUID_PREFIX);
+    }
+
+    private boolean hasAmqpBinaryPrefix(String stringId) {
+        return stringId.startsWith(AMQP_BINARY_PREFIX);
+    }
+
+    private String strip(String id, int numChars) {
+        return id.substring(numChars);
+    }
+
+    private int hexCharToInt(char ch, String orig) throws 
IllegalArgumentException {
+        if (ch >= '0' && ch <= '9') {
+            // subtract '0' to get difference in position as an int
+            return ch - '0';
+        } else if (ch >= 'A' && ch <= 'F') {
+            // subtract 'A' to get difference in position as an int
+            // and then add 10 for the offset of 'A'
+            return ch - 'A' + 10;
+        } else if (ch >= 'a' && ch <= 'f') {
+            // subtract 'a' to get difference in position as an int
+            // and then add 10 for the offset of 'a'
+            return ch - 'a' + 10;
+        }
+
+        throw new IllegalArgumentException("The provided hex string contains 
non-hex character '" + ch + "': " + orig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index c3dc1d3..2223b5a 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -118,14 +118,17 @@ public abstract class InboundTransformer {
         } else {
             jms.setJMSDeliveryMode(defaultDeliveryMode);
         }
+
         if (header.getPriority() != null) {
             jms.setJMSPriority(header.getPriority().intValue());
         } else {
             jms.setJMSPriority(defaultPriority);
         }
+
         if (header.getFirstAcquirer() != null) {
             jms.setBooleanProperty(prefixVendor + "FirstAcquirer", 
header.getFirstAcquirer());
         }
+
         if (header.getDeliveryCount() != null) {
             vendor.setJMSXDeliveryCount(jms, 
header.getDeliveryCount().longValue());
         }
@@ -188,7 +191,7 @@ public abstract class InboundTransformer {
         final Properties properties = amqp.getProperties();
         if (properties != null) {
             if (properties.getMessageId() != null) {
-                jms.setJMSMessageID(properties.getMessageId().toString());
+                
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
             }
             Binary userId = properties.getUserId();
             if (userId != null) {
@@ -236,6 +239,7 @@ public abstract class InboundTransformer {
             if (header.getTtl() != null) {
                 ttl = header.getTtl().longValue();
             }
+
             if (ttl == 0) {
                 jms.setJMSExpiration(0);
             } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index b215f80..7e6af2f 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -41,6 +41,7 @@ import javax.jms.Topic;
 
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -180,7 +181,11 @@ public class JMSMappingOutboundTransformer extends 
OutboundTransformer {
 
             MessageId msgId = amqMsg.getMessageId();
             if (msgId.getTextView() != null) {
-                props.setMessageId(msgId.getTextView());
+                try {
+                    
props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
+                } catch (AmqpProtocolException e) {
+                    props.setMessageId(msgId.getTextView().toString());
+                }
             } else {
                 props.setMessageId(msgId.toString());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index e8ad793..d974690 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -199,6 +199,32 @@ public class AmqpMessage {
     }
 
     /**
+     * Return the set MessageId value in the original form, if there are no 
properties
+     * in the given message return null.
+     *
+     * @return the set message ID in its original form or null if not set.
+     */
+    public Object getRawMessageId() {
+        if (message.getProperties() == null) {
+            return null;
+        }
+
+        return message.getProperties().getMessageId();
+    }
+
+    /**
+     * Sets the MessageId property on an outbound message using the provided 
value
+     *
+     * @param messageId
+     *        the message ID value to set.
+     */
+    public void setRawMessageId(Object messageId) {
+        checkReadOnly();
+        lazyCreateProperties();
+        getWrappedMessage().setMessageId(messageId);
+    }
+
+    /**
      * Sets the GroupId property on an outbound message using the provided 
String
      *
      * @param messageId

http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMessageIdPreservationTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMessageIdPreservationTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMessageIdPreservationTest.java
new file mode 100644
index 0000000..144f677
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMessageIdPreservationTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.junit.Test;
+
+/**
+ * Tests that the AMQP MessageID value and type are preserved.
+ */
+public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testStringMessageIdIsPreserved() throws Exception {
+        doTestMessageIdPreservation("msg-id-string:1");
+    }
+
+    @Test(timeout = 60000)
+    public void testStringMessageIdIsPreservedAfterRestart() throws Exception {
+        doTestMessageIdPreservationOnBrokerRestart("msg-id-string:1");
+    }
+
+    @Test(timeout = 60000)
+    public void testUUIDMessageIdIsPreserved() throws Exception {
+        doTestMessageIdPreservation(UUID.randomUUID());
+    }
+
+    @Test(timeout = 60000)
+    public void testUUIDMessageIdIsPreservedAfterRestart() throws Exception {
+        doTestMessageIdPreservationOnBrokerRestart(UUID.randomUUID());
+    }
+
+    @Test(timeout = 60000)
+    public void testUnsignedLongMessageIdIsPreserved() throws Exception {
+        doTestMessageIdPreservation(new UnsignedLong(255l));
+    }
+
+    @Test(timeout = 60000)
+    public void testUnsignedLongMessageIdIsPreservedAfterRestart() throws 
Exception {
+        doTestMessageIdPreservationOnBrokerRestart(new UnsignedLong(255l));
+    }
+
+    @Test(timeout = 60000)
+    public void testBinaryLongMessageIdIsPreserved() throws Exception {
+        byte[] payload = new byte[32];
+        for (int i = 0; i < 32; ++i) {
+            payload[i] = (byte) ('a' + i);
+        }
+
+        doTestMessageIdPreservation(new Binary(payload));
+    }
+
+    @Test(timeout = 60000)
+    public void testBinaryLongMessageIdIsPreservedAfterRestart() throws 
Exception {
+        byte[] payload = new byte[32];
+        for (int i = 0; i < 32; ++i) {
+            payload[i] = (byte) ('a' + i);
+        }
+
+        doTestMessageIdPreservationOnBrokerRestart(new Binary(payload));
+    }
+
+    @Test(timeout = 60000)
+    public void testStringMessageIdPrefixIsPreserved() throws Exception {
+        doTestMessageIdPreservation("ID:msg-id-string:1");
+    }
+
+    public void doTestMessageIdPreservation(Object messageId) throws Exception 
{
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setRawMessageId(messageId);
+        message.setText("Test-Message");
+
+        sender.send(message);
+
+        sender.close();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + 
getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("Should have got a message", received);
+        assertEquals(received.getRawMessageId().getClass(), 
messageId.getClass());
+        assertEquals(messageId, received.getRawMessageId());
+        receiver.close();
+        connection.close();
+    }
+
+    public void doTestMessageIdPreservationOnBrokerRestart(Object messageId) 
throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setRawMessageId(messageId);
+        message.setText("Test-Message");
+        message.setDurable(true);
+
+        sender.send(message);
+
+        sender.close();
+        connection.close();
+
+        restartBroker();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        connection = client.connect();
+        session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + 
getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("Should have got a message", received);
+        assertEquals(received.getRawMessageId().getClass(), 
messageId.getClass());
+        assertEquals(messageId, received.getRawMessageId());
+        receiver.close();
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6f4d74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelperTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelperTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelperTest.java
new file mode 100644
index 0000000..d3e29bc
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelperTest.java
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.UUID;
+
+import org.apache.activemq.transport.amqp.AmqpProtocolException;
+import org.apache.qpid.jms.exceptions.IdConversionException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQPMessageIdHelperTest {
+
+    private AMQPMessageIdHelper messageIdHelper;
+
+    @Before
+    public void setUp() throws Exception {
+        messageIdHelper = new AMQPMessageIdHelper();
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns null if given null
+     */
+    @Test
+    public void testToBaseMessageIdStringWithNull() {
+        String nullString = null;
+        assertNull("null string should have been returned", 
messageIdHelper.toBaseMessageIdString(nullString));
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * throws an IAE if given an unexpected object type.
+     */
+    @Test
+    public void testToBaseMessageIdStringThrowsIAEWithUnexpectedType() {
+        try {
+            messageIdHelper.toBaseMessageIdString(new Object());
+            fail("expected exception not thrown");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns the given basic string unchanged
+     */
+    @Test
+    public void testToBaseMessageIdStringWithString() {
+        String stringMessageId = "myIdString";
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(stringMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", 
stringMessageId, baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded string, when the given 
string
+     * happens to already begin with the
+     * {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
+     */
+    @Test
+    public void 
testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
+        String uuidStringMessageId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + 
UUID.randomUUID();
+        String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
uuidStringMessageId;
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(uuidStringMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded string, when the given 
string
+     * happens to already begin with the
+     * {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
+     */
+    @Test
+    public void 
testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
+        String longStringMessageId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + 
Long.valueOf(123456789L);
+        String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
longStringMessageId;
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(longStringMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded string, when the given 
string
+     * happens to already begin with the
+     * {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
+     */
+    @Test
+    public void 
testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
+        String binaryStringMessageId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX 
+ "0123456789ABCDEF";
+        String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
binaryStringMessageId;
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(binaryStringMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded string (effectively twice),
+     * when the given string happens to already begin with the
+     * {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
+     */
+    @Test
+    public void 
testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
+        String stringMessageId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
"myStringId";
+        String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
stringMessageId;
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(stringMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded UUID when given a UUID
+     * object.
+     */
+    @Test
+    public void testToBaseMessageIdStringWithUUID() {
+        UUID uuidMessageId = UUID.randomUUID();
+        String expected = AMQPMessageIdHelper.AMQP_UUID_PREFIX + 
uuidMessageId.toString();
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(uuidMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded ulong when given a
+     * UnsignedLong object.
+     */
+    @Test
+    public void testToBaseMessageIdStringWithUnsignedLong() {
+        UnsignedLong uLongMessageId = UnsignedLong.valueOf(123456789L);
+        String expected = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + 
uLongMessageId.toString();
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(uLongMessageId);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)}
+     * returns a string indicating an AMQP encoded binary when given a Binary
+     * object.
+     */
+    @Test
+    public void testToBaseMessageIdStringWithBinary() {
+        byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, 
(byte) 0xFF };
+        Binary binary = new Binary(bytes);
+
+        String expected = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
+
+        String baseMessageIdString = 
messageIdHelper.toBaseMessageIdString(binary);
+        assertNotNull("null string should not have been returned", 
baseMessageIdString);
+        assertEquals("expected base id string was not returned", expected, 
baseMessageIdString);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an
+     * UnsignedLong when given a string indicating an encoded AMQP ulong id.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithEncodedUlong() throws Exception {
+        UnsignedLong longId = UnsignedLong.valueOf(123456789L);
+        String provided = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
+
+        Object idObject = messageIdHelper.toIdObject(provided);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", longId, idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a 
Binary
+     * when given a string indicating an encoded AMQP binary id, using upper
+     * case hex characters
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithEncodedBinaryUppercaseHexString() throws 
Exception {
+        byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, 
(byte) 0xFF };
+        Binary binaryId = new Binary(bytes);
+
+        String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
+
+        Object idObject = messageIdHelper.toIdObject(provided);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", binaryId, 
idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null
+     * when given null.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithNull() throws Exception {
+        assertNull("null object should have been returned", 
messageIdHelper.toIdObject(null));
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a 
Binary
+     * when given a string indicating an encoded AMQP binary id, using lower
+     * case hex characters.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithEncodedBinaryLowercaseHexString() throws 
Exception {
+        byte[] bytes = new byte[] { (byte) 0x00, (byte) 0xAB, (byte) 0x09, 
(byte) 0xFF };
+        Binary binaryId = new Binary(bytes);
+
+        String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
+
+        Object idObject = messageIdHelper.toIdObject(provided);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", binaryId, 
idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID
+     * when given a string indicating an encoded AMQP uuid id.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithEncodedUuid() throws Exception {
+        UUID uuid = UUID.randomUUID();
+        String provided = AMQPMessageIdHelper.AMQP_UUID_PREFIX + 
uuid.toString();
+
+        Object idObject = messageIdHelper.toIdObject(provided);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", uuid, idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a 
string
+     * when given a string without any type encoding prefix.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithStringContainingNoEncodingPrefix() throws 
Exception {
+        String stringId = "myStringId";
+
+        Object idObject = messageIdHelper.toIdObject(stringId);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", stringId, 
idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
+     * remainder of the provided string after removing the
+     * {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void testToIdObjectWithStringContainingStringEncodingPrefix() 
throws Exception {
+        String suffix = "myStringSuffix";
+        String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
+
+        Object idObject = messageIdHelper.toIdObject(stringId);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", suffix, idObject);
+    }
+
+    /**
+     * Test that when given a string with with the
+     * {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix and then
+     * additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
+     * {@link AMQPMessageIdHelper#toIdObject(String)} method returns the
+     * remainder of the provided string after removing the
+     * {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
+     *
+     * @throws Exception
+     *         if an error occurs during the test.
+     */
+    @Test
+    public void 
testToIdObjectWithStringContainingStringEncodingPrefixAndThenUuidPrefix() 
throws Exception {
+        String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + 
UUID.randomUUID().toString();
+        String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + 
encodedUuidString;
+
+        Object idObject = messageIdHelper.toIdObject(stringId);
+        assertNotNull("null object should not have been returned", idObject);
+        assertEquals("expected id object was not returned", encodedUuidString, 
idObject);
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
+     * {@link IdConversionException} when presented with an encoded binary hex
+     * string of uneven length (after the prefix) that thus can't be converted
+     * due to each byte using 2 characters
+     */
+    @Test
+    public void 
testToIdObjectWithStringContainingBinaryHexThrowsWithUnevenLengthString() {
+        String unevenHead = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
+
+        try {
+            messageIdHelper.toIdObject(unevenHead);
+            fail("expected exception was not thrown");
+        } catch (AmqpProtocolException ex) {
+            // expected
+        }
+    }
+
+    /**
+     * Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
+     * {@link IdConversionException} when presented with an encoded binary hex
+     * string (after the prefix) that contains characters other than 0-9 and 
A-F
+     * and a-f, and thus can't be converted
+     */
+    @Test
+    public void 
testToIdObjectWithStringContainingBinaryHexThrowsWithNonHexCharacters() {
+
+        // char before '0'
+        char nonHexChar = '/';
+        String nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + 
nonHexChar + nonHexChar;
+
+        try {
+            messageIdHelper.toIdObject(nonHexString);
+            fail("expected exception was not thrown");
+        } catch (AmqpProtocolException ex) {
+            // expected
+        }
+
+        // char after '9', before 'A'
+        nonHexChar = ':';
+        nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + 
nonHexChar;
+
+        try {
+            messageIdHelper.toIdObject(nonHexString);
+            fail("expected exception was not thrown");
+        } catch (AmqpProtocolException ex) {
+            // expected
+        }
+
+        // char after 'F', before 'a'
+        nonHexChar = 'G';
+        nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + 
nonHexChar;
+
+        try {
+            messageIdHelper.toIdObject(nonHexString);
+            fail("expected exception was not thrown");
+        } catch (AmqpProtocolException ex) {
+            // expected
+        }
+
+        // char after 'f'
+        nonHexChar = 'g';
+        nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + 
nonHexChar;
+
+        try {
+            messageIdHelper.toIdObject(nonHexString);
+            fail("expected exception was not thrown");
+        } catch (AmqpProtocolException ex) {
+            // expected
+        }
+    }
+}

Reply via email to