Repository: activemq
Updated Branches:
  refs/heads/master 2e4c907f2 -> 11579bb91


https://issues.apache.org/jira/browse/AMQ-5938 - add remove(messageId) op to 
offline durable subs jmx view


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11579bb9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11579bb9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11579bb9

Branch: refs/heads/master
Commit: 11579bb918bcc4e2689556b2bb4bbf46d5fcabe6
Parents: 2e4c907
Author: gtully <[email protected]>
Authored: Fri Aug 21 13:16:23 2015 +0100
Committer: gtully <[email protected]>
Committed: Fri Aug 21 13:17:09 2015 +0100

----------------------------------------------------------------------
 .../broker/jmx/DurableSubscriptionView.java     |   4 +
 .../jmx/DurableSubscriptionViewMBean.java       |  19 +++
 .../jmx/InactiveDurableSubscriptionView.java    |   6 +
 .../broker/jmx/ManagedRegionBroker.java         |  93 ++++++------
 .../broker/jmx/SubscriptionViewMBean.java       |   6 +-
 .../broker/region/NullMessageReference.java     |   4 +-
 ...ableSubscriptionOfflineBrowseRemoveTest.java | 151 +++++++++++++++++++
 7 files changed, 234 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
index 77ec8c2..deac3db 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionView.java
@@ -105,6 +105,10 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
         return 0;
     }
 
+    @Override
+    public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception {
+        throw new IllegalStateException("Subscription must be inactive");
+    }
 
     public boolean doesCursorHaveMessagesBuffered() {
         if (durableSub != null && durableSub.getPending() != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
index df0841b..73fa16e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DurableSubscriptionViewMBean.java
@@ -27,6 +27,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
     /**
      * @return name of the durable subscription name
      */
+    @MBeanInfo("The subscription name.")
     String getSubscriptionName();
 
     /**
@@ -35,6 +36,7 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
      * @return messages
      * @throws OpenDataException
      */
+    @MBeanInfo("Browse the composite data array of pending messages in this subscription")
     CompositeData[] browse() throws OpenDataException;
 
     /**
@@ -43,44 +45,61 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
      * @return messages
      * @throws OpenDataException
      */
+    @MBeanInfo("Browse the tabular data of pending messages in this subscription")
     TabularData browseAsTable() throws OpenDataException;
 
     /**
      * Destroys the durable subscription so that messages will no longer be
      * stored for this subscription
      */
+    @MBeanInfo("Destroy or delete this subscription")
     void destroy() throws Exception;
     
     /**
      * @return true if the message cursor has memory space available
      * to page in more messages
      */
+    @MBeanInfo("The subscription has space for more messages in memory")
     public boolean doesCursorHaveSpace();
     
     /**
      * @return true if the cursor has reached its memory limit for
      * paged in messages
      */
+    @MBeanInfo("The subscription cursor is full")
     public boolean isCursorFull();
     
     /**
      * @return true if the cursor has messages buffered to deliver
      */
+    @MBeanInfo("The subscription cursor has messages in memory")
     public boolean doesCursorHaveMessagesBuffered();
     
     /**
      * @return the cursor memory usage in bytes
      */
+    @MBeanInfo("The subscription cursor memory usage bytes")
     public long getCursorMemoryUsage();
     
     /**
      * @return the cursor memory usage as a percentage
      */
+    @MBeanInfo("The subscription cursor memory usage %")
     public int getCursorPercentUsage();
     
     /**
      * @return the number of messages available to be paged in 
      * by the cursor
      */
+    @MBeanInfo("The subscription cursor size or message count")
     public int cursorSize();
+
+    /**
+     * Removes a message from the durable subscription.
+     *
+     * @param messageId
+     * @throws Exception
+     */
+    @MBeanInfo("Remove a message from the subscription by JMS message ID.")
+    public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
index 65e49f4..8e61b31 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java
@@ -157,4 +157,10 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
     public String getSelector() {
         return subscriptionInfo.getSelector();
     }
+
+    @Override
+    public void removeMessage(@MBeanInfo("messageId") String messageId) throws Exception {
+        broker.remove(this, messageId);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index 59278f1..854f5e0 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -17,10 +17,8 @@
 package org.apache.activemq.broker.jmx;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -29,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import javax.jms.IllegalStateException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
@@ -47,8 +46,10 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
 import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.NullMessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -64,12 +65,10 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.XATransaction;
@@ -539,11 +538,11 @@ public class ManagedRegionBroker extends RegionBroker {
     }
 
     public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
-        List<Message> messages = getSubscriberMessages(view);
-        CompositeData c[] = new CompositeData[messages.size()];
+        Message[] messages = getSubscriberMessages(view);
+        CompositeData c[] = new CompositeData[messages.length];
         for (int i = 0; i < c.length; i++) {
             try {
-                c[i] = OpenTypeSupport.convert(messages.get(i));
+                c[i] = OpenTypeSupport.convert(messages[i]);
             } catch (Throwable e) {
                 LOG.error("Failed to browse: {}", view, e);
             }
@@ -553,53 +552,59 @@ public class ManagedRegionBroker extends RegionBroker {
 
     public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
-        List<Message> messages = getSubscriberMessages(view);
+        Message[] messages = getSubscriberMessages(view);
         CompositeType ct = factory.getCompositeType();
         TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
         TabularDataSupport rc = new TabularDataSupport(tt);
-        for (int i = 0; i < messages.size(); i++) {
-            rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
+        for (int i = 0; i < messages.length; i++) {
+            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
         }
         return rc;
     }
 
-    protected List<Message> getSubscriberMessages(SubscriptionView view) {
-        // TODO It is very dangerous operation for big backlogs
-        if (!(destinationFactory instanceof DestinationFactoryImpl)) {
-            throw new RuntimeException("unsupported by " + destinationFactory);
-        }
-        PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
-        final List<Message> result = new ArrayList<Message>();
-        try {
-            ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
-            TopicMessageStore store = adapter.createTopicMessageStore(topic);
-            store.recover(new MessageRecoveryListener() {
-                @Override
-                public boolean recoverMessage(Message message) throws Exception {
-                    result.add(message);
-                    return true;
-                }
+    public void remove(SubscriptionView view, String messageId)  throws Exception {
+        ActiveMQDestination destination = getTopicDestination(view);
+        if (destination != null) {
+            final Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination);
+            final MessageAck messageAck = new MessageAck();
+            messageAck.setMessageID(new MessageId(messageId));
+            messageAck.setDestination(destination);
 
-                @Override
-                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
-                    throw new RuntimeException("Should not be called.");
-                }
+            topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);
 
-                @Override
-                public boolean hasSpace() {
-                    return true;
-                }
+            // if sub is active, remove from cursor
+            if (view.subscription instanceof DurableTopicSubscription) {
+                final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
+                final MessageReference messageReference = new NullMessageReference();
+                messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
+                durableTopicSubscription.getPending().remove(messageReference);
+            }
 
-                @Override
-                public boolean isDuplicate(MessageId id) {
-                    return false;
-                }
-            });
-        } catch (Throwable e) {
-            LOG.error("Failed to browse messages for Subscription {}", view, e);
+        } else {
+            throw new IllegalStateException("can't determine topic for sub:" + view);
+        }
+    }
+
+    protected Message[] getSubscriberMessages(SubscriptionView view) {
+        ActiveMQDestination destination = getTopicDestination(view);
+        if (destination != null) {
+            Topic topic = (Topic) getTopicRegion().getDestinationMap().get(destination);
+            return topic.browse();
+
+        } else {
+            LOG.warn("can't determine topic to browse for sub:" + view);
+            return new Message[]{};
         }
-        return result;
+    }
 
+    private ActiveMQDestination getTopicDestination(SubscriptionView view) {
+        ActiveMQDestination destination = null;
+        if (view.subscription instanceof DurableTopicSubscription) {
+            destination = new ActiveMQTopic(view.getDestinationName());
+        } else if (view instanceof InactiveDurableSubscriptionView) {
+            destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination();
+        }
+        return destination;
     }
 
     protected ObjectName[] getTopics() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
index 1907b98..8fad1b6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java
@@ -58,7 +58,7 @@ public interface SubscriptionViewMBean {
     /**
      * @return the destination name
      */
-    @MBeanInfo("The name of the destionation the subscription is on.")
+    @MBeanInfo("The name of the destination the subscription is on.")
     String getDestinationName();
 
     /**
@@ -158,13 +158,13 @@ public interface SubscriptionViewMBean {
     /**
      * @return whether or not the subscriber is durable (persistent)
      */
-    @MBeanInfo("The subsription is persistent.")
+    @MBeanInfo("The subscription is persistent.")
     boolean isDurable();
 
     /**
      * @return whether or not the subscriber ignores local messages
      */
-    @MBeanInfo("The subsription ignores local messages.")
+    @MBeanInfo("The subscription ignores local messages.")
     boolean isNoLocal();
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
index 510f5aa..9510972 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
@@ -22,9 +22,9 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 
 /**
- * Only used by the {@link QueueMessageReference#NULL_MESSAGE}
+ * Used by the {@link QueueMessageReference#NULL_MESSAGE}
  */
-final class NullMessageReference implements QueueMessageReference {
+public final class NullMessageReference implements QueueMessageReference {
 
     private final ActiveMQMessage message = new ActiveMQMessage();
     private volatile int references;

http://git-wip-us.apache.org/repos/asf/activemq/blob/11579bb9/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java
new file mode 100644
index 0000000..0532ede
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineBrowseRemoveTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.*;
+
+
+@RunWith(value = Parameterized.class)
+public class DurableSubscriptionOfflineBrowseRemoveTest extends DurableSubscriptionOfflineTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineBrowseRemoveTest.class);
+
+    public boolean keepDurableSubsActive;
+
+    @Parameterized.Parameters(name = "PA-{0}.KeepSubsActive-{1}")
+    public static Collection<Object[]> getTestParameters() {
+        List<Object[]> testParameters = new ArrayList<Object[]>();
+        testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.TRUE});
+        testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, Boolean.FALSE});
+
+        testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.TRUE});
+        testParameters.add(new Object[]{TestSupport.PersistenceAdapterChoice.JDBC, Boolean.FALSE});
+
+        // leveldb needs some work on finding index from green messageId
+        return testParameters;
+    }
+
+    public DurableSubscriptionOfflineBrowseRemoveTest(TestSupport.PersistenceAdapterChoice adapter, boolean keepDurableSubsActive) {
+        this.defaultPersistenceAdapter = adapter;
+        this.usePrioritySupport = true;
+        this.keepDurableSubsActive = keepDurableSubsActive;
+    }
+
+    @Override
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        broker.setKeepDurableSubsActive(keepDurableSubsActive);
+        return super.setPersistenceAdapter(broker, defaultPersistenceAdapter);
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testBrowseRemoveBrowseOfflineSub() throws Exception {
+        // create durable subscription
+        Connection con = createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId");
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        for (int i = 0; i < 10; i++) {
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        session.close();
+        con.close();
+
+        // browse the durable sub
+        ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
+        assertEquals(1, subs.length);
+        ObjectName subName = subs[0];
+        DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+        CompositeData[] data  = sub.browse();
+        assertNotNull(data);
+        assertEquals(10, data.length);
+
+        LinkedList<String> idToRemove = new LinkedList<>();
+        idToRemove.add((String)data[5].get("JMSMessageID"));
+        idToRemove.add((String)data[9].get("JMSMessageID"));
+        idToRemove.add((String)data[0].get("JMSMessageID"));
+
+        LOG.info("Removing: " + idToRemove);
+        for (String id: idToRemove) {
+            sub.removeMessage(id);
+        }
+
+        if (defaultPersistenceAdapter.compareTo(TestSupport.PersistenceAdapterChoice.JDBC) == 0) {
+            for (int i=0; i<10; i++) {
+                // each iteration does one priority
+                ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).cleanup();
+            }
+        }
+
+        data  = sub.browse();
+        assertNotNull(data);
+        assertEquals(7, data.length);
+
+        for (CompositeData c: data) {
+            String id = (String)c.get("JMSMessageID");
+            for (String removedId : idToRemove) {
+                assertNotEquals(id, removedId);
+            }
+        }
+
+        // remove non existent
+        LOG.info("Repeat remove: " + idToRemove.getFirst());
+        sub.removeMessage(idToRemove.getFirst());
+
+    }
+}

Reply via email to