Repository: qpid-jms
Updated Branches:
  refs/heads/master a587e3fd2 -> 8e3f1bd50
make the consumer sources contain a capability indicating the desired destination type/capability Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/082df05f Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/082df05f Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/082df05f Branch: refs/heads/master Commit: 082df05f44c204d7b4b4b671da97422ed66420ad Parents: 752f1cc Author: Robert Gemmell <[email protected]> Authored: Wed Jan 7 15:06:41 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Jan 7 16:34:47 2015 +0000 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 5 ++ .../jms/integration/SessionIntegrationTest.java | 61 ++++++++++++++++++++ 2 files changed, 66 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/082df05f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 3319297..816f959 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -195,6 +195,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); } + Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(resource.getDestination()); + if(typeCapability != null) { + source.setCapabilities(typeCapability); + } + source.setOutcomes(outcomes); Modified modified = new Modified(); 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/082df05f/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 48ae0ca..2d96c74 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -196,6 +196,67 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 5000) + public void testCreateConsumerSourceContainsQueueCapability() throws Exception { + doCreateConsumerSourceContainsCapabilityTestImpl(Queue.class); + } + + @Test(timeout = 5000) + public void testCreateConsumerSourceContainsTopicCapability() throws Exception { + doCreateConsumerSourceContainsCapabilityTestImpl(Topic.class); + } + + @Test(timeout = 5000) + public void testCreateConsumerSourceContainsTempQueueCapability() throws Exception { + doCreateConsumerSourceContainsCapabilityTestImpl(TemporaryQueue.class); + } + + @Test(timeout = 5000) + public void testCreateConsumerSourceContainsTempTopicCapability() throws Exception { + doCreateConsumerSourceContainsCapabilityTestImpl(TemporaryTopic.class); + } + + private void doCreateConsumerSourceContainsCapabilityTestImpl(Class<? 
extends Destination> destType) throws JMSException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String destName = "myDest"; + Symbol nodeTypeCapability = null; + + Destination dest = null; + if (destType == Queue.class) { + dest = session.createQueue(destName); + nodeTypeCapability = AmqpDestinationHelper.QUEUE_CAPABILITY; + } else if (destType == Topic.class) { + dest = session.createTopic(destName); + nodeTypeCapability = AmqpDestinationHelper.TOPIC_CAPABILITY; + } else if (destType == TemporaryQueue.class) { + testPeer.expectTempQueueCreationAttach(destName); + dest = session.createTemporaryQueue(); + nodeTypeCapability = AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY; + } else if (destType == TemporaryTopic.class) { + testPeer.expectTempTopicCreationAttach(destName); + dest = session.createTemporaryTopic(); + nodeTypeCapability = AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY; + } else { + fail("unexpected type"); + } + + SourceMatcher sourceMatcher = new SourceMatcher(); + sourceMatcher.withCapabilities(arrayContaining(nodeTypeCapability)); + + testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); + testPeer.expectLinkFlow(); + + session.createConsumer(dest); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 5000) public void testCreateProducerTargetContainsQueueCapability() throws Exception { doCreateProducerTargetContainsCapabilityTestImpl(Queue.class); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
