Repository: qpid-jms Updated Branches: refs/heads/master 06a721625 -> 952de60ae
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 64f7023..363c76d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms.test.testpeer; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; @@ -41,6 +42,7 @@ import javax.net.ssl.SSLContext; import org.apache.qpid.jms.provider.amqp.AmqpSupport; import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; +import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode; import org.apache.qpid.jms.test.testpeer.basictypes.Role; import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode; @@ -1060,7 +1062,14 @@ public class TestAmqpPeer implements AutoCloseable expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null); } - public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) + public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, + boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) + { + expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null); + } + + public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, + boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride) { final AttachMatcher attachMatcher = new AttachMatcher() .withName(linkNameMatcher) @@ -1092,6 +1101,8 @@ public class TestAmqpPeer implements AutoCloseable attachResponse.setTarget(attachMatcher.getReceivedTarget()); if(refuseLink) { attachResponse.setSource(null); + } else if(responseSourceOverride != null){ + attachResponse.setSource(responseSourceOverride); } else { attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())); } @@ -1143,6 +1154,44 @@ public class TestAmqpPeer implements AutoCloseable addHandler(attachMatcher); } + public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { + expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, false, clientIdSet); + } + + public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { + expectSharedSubscriberAttach(topicName, subscriptionName, false, linkNameMatcher, false, clientIdSet); + } + + public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) { + expectSharedSubscriberAttach(topicName, subscriptionName, true, linkNameMatcher, refuseLink, clientIdSet); + } + + private void expectSharedSubscriberAttach(String topicName, String subscriptionName, boolean durable, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) + { + Symbol[] sourceCapabilities; + if(clientIdSet) { + sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED }; + } else { + sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED, AmqpSupport.GLOBAL }; + } + + SourceMatcher sourceMatcher = new SourceMatcher(); + sourceMatcher.withAddress(equalTo(topicName)); + sourceMatcher.withDynamic(equalTo(false)); + if(durable) { + //TODO: will possibly be changed to a 1/config durability + sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE)); + sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER)); + } else { + sourceMatcher.withDurable(equalTo(TerminusDurability.NONE)); + sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH)); + } + + sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities)); + + expectReceiverAttach(linkNameMatcher, sourceMatcher, refuseLink, false); + } + public void expectDurableSubscriberAttach(String topicName, String subscriptionName) { SourceMatcher sourceMatcher = new SourceMatcher(); @@ -1152,9 +1201,43 @@ public class TestAmqpPeer implements AutoCloseable sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE)); sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER)); + sourceMatcher.withCapabilities(arrayContaining(AmqpDestinationHelper.TOPIC_CAPABILITY)); + expectReceiverAttach(equalTo(subscriptionName), sourceMatcher); } + public void expectDurableSubUnsubscribeNullSourceLookup(boolean failLookup, boolean shared, String subscriptionName, String topicName, boolean hasClientID) { + String linkName = subscriptionName; + if(!hasClientID) { + linkName += AmqpSupport.SUB_NAME_DELIMITER + "global"; + } + + Matcher<String> linkNameMatcher = equalTo(linkName); + Matcher<Object> nullSourceMatcher = nullValue(); + + Source responseSourceOverride = null; + Symbol errorType = null; + String errorMessage = null; + + if(failLookup){ + errorType = AmqpError.NOT_FOUND; + errorMessage = "No subscription link found"; + } else { + responseSourceOverride = new Source(); + responseSourceOverride.setAddress(topicName); + responseSourceOverride.setDynamic(false); + //TODO: will possibly be changed to a 1/config durability + responseSourceOverride.setDurable(TerminusDurability.UNSETTLED_STATE); + responseSourceOverride.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + + if(shared) { + responseSourceOverride.setCapabilities(new Symbol[]{SHARED_SUBS}); + } + } + + expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride); + } + public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed) { Matcher<Boolean> closeMatcher = null; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
