Repository: qpid-jms Updated Branches: refs/heads/master e68072910 -> badfb1b4d
add ability to toggle the destination type annotation values used Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/badfb1b4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/badfb1b4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/badfb1b4 Branch: refs/heads/master Commit: badfb1b4dacd4dd4f7f30d9aa30d243133449ba8 Parents: e680729 Author: Robert Gemmell <[email protected]> Authored: Fri Oct 17 12:23:34 2014 +0100 Committer: Robert Gemmell <[email protected]> Committed: Fri Oct 17 12:23:34 2014 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConnection.java | 8 + .../qpid/jms/provider/amqp/AmqpProvider.java | 9 + .../amqp/message/AmqpJmsMessageFacade.java | 4 +- .../jms/integration/IntegrationTestFixture.java | 12 +- .../jms/integration/MessageIntegrationTest.java | 171 ++++++++++++++++++- .../jms/integration/SenderIntegrationTest.java | 1 - 6 files changed, 200 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index d9a52da..052a893 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -284,6 +284,14 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } /** + * @return true if the provider has been configured to use byte values for + * destination type annotations. + */ + public boolean isUseByteDestintionTypeAnnotation() { + return provider.isUseByteDestintionTypeAnnotation(); + } + + /** * @return true if anonymous producers should be cached or closed on send complete. */ public boolean isAnonymousProducerCache() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index e5b5f7a..a36a308 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -88,6 +88,7 @@ public class AmqpProvider extends AbstractProvider implements TransportListener private boolean traceBytes; private boolean presettleConsumers; private boolean presettleProducers; + private boolean useByteDestintionTypeAnnotation = false;//TODO: enable private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; @@ -823,6 +824,14 @@ public class AmqpProvider extends AbstractProvider implements TransportListener this.presettleProducers = presettle; } + public void setUseByteDestintionTypeAnnotation(boolean useByteDestintionTypeAnnotation) { + this.useByteDestintionTypeAnnotation = useByteDestintionTypeAnnotation; + } + + public boolean isUseByteDestintionTypeAnnotation() { + return useByteDestintionTypeAnnotation; + } + /** * @return the currently set Max Frame Size value. */ http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index 9776b94..b65a1da 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -655,7 +655,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { public void setDestination(JmsDestination destination) { this.destination = destination; lazyCreateMessageAnnotations(); - AmqpDestinationHelper.INSTANCE.setToAddressFromDestination(this, destination, false); + AmqpDestinationHelper.INSTANCE.setToAddressFromDestination(this, destination, connection.isUseByteDestintionTypeAnnotation()); } @Override @@ -676,7 +676,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { public void setReplyTo(JmsDestination replyTo) { this.replyTo = replyTo; lazyCreateMessageAnnotations(); - AmqpDestinationHelper.INSTANCE.setReplyToAddressFromDestination(this, replyTo, false); + AmqpDestinationHelper.INSTANCE.setReplyToAddressFromDestination(this, replyTo, connection.isUseByteDestintionTypeAnnotation()); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java index cf19d67..4029bf4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java @@ -31,12 +31,22 @@ public class IntegrationTestFixture { static final int PORT = 25672; Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException { + return establishConnecton(testPeer, null); + } + + Connection establishConnecton(TestAmqpPeer testPeer, String optionsString) throws JMSException { testPeer.expectPlainConnect("guest", "guest", true); // Each connection creates a session for managing temporary destinations etc testPeer.expectBegin(true); - ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + PORT); + final String baseURI = "amqp://localhost:" + PORT; + String brokerURI = baseURI; + if (optionsString != null) { + brokerURI = baseURI + optionsString; + } + + ConnectionFactory factory = new JmsConnectionFactory(brokerURI); Connection connection = factory.createConnection("guest", "guest"); // Set a clientId to provoke the actual AMQP connection process to occur. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index 98c79a7..45c257a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -40,6 +40,7 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.qpid.jms.JmsClientProperties; +import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper; import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -59,6 +60,7 @@ import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedLong; +import org.junit.Ignore; import org.junit.Test; public class MessageIntegrationTest extends QpidJmsTestCase @@ -268,7 +270,174 @@ public class MessageIntegrationTest extends QpidJmsTestCase } } - // --- old string values --- // + // --- byte type annotation values --- // + + /** + * Tests that the {@link AmqpMessageSupport#AMQP_TO_ANNOTATION} is set as a byte on + * a sent message to indicate its 'to' address represents a Topic JMSDestination. + */ + @Ignore //TODO: enable when toggling default + @Test(timeout = 5000) + public void testSentMessageContainsToTypeAnnotationByte() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(true); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String topicName = "myTopic"; + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_TO_ANNOTATION); + msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); + + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName)); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + + testPeer.expectTransfer(messageMatcher); + + Message message = session.createMessage(); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + /** + * Tests that the {@link AmqpMessageSupport#AMQP_REPLY_TO_ANNOTATION} is set as a byte on + * a sent message to indicate its 'reply-to' address represents a Topic JMSDestination. + */ + @Ignore //TODO: enable when toggling default + @Test(timeout = 5000) + public void testSentMessageContainsReplyToTypeAnnotationByte() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + + testPeer.expectBegin(true); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + String replyTopicName = "myReplyTopic"; + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION); + msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); + + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName)); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + + testPeer.expectTransfer(messageMatcher); + + Topic replyTopic = session.createTopic(replyTopicName); + Message message = session.createMessage(); + message.setJMSReplyTo(replyTopic); + + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(message); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + // --- old string type annotation values --- // + + /** + * Tests that the {@link AmqpMessageSupport#AMQP_TO_ANNOTATION} is set as a string on + * a sent message to indicate its 'to' address represents a Topic JMSDestination, when + * the provider has been configured to do so. + */ + @Test(timeout = 5000) + public void testSentMessageContainsToTypeAnnotationStringIfConfigured() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + // Enable the old string destination type annotation values + Connection connection = testFixture.establishConnecton(testPeer, "?provider.useByteDestintionTypeAnnotation=false"); + + testPeer.expectBegin(true); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String topicName = "myTopic"; + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_TO_ANNOTATION); + msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_ATTRIBUTES_STRING)); + + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName)); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + + testPeer.expectTransfer(messageMatcher); + + Message message = session.createMessage(); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + /** + * Tests that the {@link AmqpMessageSupport#AMQP_REPLY_TO_ANNOTATION} is set as a string on + * a sent message to indicate its 'reply-to' address represents a Topic JMSDestination, when + * the provider has been configured to do so. + */ + @Test(timeout = 5000) + public void testSentMessageContainsReplyToTypeAnnotationStringIfConfigured() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + // Enable the old string destination type annotation values + Connection connection = testFixture.establishConnecton(testPeer, "?provider.useByteDestintionTypeAnnotation=false"); + + testPeer.expectBegin(true); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + String replyTopicName = "myReplyTopic"; + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION); + msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_ATTRIBUTES_STRING)); + + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName)); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + + testPeer.expectTransfer(messageMatcher); + + Topic replyTopic = session.createTopic(replyTopicName); + Message message = session.createMessage(); + message.setJMSReplyTo(replyTopic); + + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(message); + + testPeer.waitForAllHandlersToComplete(2000); + } + } /** * Tests that the {@link AmqpMessageSupport#AMQP_TO_ANNOTATION} set on a message to http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/badfb1b4/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java index c82462b..108b36c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java @@ -50,7 +50,6 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.hamcrest.Matcher; -import org.junit.Ignore; import org.junit.Test; public class SenderIntegrationTest extends QpidJmsTestCase { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
