Repository: activemq Updated Branches: refs/heads/master 2e4c907f2 -> 11579bb91
https://issues.apache.org/jira/browse/AMQ-5938 - add remove(messageId) op to offline durable subs jmx view Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11579bb9 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11579bb9 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11579bb9 Branch: refs/heads/master Commit: 11579bb918bcc4e2689556b2bb4bbf46d5fcabe6 Parents: 2e4c907 Author: gtully <[email protected]> Authored: Fri Aug 21 13:16:23 2015 +0100 Committer: gtully <[email protected]> Committed: Fri Aug 21 13:17:09 2015 +0100 ---------------------------------------------------------------------- .../broker/jmx/DurableSubscriptionView.java | 4 + .../jmx/DurableSubscriptionViewMBean.java | 19 +++ .../jmx/InactiveDurableSubscriptionView.java | 6 + .../broker/jmx/ManagedRegionBroker.java | 93 ++++++------ .../broker/jmx/SubscriptionViewMBean.java | 6 +- .../broker/region/NullMessageReference.java | 4 +- ...ableSubscriptionOfflineBrowseRemoveTest.java | 151 +++++++++++++++++++ 7 files changed, 234 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java index 77ec8c2..deac3db 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java @@ -105,6 +105,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable return 0; } + @Override + public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception { + throw new IllegalStateException("Subscription must be inactive"); + } public boolean doesCursorHaveMessagesBuffered() { if (durableSub != null && durableSub.getPending() != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java index df0841b..73fa16e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java @@ -27,6 +27,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean { /** * @return name of the durable subscription name */ + @MBeanInfo("The subscription name.") String getSubscriptionName(); /** @@ -35,6 +36,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean { * @return messages * @throws OpenDataException */ + @MBeanInfo("Browse the composite data array of pending messages in this subscription") CompositeData[] browse() throws OpenDataException; /** @@ -43,44 +45,61 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean { * @return messages * @throws OpenDataException */ + @MBeanInfo("Browse the tabular data of pending messages in this subscription") TabularData browseAsTable() throws OpenDataException; /** * Destroys the durable subscription so that messages will no longer be * stored for this subscription */ + @MBeanInfo("Destroy or delete this subscription") void destroy() throws Exception; /** * @return true if the message cursor has memory space available * to page in more messages */ + @MBeanInfo("The subscription has space for more messages in memory") public boolean doesCursorHaveSpace(); /** * @return true if the cursor has reached its memory limit for * paged in messages */ + @MBeanInfo("The subscription cursor is full") public boolean isCursorFull(); /** * @return true if the cursor has messages buffered to deliver */ + @MBeanInfo("The subscription cursor has messages in memory") public boolean doesCursorHaveMessagesBuffered(); /** * @return the cursor memory usage in bytes */ + @MBeanInfo("The subscription cursor memory usage bytes") public long getCursorMemoryUsage(); /** * @return the cursor memory usage as a percentage */ + @MBeanInfo("The subscription cursor memory usage %") public int getCursorPercentUsage(); /** * @return the number of messages available to be paged in * by the cursor */ + @MBeanInfo("The subscription cursor size or message count") public int cursorSize(); + + /** + * Removes a message from the durable subscription. + * + * @param messageId + * @throws Exception + */ + @MBeanInfo("Remove a message from the subscription by JMS message ID.") + public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java index 65e49f4..8e61b31 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java @@ -157,4 +157,10 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp public String getSelector() { return subscriptionInfo.getSelector(); } + + @Override + public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception { + broker.remove(this, messageId); + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 59278f1..854f5e0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -17,10 +17,8 @@ package org.apache.activemq.broker.jmx; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -29,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import javax.jms.IllegalStateException; import javax.management.InstanceNotFoundException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -47,8 +46,10 @@ import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFactory; -import org.apache.activemq.broker.region.DestinationFactoryImpl; import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.NullMessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.RegionBroker; @@ -64,12 +65,10 @@ import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SubscriptionInfo; -import org.apache.activemq.store.MessageRecoveryListener; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.XATransaction; @@ -539,11 +538,11 @@ public class ManagedRegionBroker extends RegionBroker { } public CompositeData[] browse(SubscriptionView view) throws OpenDataException { - List<Message> messages = getSubscriberMessages(view); - CompositeData c[] = new CompositeData[messages.size()]; + Message[] messages = getSubscriberMessages(view); + CompositeData c[] = new CompositeData[messages.length]; for (int i = 0; i < c.length; i++) { try { - c[i] = OpenTypeSupport.convert(messages.get(i)); + c[i] = OpenTypeSupport.convert(messages[i]); } catch (Throwable e) { LOG.error("Failed to browse: {}", view, e); } @@ -553,53 +552,59 @@ public class ManagedRegionBroker extends RegionBroker { public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); - List<Message> messages = getSubscriberMessages(view); + Message[] messages = getSubscriberMessages(view); CompositeType ct = factory.getCompositeType(); TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); TabularDataSupport rc = new TabularDataSupport(tt); - for (int i = 0; i < messages.size(); i++) { - rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i)))); + for (int i = 0; i < messages.length; i++) { + rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); } return rc; } - protected List<Message> getSubscriberMessages(SubscriptionView view) { - // TODO It is very dangerous operation for big backlogs - if (!(destinationFactory instanceof DestinationFactoryImpl)) { - throw new RuntimeException("unsupported by " + destinationFactory); - } - PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); - final List<Message> result = new ArrayList<Message>(); - try { - ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName()); - TopicMessageStore store = adapter.createTopicMessageStore(topic); - store.recover(new MessageRecoveryListener() { - @Override - public boolean recoverMessage(Message message) throws Exception { - result.add(message); - return true; - } + public void remove(SubscriptionView view, String messageId) throws Exception { + ActiveMQDestination destination = getTopicDestination(view); + if (destination != null) { + final Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination); + final MessageAck messageAck = new MessageAck(); + messageAck.setMessageID(new MessageId(messageId)); + messageAck.setDestination(destination); - @Override - public boolean recoverMessageReference(MessageId messageReference) throws Exception { - throw new RuntimeException("Should not be called."); - } + topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck); - @Override - public boolean hasSpace() { - return true; - } + // if sub is active, remove from cursor + if (view.subscription instanceof DurableTopicSubscription) { + final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription; + final MessageReference messageReference = new NullMessageReference(); + messageReference.getMessage().setMessageId(messageAck.getFirstMessageId()); + durableTopicSubscription.getPending().remove(messageReference); + } - @Override - public boolean isDuplicate(MessageId id) { - return false; - } - }); - } catch (Throwable e) { - LOG.error("Failed to browse messages for Subscription {}", view, e); + } else { + throw new IllegalStateException("can't determine topic for sub:" + view); + } + } + + protected Message[] getSubscriberMessages(SubscriptionView view) { + ActiveMQDestination destination = getTopicDestination(view); + if (destination != null) { + Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination); + return topic.browse(); + + } else { + LOG.warn("can't determine topic to browse for sub:" + view); + return new Message[]{}; } - return result; + } + private ActiveMQDestination getTopicDestination(SubscriptionView view) { + ActiveMQDestination destination = null; + if (view.subscription instanceof DurableTopicSubscription) { + destination = new ActiveMQTopic(view.getDestinationName()); + } else if (view instanceof InactiveDurableSubscriptionView) { + destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination(); + } + return destination; } protected ObjectName[] getTopics() { http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java index 1907b98..8fad1b6 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java @@ -58,7 +58,7 @@ public interface SubscriptionViewMBean { /** * @return the destination name */ - @MBeanInfo("The name of the destionation the subscription is on.") + @MBeanInfo("The name of the destination the subscription is on.") String getDestinationName(); /** @@ -158,13 +158,13 @@ public interface SubscriptionViewMBean { /** * @return whether or not the subscriber is durable (persistent) */ - @MBeanInfo("The subsription is persistent.") + @MBeanInfo("The subscription is persistent.") boolean isDurable(); /** * @return whether or not the subscriber ignores local messages */ - @MBeanInfo("The subsription ignores local messages.") + @MBeanInfo("The subscription ignores local messages.") boolean isNoLocal(); /** http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index 510f5aa..9510972 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -22,9 +22,9 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; /** - * Only used by the {@link QueueMessageReference#NULL_MESSAGE} + * Used by the {@link QueueMessageReference#NULL_MESSAGE} */ -final class NullMessageReference implements QueueMessageReference { +public final class NullMessageReference implements QueueMessageReference { private final ActiveMQMessage message = new ActiveMQMessage(); private volatile int references; http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java new file mode 100644 index 0000000..0532ede --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.*; + + +@RunWith(value = Parameterized.class) +public class DurableSubscriptionOfflineBrowseRemoveTest extends DurableSubscriptionOfflineTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineBrowseRemoveTest.class); + + public boolean keepDurableSubsActive; + + @Parameterized.Parameters(name = "PA-{0}.KeepSubsActive-{1}") + public static Collection<Object[]> getTestParameters() { + List<Object[]> testParameters = new ArrayList<Object[]>(); + testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.TRUE}); + testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.FALSE}); + + testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.TRUE}); + testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.FALSE}); + + // leveldb needs some work on finding index from green messageId + return testParameters; + } + + public DurableSubscriptionOfflineBrowseRemoveTest(TestSupport.PersistenceAdapterChoice adapter, boolean keepDurableSubsActive) { + this.defaultPersistenceAdapter = adapter; + this.usePrioritySupport = true; + this.keepDurableSubsActive = keepDurableSubsActive; + } + + @Override + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + broker.setKeepDurableSubsActive(keepDurableSubsActive); + return super.setPersistenceAdapter(broker, defaultPersistenceAdapter); + } + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); + connectionFactory.setWatchTopicAdvisories(false); + return connectionFactory; + } + + @Test(timeout = 60 * 1000) + public void testBrowseRemoveBrowseOfflineSub() throws Exception { + // create durable subscription + Connection con = createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId"); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + for (int i = 0; i < 10; i++) { + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + } + + session.close(); + con.close(); + + // browse the durable sub + ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers(); + assertEquals(1, subs.length); + ObjectName subName = subs[0]; + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + CompositeData[] data = sub.browse(); + assertNotNull(data); + assertEquals(10, data.length); + + LinkedList<String> idToRemove = new LinkedList<>(); + idToRemove.add((String)data[5].get("JMSMessageID")); + idToRemove.add((String)data[9].get("JMSMessageID")); + idToRemove.add((String)data[0].get("JMSMessageID")); + + LOG.info("Removing: " + idToRemove); + for (String id: idToRemove) { + sub.removeMessage(id); + } + + if (defaultPersistenceAdapter.compareTo(TestSupport.PersistenceAdapterChoice.JDBC) == 0) { + for (int i=0; i<10; i++) { + // each iteration does one priority + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).cleanup(); + } + } + + data = sub.browse(); + assertNotNull(data); + assertEquals(7, data.length); + + for (CompositeData c: data) { + String id = (String)c.get("JMSMessageID"); + for (String removedId : idToRemove) { + assertNotEquals(id, removedId); + } + } + + // remove non existent + LOG.info("Repeat remove: " + idToRemove.getFirst()); + sub.removeMessage(idToRemove.getFirst()); + + } +}
