Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 330deb2c8 -> 70728e97d
NO-JIRA: Add some additional test variations and add some more checks (cherry picked from commit 3e237ca73a9c9af18191ff4a23c456c1d427511e) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/70728e97 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/70728e97 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/70728e97 Branch: refs/heads/activemq-5.14.x Commit: 70728e97dabf29cccfc03794589b9515d8a7ce51 Parents: 330deb2 Author: Timothy Bish <[email protected]> Authored: Thu Oct 13 14:29:13 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Oct 13 14:29:55 2016 -0400 ---------------------------------------------------------------------- .../amqp/interop/AmqpDurableReceiverTest.java | 122 +++++++++++++++++-- 1 file changed, 111 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/70728e97/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java index b016cc5..3db3301 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -31,6 +31,7 @@ import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -42,6 +43,8 @@ import org.junit.Test; */ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { + private final String SELECTOR_STRING = "color = red"; + @Override protected boolean isUseOpenWireConnector() { return true; @@ -244,6 +247,103 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { } @Test(timeout = 60000) + public void testLookupExistingSubscriptionWithSelector() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.createConnection()); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.detach(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver = session.lookupSubscription(getTestName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertNotNull(remoteSource.getFilter()); + assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed()); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.close(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscriptionWithNoLocal() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.createConnection()); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), null, true); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.detach(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver = session.lookupSubscription(getTestName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + assertNotNull(remoteSource.getFilter()); + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.close(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception { final BrokerViewMBean brokerView = getProxyToBroker(); @@ -254,7 +354,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true); assertEquals(1, brokerView.getDurableTopicSubscribers().length); assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); @@ -272,10 +372,10 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertNotNull(protonReceiver.getRemoteSource()); Source remoteSource = (Source) protonReceiver.getRemoteSource(); - if (remoteSource.getFilter() != null) { - assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); - assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); - } + assertNotNull(remoteSource.getFilter()); + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed()); assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); @@ -303,7 +403,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), SELECTOR_STRING, true); assertEquals(1, brokerView.getDurableTopicSubscribers().length); assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); @@ -328,10 +428,10 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertNotNull(protonReceiver.getRemoteSource()); Source remoteSource = (Source) protonReceiver.getRemoteSource(); - if (remoteSource.getFilter() != null) { - assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); - assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); - } + assertNotNull(remoteSource.getFilter()); + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + assertEquals(SELECTOR_STRING, ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed()); assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
