http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java new file mode 100644 index 0000000..3e08b56 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SubscriptionsIntegrationTest.java @@ -0,0 +1,1480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.provider.amqp.AmqpSupport; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.proton.amqp.Symbol; +import org.hamcrest.Matcher; +import org.junit.Test; + +public class SubscriptionsIntegrationTest extends QpidJmsTestCase { + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + // -------------------------------------- // + + @Test + public void testConstants() throws Exception { + assertEquals(Symbol.valueOf("SHARED-SUBS"), AmqpSupport.SHARED_SUBS); + assertEquals("|", AmqpSupport.SUB_NAME_DELIMITER); + assertEquals(Symbol.valueOf("shared"), AmqpSupport.SHARED); + assertEquals(Symbol.valueOf("global"), AmqpSupport.GLOBAL); + } + + /** + * Verifies that a shared durable subscriber detaches links with closed = false. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testCloseSharedDurableTopicSubscriberDetachesWithCloseFalse() throws Exception { + doSharedTopicSubscriberDetachTestImpl(true); + } + + /** + * Verifies that a shared volatile subscriber detaches links with closed = true. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testCloseSharedVolatileTopicSubscriberDetachesWithCloseTrue() throws Exception { + doSharedTopicSubscriberDetachTestImpl(false); + } + + private void doSharedTopicSubscriberDetachTestImpl(boolean durable) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + if (durable) { + Matcher<?> linkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } else { + Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + MessageConsumer subscriber; + if (durable) { + subscriber = session.createSharedDurableConsumer(dest, subscriptionName); + } else { + subscriber = session.createSharedConsumer(dest, subscriptionName); + } + + testPeer.expectDetach(!durable, true, !durable); + subscriber.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared durable subscribers name their links such that the first link is the + * bare subscription name, and subsquent links use a counter suffix to ensure they are unique. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubscriberLinkNames() throws Exception { + doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(true, true); + } + + /** + * Verifies that a shared volatile subscribers names their links such that the subscription name + * is suffixed with a counter to ensure they are unique. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffix() throws Exception { + doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(false, true); + } + + /** + * Verifies that on a connection without a ClientID, shared durable subscribers name their links + * such that the first link is the subscription name with 'global' suffix, and subsequent links + * additionally append a counter suffix to ensure they are unique. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubscriberLinkNamesNoClientID() throws Exception { + doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(true, false); + } + + /** + * Verifies that on a connection without a ClientID, shared volatile subscribers names their links + * such that the subscription name is suffixed with a 'global' qualifier an counter to ensure + * they are unique. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffixNoClientID() throws Exception { + doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(false, false); + } + + private void doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable, boolean useClientID) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + Connection connection; + if(useClientID) { + connection = testFixture.establishConnecton(testPeer, serverCapabilities); + } else { + connection = testFixture.establishConnectonWithoutClientID(testPeer, serverCapabilities); + } + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the first shared receiver + if (durable) { + String linkName = subscriptionName; + if(!useClientID) { + linkName += SUB_NAME_DELIMITER + "global"; + } + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID); + } else { + String linkName; + if(useClientID) { + linkName = subscriptionName + SUB_NAME_DELIMITER + "volatile1"; + } else { + linkName = subscriptionName + SUB_NAME_DELIMITER + "global-volatile1"; + } + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID); + } + testPeer.expectLinkFlow(); + + if (durable) { + session.createSharedDurableConsumer(dest, subscriptionName); + } else { + session.createSharedConsumer(dest, subscriptionName); + } + + // Attach the second shared receiver, expect the link name to differ + if (durable) { + String linkName = subscriptionName; + if(useClientID) { + linkName += SUB_NAME_DELIMITER + "2"; + } else { + linkName += SUB_NAME_DELIMITER + "global2"; + } + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID); + } else { + String linkName; + if(useClientID) { + linkName = subscriptionName + SUB_NAME_DELIMITER + "volatile2"; + } else { + linkName = subscriptionName + SUB_NAME_DELIMITER + "global-volatile2"; + } + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, equalTo(linkName), useClientID); + } + testPeer.expectLinkFlow(); + + if (durable) { + session.createSharedDurableConsumer(dest, subscriptionName); + } else { + session.createSharedConsumer(dest, subscriptionName); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared durable subscribers names their links on a per-connection basis, such that + * suffix counter on one connection is not impacted by subscriptions on another connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubsriberLinkNamesMultipleConnectionSubs() throws Exception { + doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true); + } + + /** + * Verifies that a shared volatile subscribers names their links on a per-connection basis, such that + * suffix counter on one connection is not impacted by subscriptions on another connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleConnectionSubs() throws Exception { + doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false); + } + + private void doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception { + try (TestAmqpPeer peer1 = new TestAmqpPeer(); + TestAmqpPeer peer2 = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish first connection + Connection connection1 = testFixture.establishConnecton(peer1, serverCapabilities); + connection1.start(); + + peer1.expectBegin(); + Session sessionConn1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Establish second connection + Connection connection2 = testFixture.establishConnecton(peer2, serverCapabilities); + connection2.start(); + + peer2.expectBegin(); + Session sessionConn2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = sessionConn1.createTopic(topicName); + String subscriptionName = "mySubscription"; + + Matcher<?> durSubLinkNameMatcher = equalTo(subscriptionName); + Matcher<?> volatileSubLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + + // Attach the first connections shared receiver + if (durable) { + peer1.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durSubLinkNameMatcher, true); + } else { + peer1.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, volatileSubLinkNameMatcher, true); + } + peer1.expectLinkFlow(); + + if (durable) { + sessionConn1.createSharedDurableConsumer(dest, subscriptionName); + } else { + sessionConn1.createSharedConsumer(dest, subscriptionName); + } + + // Attach the second connections shared receiver, expect the link name to be the same since its per-connection + if (durable) { + peer2.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durSubLinkNameMatcher, true); + } else { + peer2.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, volatileSubLinkNameMatcher, true); + } + peer2.expectLinkFlow(); + + if (durable) { + sessionConn2.createSharedDurableConsumer(dest, subscriptionName); + } else { + sessionConn2.createSharedConsumer(dest, subscriptionName); + } + + peer1.expectClose(); + connection1.close(); + + peer2.expectClose(); + connection2.close(); + + peer1.waitForAllHandlersToComplete(1000); + peer2.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared durable subscribers names their links on a per-connection basis, such that + * suffix counter for one session is impacted by subscriptions on another session on the same connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubsriberLinkNamesMultipleSessionSubs() throws Exception { + doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true); + } + + /** + * Verifies that a shared volatile subscribers names their links on a per-connection basis, such that + * suffix counter for one session is impacted by subscriptions on another session on the same connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleSessionSubs() throws Exception { + doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false); + } + + private void doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + testPeer.expectBegin(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session1.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the first sessions shared receiver + if (durable) { + Matcher<?> linkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } else { + Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + if (durable) { + session1.createSharedDurableConsumer(dest, subscriptionName); + } else { + session1.createSharedConsumer(dest, subscriptionName); + } + + // Attach the second sessions shared receiver, expect the link name to be different since its per-connection + if (durable) { + Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } else { + Matcher<?> linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile2"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + if (durable) { + session2.createSharedDurableConsumer(dest, subscriptionName); + } else { + session2.createSharedConsumer(dest, subscriptionName); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that shared durable subscribers name their links on a per-subscription name basis, such that + * suffix counter for one subscription nameis not impacted by those for another subscription on the same connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubsriberLinkNamesMultipleNamedSubs() throws Exception { + doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(true); + } + + /** + * Verifies that shared volatile subscribers name their links on a per-subscription name basis, such that + * suffix counter for one subscription name is not impacted by those for another subscription on the same connection. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleNamedSubs() throws Exception { + doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(false); + } + + private void doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + // Attach the first subscriptions shared receivers + if (durable) { + Matcher<?> linkNameMatcher = equalTo(subscriptionName1); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName1, linkNameMatcher, true); + testPeer.expectLinkFlow(); + linkNameMatcher = equalTo(subscriptionName1 + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName1, linkNameMatcher, true); + } else { + Matcher<?> linkNameMatcher = equalTo(subscriptionName1 + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName1, linkNameMatcher, true); + testPeer.expectLinkFlow(); + linkNameMatcher = equalTo(subscriptionName1 + SUB_NAME_DELIMITER + "volatile2"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName1, linkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + if (durable) { + session.createSharedDurableConsumer(dest, subscriptionName1); + session.createSharedDurableConsumer(dest, subscriptionName1); + } else { + session.createSharedConsumer(dest, subscriptionName1); + session.createSharedConsumer(dest, subscriptionName1); + } + + // Attach the first subscriptions shared receivers + if (durable) { + Matcher<?> linkNameMatcher = equalTo(subscriptionName2); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName2, linkNameMatcher, true); + testPeer.expectLinkFlow(); + linkNameMatcher = equalTo(subscriptionName2 + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName2, linkNameMatcher, true); + } else { + Matcher<?> linkNameMatcher = equalTo(subscriptionName2 + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName2, linkNameMatcher, true); + testPeer.expectLinkFlow(); + linkNameMatcher = equalTo(subscriptionName2 + SUB_NAME_DELIMITER + "volatile2"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName2, linkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + if (durable) { + session.createSharedDurableConsumer(dest, subscriptionName2); + session.createSharedDurableConsumer(dest, subscriptionName2); + } else { + session.createSharedConsumer(dest, subscriptionName2); + session.createSharedConsumer(dest, subscriptionName2); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared volatile subscriber and shared durable subscriber with the same subscription name + * can be active on the same connection at the same time and names their links appropriately to distinguish + * themselves from each other. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableAndVolatileSubsCoexistUsingDistinctLinkNames() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the durable shared receiver + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + session.createSharedDurableConsumer(dest, subscriptionName); + + // Attach the volatile shared receiver + Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName, subscriptionName, volatileLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + session.createSharedConsumer(dest, subscriptionName); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the + * attempt to create a shared durable subscriber fails. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception { + doSharedTopicSubscriberSupportedTestImpl(true); + } + + /** + * Verifies that on a connection which doesn't identify as supporting shared subscriptions, the + * attempt to create a shared volatile subscriber fails. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception { + doSharedTopicSubscriberSupportedTestImpl(true); + } + + private void doSharedTopicSubscriberSupportedTestImpl(boolean durable) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT include server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + testPeer.expectClose(); + + try { + if (durable) { + session.createSharedDurableConsumer(dest, subscriptionName); + } else { + session.createSharedConsumer(dest, subscriptionName); + } + + fail("Expected an exception to be thrown"); + } catch (JMSException jmse) { + // expected + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared durable subscriber and exclusive durable subscriber with the same subscription name + * can't be active on the same connection at the same time (shared first, then exclusive). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedAndExclusiveDurableSubsCantCoexistSharedFirst() throws Exception { + doSharedAndExclusiveDurableSubsCantCoexistTestImpl(true); + } + + /** + * Verifies that a shared durable subscriber and exclusive durable subscriber with the same subscription name + * can't be active on the same connection at the same time (exclusive first, then shared). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedAndExclusiveDurableSubsCantCoexistExclusiveFirst() throws Exception { + doSharedAndExclusiveDurableSubsCantCoexistTestImpl(false); + } + + private void doSharedAndExclusiveDurableSubsCantCoexistTestImpl(boolean sharedFirst) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + if(sharedFirst) { + // Attach the durable shared receiver + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + session.createSharedDurableConsumer(dest, subscriptionName); + } else { + // Attach the durable exclusive receiver + testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); + testPeer.expectLinkFlow(); + + session.createDurableConsumer(dest, subscriptionName); + } + + testPeer.expectClose(); + + try { + if (sharedFirst) { + // Now try to attach a durable exclusive receiver + session.createDurableConsumer(dest, subscriptionName); + } else { + // Now try to attach a durable shared receiver + session.createSharedDurableConsumer(dest, subscriptionName); + } + fail("Expected to fail due to concurrent shared + non-shared durable sub attempt"); + } catch (JMSException jmse) { + // Expected + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + @Test(timeout = 20000) + public void testExclusiveDurableSubCanOnlyBeActiveOnceAtATime() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the durable exclusive receiver + testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1 = session.createDurableConsumer(dest, subscriptionName); + + try { + // Now try to attach a second active durable exclusive receiver + session.createDurableConsumer(dest, subscriptionName); + fail("Expected to fail due to concurrent active subscriber attempt"); + } catch (JMSException jmse) { + // Expected + } + + testPeer.expectDetach(false, true, false); + subscriber1.close(); + + // Now try to attach a new active durable exclusive receiver + testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber2 = session.createDurableConsumer(dest, subscriptionName); + assertNotNull(subscriber2); + + testPeer.expectClose(); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a upon failure to locate a subscription link during an + * unsubscribe attempt an [InvalidDestination] exception is thrown. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testUnsubscribeNonExistingSubscription() throws Exception { + doUnsubscribeNonExistingSubscriptionTestImpl(true); + } + + /** + * Verifies that a upon failure to locate a subscription link during an + * unsubscribe attempt an [InvalidDestination] exception is thrown, but + * this time using a connection without a ClientID set (and thus + * expecting a different link name to be used). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testUnsubscribeNonExistingSubscriptionWithoutClientID() throws Exception { + doUnsubscribeNonExistingSubscriptionTestImpl(false); + } + + private void doUnsubscribeNonExistingSubscriptionTestImpl(boolean hasClientID) throws JMSException, InterruptedException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection; + if(hasClientID) { + connection = testFixture.establishConnecton(testPeer); + } else { + connection = testFixture.establishConnectonWithoutClientID(testPeer, null); + } + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String subscriptionName = "myInvalidSubscriptionName"; + // Try to unsubscribe non-existing subscription + testPeer.expectDurableSubUnsubscribeNullSourceLookup(true, false, subscriptionName, null, hasClientID); + testPeer.expectDetach(true, true, true); + + try { + session.unsubscribe(subscriptionName); + fail("Should have thrown a InvalidDestinationException"); + } catch (InvalidDestinationException ide) { + // Expected + } + + testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + /** + * Verifies that a upon attempt to unsubscribe a subscription that is + * currently in active use by a subscriber, an exception is thrown, + * and that once the subscriber is closed a further unsubscribe attempt + * is successfully undertaken. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testUnsubscribeExclusiveDurableSubWhileActiveThenInactive() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the durable exclusive receiver + testPeer.expectDurableSubscriberAttach(topicName, subscriptionName); + testPeer.expectLinkFlow(); + + TopicSubscriber subscriber = session.createDurableSubscriber(dest, subscriptionName); + assertNotNull("TopicSubscriber object was null", subscriber); + + // Now try to unsubscribe, should fail + try { + session.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + } + + // Now close the subscriber + testPeer.expectDetach(false, true, false); + + subscriber.close(); + + // Try to unsubscribe again, should work now + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session.unsubscribe(subscriptionName); + + testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + /** + * Verifies that a upon attempt to unsubscribe a shared durable subscription that is + * currently in active use by multiple subscribers, an exception is thrown, and that + * only once the last subscriber is closed is an unsubscribe attempt successful. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testUnsubscribeSharedDurableSubWhileActiveThenInactive() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Attach the first durable shared receiver + Matcher<?> linkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1 = session.createSharedDurableConsumer(dest, subscriptionName); + + // Attach the second durable shared receiver + linkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber2 = session.createSharedDurableConsumer(dest, subscriptionName); + + testPeer.waitForAllHandlersToComplete(1000); + + // Now try to unsubscribe, should fail + try { + session.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + // Now close the first subscriber + testPeer.expectDetach(false, true, false); + subscriber1.close(); + + testPeer.waitForAllHandlersToComplete(1000); + + // Try to unsubscribe again, should still fail + try { + session.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + testPeer.waitForAllHandlersToComplete(1000); + + // Now close the second subscriber + testPeer.expectDetach(false, true, false); + + subscriber2.close(); + + // Try to unsubscribe again, should now work + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session.unsubscribe(subscriptionName); + + testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that a upon a failed attempt to create a shared subscriber that it + * is possible to unsubscribe (not that it should be required), i.e that the + * failed creation does not leave an incorrect recording of an active subscriber. + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testUnsubscribeAfterFailedCreation() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + String subscriptionName = "mySubscription"; + + // Fail to attach a shared durable receiver + Matcher<?> linkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, true); + testPeer.expectDetach(true, false, true); + + try { + session.createSharedDurableConsumer(dest, subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + // Try to unsubscribe, should be able to (strictly speaking an unsub attempt + // would probably fail normally, due to no subscription, but this test + // doesn't care about that, just that the attempt proceeds, so overlook that. + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session.unsubscribe(subscriptionName); + + testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + /** + * Verifies that subscriber cleanup occurs when the subscriber is remotely closed (after creation). + */ + @Test(timeout = 20000) + public void testRemotelyDetachLinkWithDurableSharedConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + + final CountDownLatch subscriberClosed = new CountDownLatch(1); + ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConsumerClosed(MessageConsumer consumer, Exception exception) { + subscriberClosed.countDown(); + } + }); + + // Create first session + testPeer.expectBegin(); + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session1.createTopic(topicName); + + String subscriptionName = "mySubscription"; + + // Attach the first shared receiver on the first session + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName); + + // Create second session + testPeer.expectBegin(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attach the second shared receiver on the second session + durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + + testPeer.expectLinkFlow(); + + // Then remotely detach the link after the flow is received. + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, false, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteDetach"); + + MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName); + assertNotNull(subscriber2); + + assertTrue("Consumer closed callback didn't trigger", subscriberClosed.await(5, TimeUnit.SECONDS)); + + testPeer.waitForAllHandlersToComplete(1000); + + // Now try to unsubscribe (using first session). It should fail due to sub still + // being in use on the first session. No frames should be sent. + try { + session1.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + // Now close the first subscriber + testPeer.expectDetach(false, true, false); + subscriber1.close(); + + testPeer.waitForAllHandlersToComplete(1000); + + // Try to unsubscribe again (using first session), should now work. + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session1.unsubscribe(subscriptionName); + + testPeer.expectClose(); + connection.close(); + } + } + + // -------------------------------------- // + + /** + * Verifies that a shared durable subscriber creation using the same subscription + * name but different topic than an existing subscriber can't be active on the + * same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubChangeOfTopic() throws Exception { + doSharedSubChangeOfDetailsTestImpl(true, true, false); + } + + /** + * Verifies that a shared volatile subscriber creation using the same subscription + * name but different topic than an existing subscriber can't be active on the + * same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubChangeOfTopic() throws Exception { + doSharedSubChangeOfDetailsTestImpl(false, true, false); + } + + /** + * Verifies that a shared durable subscriber creation using the same subscription + * name but different selector than an existing subscriber can't be active on the + * same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubChangeOfSelector() throws Exception { + doSharedSubChangeOfDetailsTestImpl(true, false, true); + } + + /** + * Verifies that a shared volatile subscriber creation using the same subscription + * name but different selector than an existing subscriber can't be active on the + * same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubChangeOfSelector() throws Exception { + doSharedSubChangeOfDetailsTestImpl(false, false, true); + } + + /** + * Verifies that a shared durable subscriber creation using the same subscription + * name but different topic and selector than an existing subscriber can't be active + * on the same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedDurableSubChangeOfTopicAndSelector() throws Exception { + doSharedSubChangeOfDetailsTestImpl(true, true, true); + } + + /** + * Verifies that a shared volatile subscriber creation using the same subscription + * name but different topic and selector than an existing subscriber can't be active + * on the same connection at the same time fails (can't change details in use). + * + * @throws Exception if an unexpected exception occurs + */ + @Test(timeout = 20000) + public void testSharedVolatileSubChangeOfTopicAndSelector() throws Exception { + doSharedSubChangeOfDetailsTestImpl(false, true, true); + } + + private void doSharedSubChangeOfDetailsTestImpl(boolean durable, boolean changeTopic, boolean changeSelector) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName1 = "myTopic"; + String topicName2 = topicName1; + if (changeTopic) { + topicName2 += "2"; + } + + String noSelectorNull = null; + String selector2 = null; + if (changeSelector) { + selector2 = "someProperty = 2"; + } + + Topic dest1 = session.createTopic(topicName1); + Topic dest2 = session.createTopic(topicName2); + + String subscriptionName = "mySubscription"; + + // Attach the first shared receiver + if (durable) { + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName1, subscriptionName, durableLinkNameMatcher, true); + } else { + Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName1, subscriptionName, volatileLinkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1; + if (durable) { + subscriber1 = session.createSharedDurableConsumer(dest1, subscriptionName, noSelectorNull); + } else { + subscriber1 = session.createSharedConsumer(dest1, subscriptionName, noSelectorNull); + } + + testPeer.waitForAllHandlersToComplete(1000); + + // Now try to create another subscriber, same name but different topic. Should fail. No frames should be sent. + try { + if (durable) { + session.createSharedDurableConsumer(dest2, subscriptionName, selector2); + } else { + session.createSharedConsumer(dest2, subscriptionName, selector2); + } + + fail("Expected to fail due to attempting change of subscription details while subscriber is active"); + } catch (JMSException jmse) { + // Expected + } + + // Now close the first subscriber + if (durable) { + testPeer.expectDetach(false, true, false); + } else { + testPeer.expectDetach(true, true, true); + } + subscriber1.close(); + + // Now try a new subscriber again, with changed topic, it should succeed (note also the verified reuse of link names). + if (durable) { + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName2, subscriptionName, durableLinkNameMatcher, true); + } else { + Matcher<?> volatileLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "volatile1"); + testPeer.expectSharedVolatileSubscriberAttach(topicName2, subscriptionName, volatileLinkNameMatcher, true); + } + testPeer.expectLinkFlow(); + + MessageConsumer subscriber2; + if (durable) { + subscriber2 = session.createSharedDurableConsumer(dest2, subscriptionName, selector2); + } else { + subscriber2 = session.createSharedConsumer(dest2, subscriptionName, selector2); + } + assertNotNull(subscriber2); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + // -------------------------------------- // + + /** + * Verifies that subscriber cleanup occurs when the session it is on is remotely closed. + */ + @Test(timeout = 20000) + public void testRemotelyEndSessionWithDurableSharedConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + + final CountDownLatch sessionClosed = new CountDownLatch(1); + ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onSessionClosed(Session session, Exception exception) { + sessionClosed.countDown(); + } + }); + + // Create first session + testPeer.expectBegin(); + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session1.createTopic(topicName); + + String subscriptionName = "mySubscription"; + + // Attach the first shared receiver on the first session + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName); + + // Create second session + testPeer.expectBegin(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attach the second shared receiver on the second session + durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + + testPeer.expectLinkFlow(); + + // Then remotely end the session (and thus the subscriber along with it) after the flow is received. + testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteClosure"); + + MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName); + assertNotNull(subscriber2); + + assertTrue("Session closed callback didn't trigger", sessionClosed.await(5, TimeUnit.SECONDS)); + + testPeer.waitForAllHandlersToComplete(1000); + + // Now try to unsubscribe (using first session, still open). It should fail due to sub still + // being in use on the first session. No frames should be sent. + try { + session1.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + // Now close the first subscriber + testPeer.expectDetach(false, true, false); + subscriber1.close(); + + testPeer.waitForAllHandlersToComplete(1000); + + // Try to unsubscribe again (using first session, still open), should now work. + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session1.unsubscribe(subscriptionName); + + testPeer.expectClose(); + connection.close(); + } + } + + /** + * Verifies that subscriber cleanup occurs when the session it is on is locally closed. + */ + @Test(timeout = 20000) + public void testLocallyEndSessionWithSharedConsumer() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + + final CountDownLatch sessionClosed = new CountDownLatch(1); + ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onSessionClosed(Session session, Exception exception) { + sessionClosed.countDown(); + } + }); + + // Create first session + testPeer.expectBegin(); + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session1.createTopic(topicName); + + String subscriptionName = "mySubscription"; + + // Attach the first shared receiver on the first session + Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName); + + // Create second session + testPeer.expectBegin(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attach the second shared receiver on the second session + durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2"); + testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true); + testPeer.expectLinkFlow(); + + MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName); + assertNotNull(subscriber2); + + // Now close the second session (and thus the subscriber along with it). + testPeer.expectEnd(); + session2.close(); + + // Now try to unsubscribe (using first session, still open). It should fail due to sub still + // being in use on the first session. No frames should be sent. + try { + session1.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + // Now close the first subscriber + testPeer.expectDetach(false, true, false); + subscriber1.close(); + + testPeer.waitForAllHandlersToComplete(1000); + + // Try to unsubscribe again (using first session, still open), should now work. + testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true); + testPeer.expectDetach(true, true, true); + + session1.unsubscribe(subscriptionName); + + testPeer.expectClose(); + connection.close(); + } + } + + // -------------------------------------- // + + /** + * Verifies that subscription name passed is not allowed to have the subscription name + * delimiter used in the receiver link naming to separate the subscription name from + * a suffix used to ensure unique link names are used on a connection. + */ + @Test(timeout = 20000) + public void testSubscriptionNameNotAllowedToHaveNameSeparator() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Add server connection capability to indicate support for shared-subs + Symbol[] serverCapabilities = new Symbol[]{SHARED_SUBS}; + + // Establish connection + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + + // Create session + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Try the various methods that take subscription name with a name that + // contains the delimiter, they should fail. No frames should be sent. + String subscriptionName = "thisName|hasTheDelimiterAlready"; + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + try { + session.createDurableSubscriber(dest, subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + try { + session.createDurableConsumer(dest, subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + try { + session.createSharedDurableConsumer(dest, subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + try { + session.createSharedConsumer(dest, subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + try { + session.unsubscribe(subscriptionName); + fail("Should have thrown a JMSException"); + } catch (JMSException ex) { + // Expected + } + + testPeer.waitForAllHandlersToComplete(1000); + + // Now close connection, should work. + testPeer.expectClose(); + connection.close(); + } + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java index 0511840..7ac99f8 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java @@ -68,7 +68,7 @@ public class JmsConnectionInfoTest { JmsConnectionInfo info = new JmsConnectionInfo(firstId); info.setForceSyncSend(true); - info.setClientId("test"); + info.setClientId("test", true); info.setCloseTimeout(100); info.setConnectTimeout(200); info.setForceAsyncSends(true); @@ -89,6 +89,7 @@ public class JmsConnectionInfoTest { assertEquals(true, copy.isForceSyncSend()); assertEquals("test", copy.getClientId()); + assertEquals(true, copy.isExplicitClientID()); assertEquals(100, copy.getCloseTimeout()); assertEquals(200, copy.getConnectTimeout()); assertEquals(true, copy.isForceAsyncSend()); @@ -161,6 +162,14 @@ public class JmsConnectionInfoTest { } @Test + public void testIsExplicitClientId() { + final JmsConnectionInfo info = new JmsConnectionInfo(firstId); + assertFalse(info.isExplicitClientID()); + info.setClientId("something", true); + assertTrue(info.isExplicitClientID()); + } + + @Test public void testGetEncodedUsername() { final JmsConnectionInfo info = new JmsConnectionInfo(firstId); info.setUsername("user"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java index 9bca41e..d4fdcc5 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -85,12 +86,14 @@ public class JmsConsumerInfoTest { info.setAcknowledgementMode(1); info.setBrowser(true); - info.setClientId("test"); + info.setExplicitClientID(true); info.setDestination(new JmsTopic("Test")); info.setLastDeliveredSequenceId(42); info.setNoLocal(true); info.setPrefetchSize(123456); info.setSelector("select"); + info.setDurable(true); + info.setShared(true); info.setSubscriptionName("name"); info.setRedeliveryPolicy(new JmsDefaultRedeliveryPolicy()); info.setListener(true); @@ -99,10 +102,12 @@ public class JmsConsumerInfoTest { assertEquals(1, copy.getAcknowledgementMode()); assertEquals(true, copy.isBrowser()); - assertEquals("test", copy.getClientId()); + assertEquals(true, copy.isExplicitClientID()); assertEquals(new JmsTopic("Test"), copy.getDestination()); assertEquals(42, copy.getLastDeliveredSequenceId()); assertEquals(true, copy.isNoLocal()); + assertEquals(true, copy.isDurable()); + assertEquals(true, copy.isShared()); assertEquals(123456, copy.getPrefetchSize()); assertEquals("select", copy.getSelector()); assertEquals("name", copy.getSubscriptionName()); @@ -116,11 +121,37 @@ public class JmsConsumerInfoTest { public void testIsDurable() { JmsConsumerInfo info = new JmsConsumerInfo(firstId); assertFalse(info.isDurable()); - info.setSubscriptionName("name"); + info.setDurable(true); assertTrue(info.isDurable()); } @Test + public void testIsExplicitClientID() { + JmsConsumerInfo info = new JmsConsumerInfo(firstId); + assertFalse(info.isExplicitClientID()); + info.setExplicitClientID(true); + assertTrue(info.isExplicitClientID()); + } + + @Test + public void testIsShared() { + JmsConsumerInfo info = new JmsConsumerInfo(firstId); + assertFalse(info.isShared()); + info.setShared(true); + assertTrue(info.isShared()); + } + + @Test + public void testGetSubscriptionName() { + String subName = "name"; + + JmsConsumerInfo info = new JmsConsumerInfo(firstId); + assertNull(info.getSubscriptionName()); + info.setSubscriptionName(subName); + assertEquals(subName, info.getSubscriptionName()); + } + + @Test public void testCompareTo() { JmsConsumerInfo first = new JmsConsumerInfo(firstId); JmsConsumerInfo second = new JmsConsumerInfo(secondId); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/952de60a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java new file mode 100644 index 0000000..2a7c893 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.jms.JmsTopic; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.junit.Test; + +public class AmqpSubscriptionTrackerTest { + + private AtomicInteger consumerIdCounter = new AtomicInteger(); + + private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, boolean hasClientID) { + return createConsumerInfo(subscriptionName, topicName, shared, durable, null, hasClientID); + } + + private JmsConsumerInfo createConsumerInfo(String subscriptionName, String topicName, boolean shared, boolean durable, String selector, boolean isExplicitClientID) { + JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter .incrementAndGet()); + JmsTopic topic = new JmsTopic(topicName); + + JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId); + + consumerInfo.setSubscriptionName(subscriptionName); + consumerInfo.setDestination(topic); + consumerInfo.setShared(shared); + consumerInfo.setDurable(durable); + consumerInfo.setSelector(selector); + consumerInfo.setExplicitClientID(isExplicitClientID); + + return consumerInfo; + } + + @Test + public void testReserveNextSubscriptionLinkNameSharedDurable() { + String topicName = "myTopic"; + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + // For the first shared sub name + JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, true); + assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1)); + JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, true); + assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2)); + + // For the second shared sub name + JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, true); + assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1)); + JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, true); + assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2)); + + // Register a third subscriber for a subscription, after removing the first subscriber for the subscription. + // Validate the new link name isn't the same as the second subscribers (which is still using its name...) + tracker.consumerRemoved(sub2consumer1); + JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, true); + assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3)); + } + + @Test + public void testReserveNextSubscriptionLinkNameSharedDurableWithoutClientID() { + String topicName = "myTopic"; + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + // For the first shared sub name + JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, true, false); + assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1)); + JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, true, false); + assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2)); + + // For the second shared sub name + JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, true, false); + assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1)); + JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, true, false); + assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2)); + + // Register a third subscriber for a subscription, after removing the first subscriber for the subscription. + // Validate the new link name isn't the same as the second subscribers (which is still using its name...) + tracker.consumerRemoved(sub2consumer1); + JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, true, false); + assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3)); + } + + @Test + public void testReserveNextSubscriptionLinkNameSharedVolatile() { + String topicName = "myTopic"; + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + // For the first shared sub name + JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, true); + assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1)); + JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, true); + assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2)); + + // For the second shared sub name + JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, true); + assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1)); + JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, true); + assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2)); + + // Register a third subscriber for a subscription, after removing the first subscriber for the subscription. + // Validate the new link name isn't the same as the second subscribers (which is still using its name...) + tracker.consumerRemoved(sub2consumer1); + JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, true); + assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3)); + } + + @Test + public void testReserveNextSubscriptionLinkNameSharedVolatileWithoutClientID() { + String topicName = "myTopic"; + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + // For the first shared sub name + JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, true, false, false); + assertEquals("Unexpected first sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1)); + JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, true, false, false); + assertEquals("Unexpected second sub link name", subscriptionName1 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2)); + + // For the second shared sub name + JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, true, false, false); + assertEquals("Unexpected first sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile1", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1)); + JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, true, false, false); + assertEquals("Unexpected second sub link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile2", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2)); + + // Register a third subscriber for a subscription, after removing the first subscriber for the subscription. + // Validate the new link name isn't the same as the second subscribers (which is still using its name...) + tracker.consumerRemoved(sub2consumer1); + JmsConsumerInfo sub2consumer3 = createConsumerInfo(subscriptionName2, topicName, true, false, false); + assertEquals("Unexpected third subscriber link name", subscriptionName2 + SUB_NAME_DELIMITER + "global-volatile3", tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer3)); + } + @Test + public void testReserveNextSubscriptionLinkNameExclusiveDurable() { + String topicName = "myTopic"; + String subscriptionName1 = "mySubscription1"; + String subscriptionName2 = "mySubscription2"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + // For the first shared sub name + JmsConsumerInfo sub1consumer1 = createConsumerInfo(subscriptionName1, topicName, false, true, true); + assertEquals("Unexpected first sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer1)); + // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link + // name if its already in use, but check we get the same name anyway even with an existing registration. + JmsConsumerInfo sub1consumer2 = createConsumerInfo(subscriptionName1, topicName, false, true, true); + assertEquals("Unexpected second sub link name", subscriptionName1, tracker.reserveNextSubscriptionLinkName(subscriptionName1, sub1consumer2)); + + // For the second shared sub name + JmsConsumerInfo sub2consumer1 = createConsumerInfo(subscriptionName2, topicName, false, true, true); + assertEquals("Unexpected first sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer1)); + // This shouldn't happen, checks elsewhere should stop requests for an exclusive durable sub link + // name if its already in use, but check we get the same name anyway even with an existing registration. + JmsConsumerInfo sub2consumer2 = createConsumerInfo(subscriptionName2, topicName, false, true, true); + assertEquals("Unexpected second sub link name", subscriptionName2, tracker.reserveNextSubscriptionLinkName(subscriptionName2, sub2consumer2)); + } + + @Test + public void testReserveNextSubscriptionLinkNameExclusiveNonDurable() { + String topicName = "myTopic"; + String subscriptionName = "mySubscription"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName, topicName, false, false, true); + try { + tracker.reserveNextSubscriptionLinkName(subscriptionName, subInfo); + fail("Should have thrown exception, tracker doesn't name these subs"); + } catch (IllegalStateException ise) { + // Expected + } + + // Verify it no-ops with an exclusive non-durable sub info + tracker.consumerRemoved(subInfo); + } + + @Test + public void testIsActiveExclusiveDurableSub() { + String subscriptionName1 = "mySubscription"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1)); + + JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName1, "myTopic", false, true, true); + tracker.reserveNextSubscriptionLinkName(subscriptionName1, subInfo); + + assertTrue(tracker.isActiveExclusiveDurableSub(subscriptionName1)); + + tracker.consumerRemoved(subInfo); + + assertFalse(tracker.isActiveExclusiveDurableSub(subscriptionName1)); + } + + @Test + public void testIsActiveSharedDurableSub() { + String subscriptionName1 = "mySubscription"; + + AmqpSubscriptionTracker tracker = new AmqpSubscriptionTracker(); + + assertFalse(tracker.isActiveSharedDurableSub(subscriptionName1)); + + JmsConsumerInfo subInfo = createConsumerInfo(subscriptionName1, "myTopic", true, true, true); + tracker.reserveNextSubscriptionLinkName(subscriptionName1, subInfo); + + assertTrue(tracker.isActiveSharedDurableSub(subscriptionName1)); + + tracker.consumerRemoved(subInfo); + + assertFalse(tracker.isActiveSharedDurableSub(subscriptionName1)); + } + + @Test + public void testIsActiveDurableSub() { + String subscriptionName = "mySubscription"; + + // Test when an exclusive durable sub is active + AmqpSubscriptionTracker tracker1 = new AmqpSubscriptionTracker(); + + assertFalse(tracker1.isActiveDurableSub(subscriptionName)); + + JmsConsumerInfo subInfo1 = createConsumerInfo(subscriptionName, "myTopic", false, true, true); + tracker1.reserveNextSubscriptionLinkName(subscriptionName, subInfo1); + + assertTrue(tracker1.isActiveDurableSub(subscriptionName)); + assertTrue(tracker1.isActiveExclusiveDurableSub(subscriptionName)); + assertFalse(tracker1.isActiveSharedDurableSub(subscriptionName)); + + tracker1.consumerRemoved(subInfo1); + + assertFalse(tracker1.isActiveDurableSub(subscriptionName)); + assertFalse(tracker1.isActiveExclusiveDurableSub(subscriptionName)); + assertFalse(tracker1.isActiveSharedDurableSub(subscriptionName)); + + // Test when an shared durable sub is active + AmqpSubscriptionTracker tracker2 = new AmqpSubscriptionTracker(); + + assertFalse(tracker2.isActiveDurableSub(subscriptionName)); + + JmsConsumerInfo subInfo2 = createConsumerInfo(subscriptionName, "myTopic", true, true, true); + tracker2.reserveNextSubscriptionLinkName(subscriptionName, subInfo2); + + assertTrue(tracker2.isActiveDurableSub(subscriptionName)); + assertFalse(tracker2.isActiveExclusiveDurableSub(subscriptionName)); + assertTrue(tracker2.isActiveSharedDurableSub(subscriptionName)); + + tracker2.consumerRemoved(subInfo2); + + assertFalse(tracker2.isActiveDurableSub(subscriptionName)); + assertFalse(tracker2.isActiveExclusiveDurableSub(subscriptionName)); + assertFalse(tracker2.isActiveSharedDurableSub(subscriptionName)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
