Repository: activemq Updated Branches: refs/heads/master 8e183db03 -> b5c626478
https://issues.apache.org/jira/browse/AMQ-5799 Return the noLocal filter and set selector if one exists for the existing durable subscription when a lookup is requested. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b5c62647 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b5c62647 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b5c62647 Branch: refs/heads/master Commit: b5c62647897b119364794d2ef0027b8d84b216d3 Parents: 8e183db Author: Timothy Bish <[email protected]> Authored: Tue May 26 17:30:54 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 26 17:30:54 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpConnection.java | 7 +- .../transport/amqp/protocol/AmqpSession.java | 23 ++++++- .../amqp/interop/AmqpDurableReceiverTest.java | 67 +++++++++++++++++++- 3 files changed, 90 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 365c0fc..711156a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -49,6 +49,7 @@ import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.MessageDispatch; @@ -630,8 +631,8 @@ public class AmqpConnection implements AmqpProtocolConverter { subscriptionsByConsumerId.remove(consumerId); } - ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException { - ActiveMQDestination result = null; + ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException { + ConsumerInfo result = null; RegionBroker regionBroker; try { @@ -643,7 +644,7 @@ public class AmqpConnection implements AmqpProtocolConverter { final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId()); if (subscription != null) { - result = subscription.getActiveMQDestination(); + result = subscription.getConsumerInfo(); } return result; http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index abc680b..cdef850 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -18,7 +18,9 @@ package org.apache.activemq.transport.amqp.protocol; import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; import static org.apache.activemq.transport.amqp.AmqpSupport.createDestination; import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; @@ -43,6 +45,8 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.AmqpProtocolException; import org.apache.activemq.transport.amqp.ResponseHandler; +import org.apache.qpid.jms.provider.amqp.AmqpJmsNoLocalType; +import org.apache.qpid.jms.provider.amqp.AmqpJmsSelectorType; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Target; @@ -255,14 +259,29 @@ public class AmqpSession implements AmqpResource { ActiveMQDestination destination; if (source == null) { // Attempt to recover previous subscription - destination = connection.lookupSubscription(protonSender.getName()); + ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName()); + + if (storedInfo != null) { + destination = storedInfo.getDestination(); - if (destination != null) { source = new org.apache.qpid.proton.amqp.messaging.Source(); source.setAddress(destination.getQualifiedName()); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDistributionMode(COPY); + + Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); + if (storedInfo.isNoLocal()) { + filters.put(NO_LOCAL_NAME, AmqpJmsNoLocalType.NO_LOCAL); + } + + if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) { + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(storedInfo.getSelector())); + } + + if (!filters.isEmpty()) { + source.setFilter(filters); + } } else { sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName())); return; http://git-wip-us.apache.org/repos/asf/activemq/blob/b5c62647/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java index 31c8961..e2d2495 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java @@ -17,8 +17,12 @@ package org.apache.activemq.transport.amqp.interop; import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.activemq.broker.jmx.BrokerViewMBean; @@ -219,6 +223,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertNotNull(protonReceiver.getRemoteSource()); Source remoteSource = (Source) protonReceiver.getRemoteSource(); + if (remoteSource.getFilter() != null) { + assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + } + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); assertEquals(COPY, remoteSource.getDistributionMode()); @@ -235,7 +244,7 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testLookupExistingSubscriptionAfterRestart() throws Exception { + public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception { final BrokerViewMBean brokerView = getProxyToBroker(); @@ -245,7 +254,56 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.connect(); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.detach(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(1, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver = session.lookupSubscription(getTestName()); + + assertNotNull(receiver); + + Receiver protonReceiver = receiver.getReceiver(); + assertNotNull(protonReceiver.getRemoteSource()); + Source remoteSource = (Source) protonReceiver.getRemoteSource(); + + if (remoteSource.getFilter() != null) { + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + } + + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); + assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); + assertEquals(COPY, remoteSource.getDistributionMode()); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + receiver.close(); + + assertEquals(0, brokerView.getDurableTopicSubscribers().length); + assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testLookupExistingSubscriptionAfterRestartWithSelectorAndNoLocal() throws Exception { + + final BrokerViewMBean brokerView = getProxyToBroker(); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName(), "color = red", true); assertEquals(1, brokerView.getDurableTopicSubscribers().length); assertEquals(0, brokerView.getInactiveDurableTopicSubscribers().length); @@ -270,6 +328,11 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertNotNull(protonReceiver.getRemoteSource()); Source remoteSource = (Source) protonReceiver.getRemoteSource(); + if (remoteSource.getFilter() != null) { + assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME)); + assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME)); + } + assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy()); assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable()); assertEquals(COPY, remoteSource.getDistributionMode());
