make the producer targets contain a capability indicating the desired destination type/capability
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0422085a Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0422085a Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0422085a Branch: refs/heads/master Commit: 0422085ab5ab634fc719112fcd9ba9ab4029fa01 Parents: 5eca494 Author: Robert Gemmell <[email protected]> Authored: Wed Jan 7 11:41:18 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Jan 7 16:34:47 2015 +0000 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpFixedProducer.java | 12 ++-- .../amqp/message/AmqpDestinationHelper.java | 26 +++++++++ .../jms/integration/SessionIntegrationTest.java | 60 ++++++++++++++++++++ 3 files changed, 92 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index d05824d..32eecd7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -235,12 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer { @Override protected void doOpen() { - String targetAddress = null; - - if (resource.getDestination() != null) { - JmsDestination destination = resource.getDestination(); - targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection()); - } + JmsDestination destination = resource.getDestination(); + String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection()); Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; String sourceAddress = getProducerId().toString(); @@ -251,6 +247,10 @@ public class AmqpFixedProducer extends AmqpProducer { Target target = new Target(); target.setAddress(targetAddress); + Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(destination); + if(typeCapability != null) { + target.setCapabilities(typeCapability); + } String senderName = sourceAddress + ":" + targetAddress; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java index 47a7bb1..afeeb69 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java @@ -25,6 +25,8 @@ import org.apache.qpid.jms.JmsTemporaryQueue; import org.apache.qpid.jms.JmsTemporaryTopic; import org.apache.qpid.jms.JmsTopic; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination; +import org.apache.qpid.proton.amqp.Symbol; /** * A set of static utility method useful when mapping JmsDestination types to / from the AMQP @@ -289,4 +291,28 @@ public class AmqpDestinationHelper { } } + /** + * @return the type capability, or null if the supplied destination is null or can't be classified + */ + public Symbol toTypeCapability(JmsDestination destination) { + if (destination == null) { + return null; + } + + if (destination.isQueue()) { + if (destination.isTemporary()) { + return AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY; + } else { + return Symbol.valueOf("queue");// TODO: constant; + } + } else if (destination.isTopic()) { + if (destination.isTemporary()) { + return AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY; + } else { + return Symbol.valueOf("topic");// TODO: constant; + } + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 53f846b..ebabe4e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -33,6 +33,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -47,6 +48,7 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties; +import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; @@ -193,6 +195,64 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 5000) + public void testCreateProducerTargetContainsQueueCapability() throws Exception { + doCreateProducerTargetContainsCapabilityTestImpl(Queue.class); + } + + @Test(timeout = 5000) + public void testCreateProducerTargetContainsTopicCapability() throws Exception { + doCreateProducerTargetContainsCapabilityTestImpl(Topic.class); + } + + @Test(timeout = 5000) + public void testCreateProducerTargetContainsTempQueueCapability() throws Exception { + doCreateProducerTargetContainsCapabilityTestImpl(TemporaryQueue.class); + } + + @Test(timeout = 5000) + public void testCreateProducerTargetContainsTempTopicCapability() throws Exception { + doCreateProducerTargetContainsCapabilityTestImpl(TemporaryTopic.class); + } + + private void doCreateProducerTargetContainsCapabilityTestImpl(Class<? extends Destination> destType) throws JMSException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String destName = "myDest"; + Symbol nodeTypeCapability = null; + + Destination dest = null; + if (destType == Queue.class) { + dest = session.createQueue(destName); + nodeTypeCapability = Symbol.valueOf("queue");// TODO: constant + } else if (destType == Topic.class) { + dest = session.createTopic(destName); + nodeTypeCapability = Symbol.valueOf("topic");// TODO: constant + } else if (destType == TemporaryQueue.class) { + testPeer.expectTempQueueCreationAttach(destName); + dest = session.createTemporaryQueue(); + nodeTypeCapability = AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY; + } else if (destType == TemporaryTopic.class) { + testPeer.expectTempTopicCreationAttach(destName); + dest = session.createTemporaryTopic(); + nodeTypeCapability = AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY; + } else { + fail("unexpected type"); + } + + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability)); + + testPeer.expectSenderAttach(targetMatcher, false, false); + + session.createProducer(dest); + } + } + + @Test(timeout = 5000) public void testCreateDurableTopicSubscriber() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer(testFixture.getAvailablePort());) { Connection connection = testFixture.establishConnecton(testPeer); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
