Repository: qpid-jms
Updated Branches:
  refs/heads/master 0e590e7ce -> 3dbfdb7be
support adding prefixes to the to/reply-to fields on messages when setting JMSDestination/JMSReplyTo value if the connection has prefixes configured Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9bd7e592 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9bd7e592 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9bd7e592 Branch: refs/heads/master Commit: 9bd7e5920ea08c380efeb1672b1f94b67134ca8f Parents: 0e590e7 Author: Robert Gemmell <rob...@apache.org> Authored: Fri Dec 19 19:46:32 2014 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Fri Dec 19 19:51:08 2014 +0000 ---------------------------------------------------------------------- .../amqp/message/AmqpDestinationHelper.java | 31 +++- .../jms/integration/MessageIntegrationTest.java | 144 ++++++++++++++++++- .../amqp/message/AmqpDestinationHelperTest.java | 8 ++ 3 files changed, 178 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java index 0f770fe..da10adb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java @@ -150,7 +150,7 @@ public class AmqpDestinationHelper { } public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) { - String address = destination != null ? 
destination.getName() : null; + String address = getDestinationAddress(destination, message.getConnection()); Object typeValue = toTypeAnnotation(destination); message.setToAddress(address); @@ -163,7 +163,7 @@ public class AmqpDestinationHelper { } public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) { - String replyToAddress = destination != null ? destination.getName() : null; + String replyToAddress = getDestinationAddress(destination, message.getConnection()); Object typeValue = toTypeAnnotation(destination); message.setReplyToAddress(replyToAddress); @@ -175,6 +175,33 @@ public class AmqpDestinationHelper { } } + private String getDestinationAddress(JmsDestination destination, AmqpConnection conn) { + if (destination == null) { + return null; + } + + final String name = destination.getName(); + + // Add prefix if necessary + if (!destination.isTemporary()) { + if (destination.isQueue()) { + String queuePrefix = conn.getQueuePrefix(); + if (queuePrefix != null && !name.startsWith(queuePrefix)) { + return queuePrefix + name; + } + } + + if (destination.isTopic()) { + String topicPrefix = conn.getTopicPrefix(); + if (topicPrefix != null && !name.startsWith(topicPrefix)) { + return topicPrefix + name; + } + } + } + + return name; + } + /** * @return the annotation type value, or null if the supplied destination * is null or can't be classified http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index 0734718..ba64434 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -54,6 +54,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationProp import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; @@ -120,7 +121,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); - MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName)); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo("queue://" + queueName)); TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(headersMatcher); @@ -514,6 +515,143 @@ public class MessageIntegrationTest extends QpidJmsTestCase } } + /** + * Tests that the a connection with a 'topic prefix' set on it adds the + * prefix to the content of the to/reply-to fields for outgoing messages. + */ + @Test(timeout = 2000) + public void testSendMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception { + Class<? 
extends Destination> destType = Topic.class; + String destPrefix = "t12321-"; + String destName = "myTopic"; + String destAddress = destPrefix + destName; + Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE; + + doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); + } + + /** + * Tests that the a connection with a 'queue prefix' set on it adds the + * prefix to the content of the to/reply-to fields for outgoing messages. + */ + @Test(timeout = 2000) + public void testSendMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception { + Class<? extends Destination> destType = Queue.class; + String destPrefix = "q12321-"; + String destName = "myQueue"; + String destAddress = destPrefix + destName; + Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE; + + doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); + } + + /** + * Tests that the a connection with 'destination prefixes' set on it does not add + * the prefix to the content of the to/reply-to fields for TemporaryQueues. + */ + @Test(timeout = 2000) + public void testSendMessageWithTemporaryQueueDestinationsOnConnectionWithDestinationPrefixes() throws Exception { + Class<? extends Destination> destType = TemporaryQueue.class; + String destPrefix = "q12321-"; + String destName = null; + String destAddress = "temp-queue://myTempQueue"; + Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE; + + doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); + } + + /** + * Tests that the a connection with 'destination prefixes' set on it does not add + * the prefix to the content of the to/reply-to fields for TemporaryTopics. + */ + @Test(timeout = 2000) + public void testSendMessageWithTemporaryTopicDestinationsOnConnectionWithDestinationPrefixes() throws Exception { + Class<? 
extends Destination> destType = TemporaryTopic.class; + String destPrefix = "q12321-"; + String destName = null; + String destAddress = "temp-topic://myTempTopic"; + Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE; + + doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); + } + + private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType, + String destPrefix, + String destName, + String destAddress, + Byte destTypeAnnotationValue) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = null; + if (destType == Topic.class) { + connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix); + } else if (destType == Queue.class) { + connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix); + } else { + // Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything + connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix); + } + + connection.start(); + + // Set the prefix if Topic or Queue dest type. 
+ if (destType == Topic.class) { + ((JmsConnection) connection).setTopicPrefix(destPrefix); + } else if (destType == Queue.class) { + ((JmsConnection) connection).setQueuePrefix(destPrefix); + } + + testPeer.expectBegin(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination + Destination dest = null; + if (destType == Topic.class) { + dest = session.createTopic(destName); + } else if (destType == Queue.class) { + dest = session.createQueue(destName); + } else if (destType == TemporaryTopic.class) { + // TODO:add method to expect temp topic creation + testPeer.expectTempQueueCreationAttach(destAddress); + dest = session.createTemporaryTopic(); + } else if (destType == TemporaryQueue.class) { + testPeer.expectTempQueueCreationAttach(destAddress); + dest = session.createTemporaryQueue(); + } + + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(destAddress)); + + testPeer.expectSenderAttach(targetMatcher, false, false); + + MessageProducer producer = session.createProducer(dest); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.AMQP_TO_ANNOTATION), equalTo(destTypeAnnotationValue)); + msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION), equalTo(destTypeAnnotationValue)); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + propsMatcher.withTo(equalTo(destAddress)); + propsMatcher.withReplyTo(equalTo(destAddress)); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + + 
//TODO: currently we aren't sending any body section, decide if this is allowed + //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); + testPeer.expectTransfer(messageMatcher); + + Message message = session.createMessage(); + message.setJMSReplyTo(dest); + + producer.send(message); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + // --- byte type annotation values --- // /** @@ -535,7 +673,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_TO_ANNOTATION); msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); - MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName)); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo("topic://" + topicName)); TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(headersMatcher); @@ -574,7 +712,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION); msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); - MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName)); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo("topic://" + replyTopicName)); TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); messageMatcher.setHeadersMatcher(headersMatcher); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java index 2f516de..852199f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java @@ -487,6 +487,8 @@ public class AmqpDestinationHelperTest { String testAddress = "testAddress"; JmsDestination destination = new JmsQueue("testAddress"); AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class); + AmqpConnection conn = Mockito.mock(AmqpConnection.class); + Mockito.when(message.getConnection()).thenReturn(conn); helper.setToAddressFromDestination(message, destination); @@ -499,6 +501,8 @@ public class AmqpDestinationHelperTest { String testAddress = "testAddress"; JmsDestination destination = new JmsTopic("testAddress"); AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class); + AmqpConnection conn = Mockito.mock(AmqpConnection.class); + Mockito.when(message.getConnection()).thenReturn(conn); helper.setToAddressFromDestination(message, destination); @@ -569,6 +573,8 @@ public class AmqpDestinationHelperTest { String testAddress = "testAddress"; JmsDestination destination = new JmsQueue("testAddress"); AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class); + AmqpConnection conn = Mockito.mock(AmqpConnection.class); + Mockito.when(message.getConnection()).thenReturn(conn); helper.setReplyToAddressFromDestination(message, destination); @@ -581,6 +587,8 @@ public class AmqpDestinationHelperTest { String testAddress = "testAddress"; JmsDestination destination = new JmsTopic("testAddress"); AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class); + AmqpConnection conn = Mockito.mock(AmqpConnection.class); + 
Mockito.when(message.getConnection()).thenReturn(conn); helper.setReplyToAddressFromDestination(message, destination);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org