start on brokerless integration test for durable subscribers
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b1bf687e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b1bf687e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b1bf687e Branch: refs/heads/master Commit: b1bf687e4b92d0dd0488982f4dc0601b83af22e7 Parents: 54b7d68 Author: Robert Gemmell <[email protected]> Authored: Mon Oct 27 17:53:28 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Oct 27 17:53:28 2014 +0000 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 28 +++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 50 ++++++++++++++++++++ 2 files changed, 78 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b1bf687e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index bff1189..ba97522 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -19,12 +19,16 @@ package org.apache.qpid.jms.integration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import javax.jms.Connection; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; @@ -100,4 +104,28 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout = 5000) + public void testCreateDurableTopicSubscriber() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(true); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); + + TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName); + assertNotNull("TopicSubscriber object was null", subscriber); + assertFalse("TopicSubscriber should not be no-local", subscriber.getNoLocal()); + assertNull("TopicSubscriber should not have a selector", subscriber.getMessageSelector()); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b1bf687e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 9b97a67..094668b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; import org.apache.qpid.jms.test.testpeer.describedtypes.AttachFrame; import org.apache.qpid.jms.test.testpeer.describedtypes.BeginFrame; @@ -56,6 +58,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.EndMatcher; import org.apache.qpid.jms.test.testpeer.matchers.FlowMatcher; import org.apache.qpid.jms.test.testpeer.matchers.OpenMatcher; import org.apache.qpid.jms.test.testpeer.matchers.SaslInitMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TransferMatcher; import org.apache.qpid.proton.Proton; @@ -520,6 +523,53 @@ public class TestAmqpPeer implements AutoCloseable addHandler(attachMatcher); } + public void expectDurableSubscriberAttach(String topicName, String subscriptionName) + { + String topicPrefix = "topic://"; //TODO: this will be removed, delete when tests start failing + + SourceMatcher sourceMatcher = new SourceMatcher(); + sourceMatcher.withAddress(equalTo(topicPrefix + topicName)); + sourceMatcher.withDynamic(equalTo(false)); + //TODO: will possibly be changed to a 1/config durability + sourceMatcher.withDurable(equalTo(UnsignedInteger.valueOf(2)));//TODO: non-literal values for TerminusDurability etc. + sourceMatcher.withExpiryPolicy(equalTo(Symbol.valueOf("never")));//TODO: non-literal values for ExpiryPolicy etc. + + final AttachMatcher attachMatcher = new AttachMatcher() + .withName(equalTo(subscriptionName)) + .withHandle(notNullValue()) + .withRole(equalTo(RECEIVER_ROLE)) + .withSndSettleMode(equalTo(ATTACH_SND_SETTLE_MODE_UNSETTLED)) + .withRcvSettleMode(equalTo(ATTACH_RCV_SETTLE_MODE_FIRST)) + .withSource(sourceMatcher) + .withTarget(notNullValue()); + + UnsignedInteger linkHandle = UnsignedInteger.valueOf(_nextLinkHandle++); + final AttachFrame attachResponse = new AttachFrame() + .setHandle(linkHandle) + .setRole(SENDER_ROLE) + .setSndSettleMode(ATTACH_SND_SETTLE_MODE_UNSETTLED) + .setRcvSettleMode(ATTACH_RCV_SETTLE_MODE_FIRST) + .setInitialDeliveryCount(UnsignedInteger.ZERO); + + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); + attachResponseSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + attachResponseSender.setChannel(attachMatcher.getActualChannel()); + attachResponse.setName(attachMatcher.getReceivedName()); + attachResponse.setSource(attachMatcher.getReceivedSource()); + attachResponse.setTarget(attachMatcher.getReceivedTarget()); + } + }); + + attachMatcher.onSuccess(attachResponseSender); + + addHandler(attachMatcher); + } + public void expectReceiverAttach() { final AttachMatcher attachMatcher = new AttachMatcher() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
