Repository: qpid-jms
Updated Branches:
  refs/heads/master 06a721625 -> 952de60ae


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 64f7023..363c76d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -19,6 +19,7 @@
 package org.apache.qpid.jms.test.testpeer;
 
 import static 
org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -41,6 +42,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode;
 import org.apache.qpid.jms.test.testpeer.basictypes.Role;
 import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode;
@@ -1060,7 +1062,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, 
refuseLink, false, deferAttachResponseWrite, null, null);
     }
 
-    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final 
Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, 
boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String 
errorMessage)
+    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final 
Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+                                     boolean omitDetach, boolean 
deferAttachResponseWrite, Symbol errorType, String errorMessage)
+    { // Backward-compatible 8-argument overload kept for existing callers.
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, 
refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, 
null); // Delegates to the 9-argument overload; null means no responseSourceOverride is supplied.
+    }
+
+    public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final 
Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink,
+                                     boolean omitDetach, boolean 
deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source 
responseSourceOverride)
     {
         final AttachMatcher attachMatcher = new AttachMatcher()
                 .withName(linkNameMatcher)
@@ -1092,6 +1101,8 @@ public class TestAmqpPeer implements AutoCloseable
                 attachResponse.setTarget(attachMatcher.getReceivedTarget());
                 if(refuseLink) {
                     attachResponse.setSource(null);
+                } else if(responseSourceOverride != null){
+                    attachResponse.setSource(responseSourceOverride);
                 } else {
                     
attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource()));
                 }
@@ -1143,6 +1154,44 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(attachMatcher);
     }
 
+    public void expectSharedDurableSubscriberAttach(String topicName, String 
subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { // Expects a shared durable subscriber attach that is not refused.
+        expectSharedSubscriberAttach(topicName, subscriptionName, true, 
linkNameMatcher, false, clientIdSet); // durable = true, refuseLink = false
+    }
+
+    public void expectSharedVolatileSubscriberAttach(String topicName, String 
subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { // Expects a shared non-durable (volatile) subscriber attach that is not refused.
+        expectSharedSubscriberAttach(topicName, subscriptionName, false, 
linkNameMatcher, false, clientIdSet); // durable = false, refuseLink = false
+    }
+
+    public void expectSharedDurableSubscriberAttach(String topicName, String 
subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean 
clientIdSet) { // Shared durable subscriber attach expectation where the caller chooses whether the link is refused.
+        expectSharedSubscriberAttach(topicName, subscriptionName, true, 
linkNameMatcher, refuseLink, clientIdSet); // durable = true, refuseLink passed through
+    }
+
+    private void expectSharedSubscriberAttach(String topicName, String 
subscriptionName, boolean durable, Matcher<?> linkNameMatcher, boolean 
refuseLink, boolean clientIdSet)
+    { // Builds the expected source for a shared subscription and registers a receiver attach expectation. NOTE: subscriptionName is not read in this method.
+        Symbol[] sourceCapabilities; // Exact, ordered capability list the received source must carry.
+        if(clientIdSet) {
+            sourceCapabilities = new Symbol[] { 
AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED };
+        } else { // No client ID set: the GLOBAL capability is expected in addition.
+            sourceCapabilities = new Symbol[] { 
AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED, AmqpSupport.GLOBAL 
};
+        }
+
+        SourceMatcher sourceMatcher = new SourceMatcher();
+        sourceMatcher.withAddress(equalTo(topicName)); // Source address must equal the topic name.
+        sourceMatcher.withDynamic(equalTo(false));
+        if(durable) { // Durable: UNSETTLED_STATE durability with an expiry policy of NEVER.
+            //TODO: will possibly be changed to a 1/config durability
+            
sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
+            
sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
+        } else { // Non-durable: NONE durability with an expiry policy of LINK_DETACH.
+            sourceMatcher.withDurable(equalTo(TerminusDurability.NONE));
+            
sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));
+        }
+
+        sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities));
+
+        expectReceiverAttach(linkNameMatcher, sourceMatcher, refuseLink, 
false); // NOTE(review): presumably (…, refuseLink, deferAttachResponseWrite = false) - confirm against the 4-argument overload, which is not shown here.
+    }
+
     public void expectDurableSubscriberAttach(String topicName, String 
subscriptionName)
     {
         SourceMatcher sourceMatcher = new SourceMatcher();
@@ -1152,9 +1201,43 @@ public class TestAmqpPeer implements AutoCloseable
         sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
         sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
 
+        
sourceMatcher.withCapabilities(arrayContaining(AmqpDestinationHelper.TOPIC_CAPABILITY));
+
         expectReceiverAttach(equalTo(subscriptionName), sourceMatcher);
     }
 
+    public void expectDurableSubUnsubscribeNullSourceLookup(boolean 
failLookup, boolean shared, String subscriptionName, String topicName, boolean 
hasClientID) { // Expects a receiver attach carrying a null source, and either fails it with NOT_FOUND or answers with a prepared Source.
+        String linkName = subscriptionName; // Expected link name starts as the subscription name.
+        if(!hasClientID) { // Without a client ID the expected link name gets the delimiter plus "global" appended.
+            linkName += AmqpSupport.SUB_NAME_DELIMITER + "global";
+        }
+
+        Matcher<String> linkNameMatcher = equalTo(linkName);
+        Matcher<Object> nullSourceMatcher = nullValue(); // The incoming attach is expected to have no source.
+
+        Source responseSourceOverride = null; // Stays null when the lookup is failed.
+        Symbol errorType = null; // Error details stay null when the lookup succeeds.
+        String errorMessage = null;
+
+        if(failLookup){ // Failure path: the attach is refused with a NOT_FOUND error.
+            errorType = AmqpError.NOT_FOUND;
+            errorMessage = "No subscription link found";
+        } else { // Success path: respond with a durable, non-dynamic source addressed at topicName.
+            responseSourceOverride = new Source();
+            responseSourceOverride.setAddress(topicName);
+            responseSourceOverride.setDynamic(false);
+            //TODO: will possibly be changed to a 1/config durability
+            
responseSourceOverride.setDurable(TerminusDurability.UNSETTLED_STATE);
+            responseSourceOverride.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+
+            if(shared) { // NOTE(review): uses SHARED_SUBS here, whereas expectSharedSubscriberAttach matches AmqpSupport.SHARED / GLOBAL source capabilities - confirm this is the intended symbol.
+                responseSourceOverride.setCapabilities(new 
Symbol[]{SHARED_SUBS});
+            }
+        }
+
+        expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, 
failLookup, false, false, errorType, errorMessage, responseSourceOverride); // settled = false, refuseLink = failLookup, omitDetach = false, deferAttachResponseWrite = false
+    }
+
     public void expectDetach(boolean expectClosed, boolean sendResponse, 
boolean replyClosed)
     {
         Matcher<Boolean> closeMatcher = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to