Repository: qpid-jms
Updated Branches:
  refs/heads/master a587e3fd2 -> 8e3f1bd50


make the consumer sources contain a capability indicating the desired 
destination type/capability


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/082df05f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/082df05f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/082df05f

Branch: refs/heads/master
Commit: 082df05f44c204d7b4b4b671da97422ed66420ad
Parents: 752f1cc
Author: Robert Gemmell <[email protected]>
Authored: Wed Jan 7 15:06:41 2015 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Wed Jan 7 16:34:47 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  5 ++
 .../jms/integration/SessionIntegrationTest.java | 61 ++++++++++++++++++++
 2 files changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/082df05f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 3319297..816f959 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -195,6 +195,11 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
 
+        Symbol typeCapability =  
AmqpDestinationHelper.INSTANCE.toTypeCapability(resource.getDestination());
+        if(typeCapability != null) {
+            source.setCapabilities(typeCapability);
+        }
+
         source.setOutcomes(outcomes);
 
         Modified modified = new Modified();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/082df05f/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 48ae0ca..2d96c74 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -196,6 +196,67 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 5000)
+    public void testCreateConsumerSourceContainsQueueCapability() throws 
Exception {
+        doCreateConsumerSourceContainsCapabilityTestImpl(Queue.class);
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateConsumerSourceContainsTopicCapability() throws 
Exception {
+        doCreateConsumerSourceContainsCapabilityTestImpl(Topic.class);
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateConsumerSourceContainsTempQueueCapability() throws 
Exception {
+        doCreateConsumerSourceContainsCapabilityTestImpl(TemporaryQueue.class);
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateConsumerSourceContainsTempTopicCapability() throws 
Exception {
+        doCreateConsumerSourceContainsCapabilityTestImpl(TemporaryTopic.class);
+    }
+
+    private void doCreateConsumerSourceContainsCapabilityTestImpl(Class<? 
extends Destination> destType) throws JMSException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String destName = "myDest";
+            Symbol nodeTypeCapability = null;
+
+            Destination dest = null;
+            if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+                nodeTypeCapability = AmqpDestinationHelper.QUEUE_CAPABILITY;
+            } else if (destType == Topic.class) {
+                dest = session.createTopic(destName);
+                nodeTypeCapability = AmqpDestinationHelper.TOPIC_CAPABILITY;
+            } else if (destType == TemporaryQueue.class) {
+                testPeer.expectTempQueueCreationAttach(destName);
+                dest = session.createTemporaryQueue();
+                nodeTypeCapability = 
AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY;
+            } else if (destType == TemporaryTopic.class) {
+                testPeer.expectTempTopicCreationAttach(destName);
+                dest = session.createTemporaryTopic();
+                nodeTypeCapability = 
AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY;
+            } else {
+                fail("unexpected type");
+            }
+
+            SourceMatcher sourceMatcher = new SourceMatcher();
+            
sourceMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
+
+            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+            testPeer.expectLinkFlow();
+
+            session.createConsumer(dest);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 5000)
     public void testCreateProducerTargetContainsQueueCapability() throws 
Exception {
         doCreateProducerTargetContainsCapabilityTestImpl(Queue.class);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to