Repository: qpid-jms Updated Branches: refs/heads/master 59a5ae3d2 -> 5c483dbcb
initial work on allowing the AMQP message facade to handle the various AMQP message-id and correlation-id types - maintain the underlying type across passing to the application and back - synthesize the JMS 'ID:' prefixing within the client Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5c483dbc Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5c483dbc Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5c483dbc Branch: refs/heads/master Commit: 5c483dbcb602086434d1a4c2c022f1b1175ec87e Parents: 06d8ed1 Author: Robert Gemmell <[email protected]> Authored: Wed Sep 24 18:40:15 2014 +0100 Committer: Robert Gemmell <[email protected]> Committed: Wed Sep 24 19:06:18 2014 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/jms/message/JmsMessage.java | 12 +- .../org/apache/qpid/jms/meta/JmsMessageId.java | 29 +- .../amqp/message/AmqpJmsMessageFacade.java | 80 ++++- .../amqp/message/AmqpJmsMessageFacadeTest.java | 348 +++++++++++++++++++ 4 files changed, 431 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5c483dbc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java index 8bee2e3..df104de 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java @@ -39,6 +39,7 @@ import org.apache.qpid.jms.util.TypeConversionSupport; public class JmsMessage implements javax.jms.Message { + private static final String ID_PREFIX = "ID:"; protected transient Callable<Void> acknowledgeCallback; protected transient JmsConnection connection; @@ -128,10 +129,17 @@ public class JmsMessage implements javax.jms.Message { @Override public String getJMSMessageID() throws JMSException { - if (facade.getMessageId() == null) { + JmsMessageId facadeId = facade.getMessageId(); + if (facadeId == null) { return null; } - return facade.getMessageId().toString(); + + String value = facadeId.getValue(); + if (value != null && !value.startsWith(ID_PREFIX)) { + value = ID_PREFIX + value; + } + + return value; } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5c483dbc/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java index 0b1136b..1698231 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java @@ -22,7 +22,7 @@ package org.apache.qpid.jms.meta; */ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<JmsMessageId> { - protected Object messageId; + protected String messageId; public JmsMessageId(JmsProducerInfo producerInfo, long producerSequenceId) { this(producerInfo.getProducerId(), producerSequenceId); @@ -36,8 +36,8 @@ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<Jm this(producerId + "-" + producerSequenceId); } - public JmsMessageId(Object messageId) { - setValue(messageId); + public JmsMessageId(String messageId) { + this.messageId = messageId; } public JmsMessageId copy() { @@ -46,19 +46,9 @@ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<Jm } /** - * Sets the value as a opaque object - * - * @param messageId - * The new message Id value for this instance. - */ - public void setValue(Object messageId) { - this.messageId = messageId; - } - - /** * @return the set message ID value. */ - public Object getValue() { + public String getValue() { return messageId; } @@ -97,11 +87,12 @@ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<Jm @Override public String toString() { - String result = messageId.toString(); - if (result != null) { - if (!result.startsWith("ID:")) { - result = "ID:" + messageId; - } + String result = "JmsMessageId{messageId = "; + Object id = messageId; + if (id == null) { + result = result + "<null>}"; + } else { + result = result + String.valueOf(id) + "}"; } return result; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5c483dbc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index 1131829..4937000 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -33,6 +33,7 @@ import javax.jms.JMSException; import javax.jms.MessageFormatException; import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.exceptions.IdConversionException; import org.apache.qpid.jms.message.facade.JmsMessageFacade; import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.amqp.AmqpConnection; @@ -245,26 +246,30 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public JmsMessageId getMessageId() { - Object result = message.getMessageId(); - if (result != null) { - if (result instanceof String) { - return new JmsMessageId((String) result); - } else { - // TODO - throw new RuntimeException("No support for non-String IDs yet."); - } - } + Object underlying = message.getMessageId(); + AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE; + String baseStringId = helper.toBaseMessageIdString(underlying); - //TODO: returning a null JmsMessageId object leads to NPE during delivery processing - return null; + //Ensure the ID: prefix is present. + //TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix. + //TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities? + // I Ended up putting it in JmsMessage after the above comment, as a workaround for the current JmsDefaultMessageFacade usage. + if(baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) + { + baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId; + } + return new JmsMessageId(baseStringId); } @Override public void setMessageId(JmsMessageId messageId) { - if (messageId != null) { - message.setMessageId(messageId.toString()); - } else { + if (messageId == null) { message.setMessageId(null); + } else { + String value = messageId.getValue(); + // Remove the first 'ID:' prefix if present + value = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(value); + message.setMessageId(value); } } @@ -293,14 +298,55 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public String getCorrelationId() { - // TODO Auto-generated method stub - return null; + AmqpMessageIdHelper messageIdHelper = AmqpMessageIdHelper.INSTANCE; + String baseIdString = messageIdHelper.toBaseMessageIdString(message.getCorrelationId()); + + if (baseIdString == null) { + return null; + } else { + Object annotation = getAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID); + boolean appSpecific = Boolean.TRUE.equals(annotation); + + if (appSpecific) { + return baseIdString; + } else { + return AmqpMessageIdHelper.JMS_ID_PREFIX + baseIdString; + } + } } @Override public void setCorrelationId(String correlationId) { - // TODO Auto-generated method stub + AmqpMessageIdHelper messageIdHelper = AmqpMessageIdHelper.INSTANCE; + if (correlationId == null) { + message.setMessageId(null); + } else { + boolean appSpecific = false; + boolean hasMessageIdPrefix = messageIdHelper.hasMessageIdPrefix(correlationId); + if (correlationId != null && !hasMessageIdPrefix) { + appSpecific = true; + } + String stripped = messageIdHelper.stripMessageIdPrefix(correlationId); + + if (hasMessageIdPrefix) { + try { + Object idObject = messageIdHelper.toIdObject(stripped); + message.setCorrelationId(idObject); + } catch (IdConversionException e) { + // TODO decided what to do with this exception + throw new RuntimeException(e); + } + } else { + message.setCorrelationId(stripped); + } + + if (appSpecific) { + setAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID, true); + } else { + removeAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID); + } + } } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5c483dbc/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java new file mode 100644 index 0000000..a276c7f --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.provider.amqp.message; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.message.Message; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +public class AmqpJmsMessageFacadeTest { + + private AmqpJmsMessageFacade createNewMessageFacade() { + return new AmqpJmsMessageFacade(createMockAmqpConnection()); + } + + private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConnection amqpConnection, Message message) { + return new AmqpJmsMessageFacade(amqpConnection, message); + } + + private AmqpConnection createMockAmqpConnection() { + return Mockito.mock(AmqpConnection.class); + } + + // ====== AMQP Properties Section ======= + + @Test + public void testGetCorrelationIdIsNullOnNewMessage() { + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + + assertNull("Expected correlationId to be null on new message", amqpMessageFacade.getCorrelationId()); + } + + /** + * Test that setting then getting an application-specific String as the CorrelationId returns + * the expected value and sets the expected value on the underlying AMQP message, additionally + * setting the annotation to indicate an application-specific correlation-id + */ + @Test + public void testSetGetCorrelationIdOnNewMessageWithStringAppSpecific() { + String testCorrelationId = "myAppSpecificStringCorrelationId"; + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + amqpMessageFacade.setCorrelationId(testCorrelationId); + + Message amqpMessage = amqpMessageFacade.getAmqpMessage(); + assertEquals("Unexpected correlationId value on underlying AMQP message", testCorrelationId, amqpMessage.getCorrelationId()); + assertEquals("Expected correlationId not returned", testCorrelationId, amqpMessageFacade.getCorrelationId()); + + MessageAnnotations messageAnnotations = amqpMessage.getMessageAnnotations(); + assertNotNull("Message Annotations not present", messageAnnotations); + Object annotation = messageAnnotations.getValue().get(Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID)); + assertTrue("Message annotation " + AmqpMessageSupport.JMS_APP_CORRELATION_ID + " not set as expected", Boolean.TRUE.equals(annotation)); + } + + /** + * Test that setting then getting an JMSMessageID String as the CorrelationId returns + * the expected value and sets the expected value on the underlying AMQP message, additionally + * checking it does not set the annotation to indicate an application-specific correlation-id + */ + @Test + public void testSetGetCorrelationIdOnNewMessageWithStringJMSMessageID() { + String testCorrelationId = "ID:myJMSMessageIDStringCorrelationId"; + //The underlying AMQP message should not contain the ID: prefix + String stripped = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(testCorrelationId); + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + amqpMessageFacade.setCorrelationId(testCorrelationId); + + Message amqpMessage = amqpMessageFacade.getAmqpMessage(); + assertEquals("Unexpected correlationId value on underlying AMQP message", stripped, amqpMessage.getCorrelationId()); + assertEquals("Expected correlationId not returned from facade", testCorrelationId, amqpMessageFacade.getCorrelationId()); + + assertNull("Message annotation " + AmqpMessageSupport.JMS_APP_CORRELATION_ID + " not null as expected", amqpMessageFacade.getAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID)); + } + + /** + * Test that getting the correlationId when using an underlying received message with + * an application-specific (no 'ID:' prefix) String correlation id returns the expected value. + */ + @Test + public void testGetCorrelationIdOnReceivedMessageWithStringAppSpecific() { + correlationIdOnReceivedMessageTestImpl("myCorrelationIdString", true); + } + + /** + * Test that getting the correlationId when using an underlying received message with + * a String correlation id representing a JMSMessageID (i.e there is no annotation to + * indicate it is an application-specific correlation-id) returns the expected value. + */ + @Test + public void testGetCorrelationIdOnReceivedMessageWithStringJMSMessageId() { + correlationIdOnReceivedMessageTestImpl("myCorrelationIdString", false); + } + + /** + * Test that setting then getting a UUID as the correlationId returns the expected value, + * and sets the expected value on the underlying AMQP message. + */ + @Test + public void testSetGetCorrelationIdOnNewMessageWithUUID() { + UUID testCorrelationId = UUID.randomUUID(); + String converted = appendIdAndTypePrefix(testCorrelationId); + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + amqpMessageFacade.setCorrelationId(converted); + + assertEquals("Unexpected correlationId value on underlying AMQP message", testCorrelationId, amqpMessageFacade.getAmqpMessage().getCorrelationId()); + assertEquals("Expected correlationId not returned", converted, amqpMessageFacade.getCorrelationId()); + } + + /** + * Test that getting the correlationId when using an underlying received message with a + * UUID correlation id returns the expected value. + */ + @Test + public void testGetCorrelationIdOnReceivedMessageWithUUID() { + correlationIdOnReceivedMessageTestImpl(UUID.randomUUID(), true); + } + + /** + * Test that setting then getting a ulong correlationId (using BigInteger) returns the expected value + * and sets the expected value on the underlying AMQP message + */ + @Test + public void testSetGetCorrelationIdOnNewMessageWithUnsignedLong() { + Object testCorrelationId = UnsignedLong.valueOf(123456789L); + String converted = appendIdAndTypePrefix(testCorrelationId); + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + amqpMessageFacade.setCorrelationId(converted); + + assertEquals("Unexpected correlationId value on underlying AMQP message", testCorrelationId, amqpMessageFacade.getAmqpMessage().getCorrelationId()); + assertEquals("Expected correlationId not returned", converted, amqpMessageFacade.getCorrelationId()); + } + + /** + * Test that getting the correlationId when using an underlying received message with a + * ulong correlation id (using BigInteger) returns the expected value. + */ + @Test + public void testGetCorrelationIdOnReceivedMessageWithUnsignedLong() { + correlationIdOnReceivedMessageTestImpl(UnsignedLong.valueOf(123456789L), true); + } + + /** + * Test that setting then getting binary as the correlationId returns the expected value + * and sets the correlation id field as expected on the underlying AMQP message + */ + @Test + public void testSetGetCorrelationIdOnNewMessageWithBinary() { + Binary testCorrelationId = createBinaryId(); + String converted = appendIdAndTypePrefix(testCorrelationId); + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + amqpMessageFacade.setCorrelationId(converted); + + assertEquals("Unexpected correlationId value on underlying AMQP message", testCorrelationId, amqpMessageFacade.getAmqpMessage().getCorrelationId()); + assertEquals("Expected correlationId not returned", converted, amqpMessageFacade.getCorrelationId()); + } + + /** + * Test that getting the correlationId when using an underlying received message with a + * Binary message id returns the expected ByteBuffer value. + */ + @Test + public void testGetCorrelationIdOnReceivedMessageWithBinary() { + Binary testCorrelationId = createBinaryId(); + + correlationIdOnReceivedMessageTestImpl(testCorrelationId, true); + } + + private void correlationIdOnReceivedMessageTestImpl(final Object testCorrelationId, boolean appSpecificCorrelationId) { + Message message = Proton.message(); + + Properties props = new Properties(); + props.setCorrelationId(testCorrelationId); + message.setProperties(props); + + if(appSpecificCorrelationId) + { + //Add the annotation instructing the client the correlation-id is not a JMS MessageID value. + Map<Symbol, Object> annMap = new HashMap<Symbol, Object>(); + annMap.put(Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID), true); + MessageAnnotations messageAnnotations = new MessageAnnotations(annMap); + message.setMessageAnnotations(messageAnnotations); + } + + AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE; + String expected = helper.toBaseMessageIdString(testCorrelationId); + if(!appSpecificCorrelationId && !helper.hasMessageIdPrefix(expected)) + { + expected = AmqpMessageIdHelper.JMS_ID_PREFIX + expected; + } + + AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message); + + assertNotNull("Expected a correlationId on received message", amqpMessageFacade.getCorrelationId()); + + assertEquals("Incorrect correlationId value received", expected, amqpMessageFacade.getCorrelationId()); + } + + @Test + public void testGetMessageIdIsNullOnNewMessage() { + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + + assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId().getValue()); + } + + /** + * Test that setting then getting a String value as the messageId returns the expected value + */ + @Test + public void testSetGetMessageIdOnNewMessageWithString() { + String testMessageId = "ID:myStringMessageId"; + + AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); + + JmsMessageId jmsMessageId = new JmsMessageId(testMessageId); + amqpMessageFacade.setMessageId(jmsMessageId); + + assertEquals("Expected messageId object not returned", jmsMessageId, amqpMessageFacade.getMessageId()); + assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId().getValue()); + } + + /** + * Test that getting the messageId when using an underlying received message with a + * String message id returns the expected value. + */ + @Test + public void testGetMessageIdOnReceivedMessageWithString() { + messageIdOnReceivedMessageTestImpl("myMessageIdString"); + } + + /** + * Test that getting the messageId when using an underlying received message with a + * UUID message id returns the expected value. + */ + @Test + public void testGetMessageIdOnReceivedMessageWithUUID() { + messageIdOnReceivedMessageTestImpl(UUID.randomUUID()); + } + + /** + * Test that getting the messageId when using an underlying received message with a + * ulong message id returns the expected value. + */ + @Test + public void testGetMessageIdOnReceivedMessageWithUnsignedLong() { + messageIdOnReceivedMessageTestImpl(UnsignedLong.valueOf(123456789L)); + } + + /** + * Test that getting the messageId when using an underlying received message with a + * Binary message id returns the expected ByteBuffer value. + */ + @Test + public void testGetMessageIdOnReceivedMessageWithBinary() { + Binary testMessageId = createBinaryId(); + + messageIdOnReceivedMessageTestImpl(testMessageId); + } + + private void messageIdOnReceivedMessageTestImpl(Object testMessageId) { + Object underlyingIdObject = testMessageId; + if (!(testMessageId == null || testMessageId instanceof Binary || testMessageId instanceof UnsignedLong || testMessageId instanceof String || testMessageId instanceof UUID)) { + throw new IllegalArgumentException("invalid id type"); + } + + Message message = Proton.message(); + + Properties props = new Properties(); + props.setMessageId(underlyingIdObject); + message.setProperties(props); + + AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message); + + assertNotNull("Expected a messageId on received message", amqpMessageFacade.getMessageId()); + + String expectedString = appendIdAndTypePrefix(testMessageId); + + assertEquals("Incorrect messageId value received", new JmsMessageId(expectedString), amqpMessageFacade.getMessageId()); + } + + private String appendIdAndTypePrefix(Object testMessageId) { + if (testMessageId instanceof Binary) { + ByteBuffer buf = ((Binary) testMessageId).asByteBuffer(); + + byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + + return "ID:AMQP_BINARY:" + new AmqpMessageIdHelper().convertBinaryToHexString(bytes); + } else if (testMessageId instanceof UnsignedLong) { + return ("ID:AMQP_ULONG:" + testMessageId); + } else if (testMessageId instanceof UUID) { + return ("ID:AMQP_UUID:" + testMessageId); + } else if (testMessageId instanceof String) { + return "ID:" + testMessageId; + } else if (testMessageId == null) { + return null; + } + + throw new IllegalArgumentException(); + } + + private Binary createBinaryId() { + byte length = 10; + byte[] idBytes = new byte[length]; + for (int i = 0; i < length; i++) { + idBytes[i] = (byte) (length - i); + } + + return new Binary(idBytes); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
