start on brokerless integration test for durable subscribers

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b1bf687e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b1bf687e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b1bf687e

Branch: refs/heads/master
Commit: b1bf687e4b92d0dd0488982f4dc0601b83af22e7
Parents: 54b7d68
Author: Robert Gemmell <[email protected]>
Authored: Mon Oct 27 17:53:28 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Mon Oct 27 17:53:28 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 28 +++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 50 ++++++++++++++++++++
 2 files changed, 78 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b1bf687e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index bff1189..ba97522 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -19,12 +19,16 @@
 package org.apache.qpid.jms.integration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import javax.jms.Connection;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -100,4 +104,28 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 5000)
+    public void testCreateDurableTopicSubscriber() throws Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+            String subscriptionName = "mySubscription";
+
+            testPeer.expectDurableSubscriberAttach(topicName, 
subscriptionName);
+
+            TopicSubscriber subscriber = session.createDurableSubscriber(dest, 
subscriptionName);
+            assertNotNull("TopicSubscriber object was null", subscriber);
+            assertFalse("TopicSubscriber should not be no-local", 
subscriber.getNoLocal());
+            assertNull("TopicSubscriber should not have a selector", 
subscriber.getMessageSelector());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b1bf687e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 9b97a67..094668b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.AttachFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.BeginFrame;
@@ -56,6 +58,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.EndMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.FlowMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.OpenMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SaslInitMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TransferMatcher;
 import org.apache.qpid.proton.Proton;
@@ -520,6 +523,53 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(attachMatcher);
     }
 
+    public void expectDurableSubscriberAttach(String topicName, String 
subscriptionName)
+    {
+        String topicPrefix = "topic://"; //TODO: this will be removed, delete 
when tests start failing
+
+        SourceMatcher sourceMatcher = new SourceMatcher();
+        sourceMatcher.withAddress(equalTo(topicPrefix + topicName));
+        sourceMatcher.withDynamic(equalTo(false));
+        //TODO: will possibly be changed to a 1/config durability
+        sourceMatcher.withDurable(equalTo(UnsignedInteger.valueOf(2)));//TODO: 
non-literal values for TerminusDurability etc.
+        
sourceMatcher.withExpiryPolicy(equalTo(Symbol.valueOf("never")));//TODO: 
non-literal values for ExpiryPolicy etc.
+
+        final AttachMatcher attachMatcher = new AttachMatcher()
+                .withName(equalTo(subscriptionName))
+                .withHandle(notNullValue())
+                .withRole(equalTo(RECEIVER_ROLE))
+                .withSndSettleMode(equalTo(ATTACH_SND_SETTLE_MODE_UNSETTLED))
+                .withRcvSettleMode(equalTo(ATTACH_RCV_SETTLE_MODE_FIRST))
+                .withSource(sourceMatcher)
+                .withTarget(notNullValue());
+
+        UnsignedInteger linkHandle = 
UnsignedInteger.valueOf(_nextLinkHandle++);
+        final AttachFrame attachResponse = new AttachFrame()
+                            .setHandle(linkHandle)
+                            .setRole(SENDER_ROLE)
+                            .setSndSettleMode(ATTACH_SND_SETTLE_MODE_UNSETTLED)
+                            .setRcvSettleMode(ATTACH_RCV_SETTLE_MODE_FIRST)
+                            .setInitialDeliveryCount(UnsignedInteger.ZERO);
+
+        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender attachResponseSender = new FrameSender(this, 
FrameType.AMQP, -1, attachResponse, null);
+        attachResponseSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                
attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponse.setName(attachMatcher.getReceivedName());
+                attachResponse.setSource(attachMatcher.getReceivedSource());
+                attachResponse.setTarget(attachMatcher.getReceivedTarget());
+            }
+        });
+
+        attachMatcher.onSuccess(attachResponseSender);
+
+        addHandler(attachMatcher);
+    }
+
     public void expectReceiverAttach()
     {
         final AttachMatcher attachMatcher = new AttachMatcher()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to