Repository: activemq
Updated Branches:
  refs/heads/master a35d23dff -> 6630e8137


https://issues.apache.org/jira/browse/AMQ-6435 - destination mbean query api


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6630e813
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6630e813
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6630e813

Branch: refs/heads/master
Commit: 6630e813795c20055b26cea52ba6a34390315bdf
Parents: a35d23d
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Mon Sep 19 16:22:36 2016 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon Sep 19 16:22:44 2016 +0200

----------------------------------------------------------------------
 activemq-broker/pom.xml                         |  10 +
 .../apache/activemq/broker/jmx/BrokerView.java  |  31 ++-
 .../activemq/broker/jmx/BrokerViewMBean.java    |  18 ++
 .../broker/jmx/DestinationsViewFilter.java      | 248 +++++++++++++++++++
 .../broker/jmx/ManagedRegionBroker.java         |   9 +
 .../activemq/util/IntrospectionSupport.java     |  15 +-
 assembly/pom.xml                                |   5 +
 pom.xml                                         |   1 +
 8 files changed, 328 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-broker/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-broker/pom.xml b/activemq-broker/pom.xml
index 09a84af..0078c80 100755
--- a/activemq-broker/pom.xml
+++ b/activemq-broker/pom.xml
@@ -83,6 +83,16 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.7.5</version>
+    </dependency>
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
index 55fb9bd..70a8083 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
@@ -19,23 +19,20 @@ package org.apache.activemq.broker.jmx;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.BrokerSupport;
 import org.slf4j.Logger;
@@ -289,6 +286,24 @@ public class BrokerView implements BrokerViewMBean {
     }
 
     @Override
+    public String queryQueues(String filter, int page, int pageSize) throws IOException {
+        // Parse the JSON query first, then hand it the queue MBeans to work on.
+        final DestinationsViewFilter query = DestinationsViewFilter.create(filter);
+        query.setDestinations(safeGetBroker().getQueueViews());
+        return query.filter(page, pageSize);
+    }
+
+    @Override
+    public String queryTopics(String filter, int page, int pageSize) throws IOException {
+        // Parse the JSON query first, then hand it the topic MBeans to work on.
+        final DestinationsViewFilter query = DestinationsViewFilter.create(filter);
+        query.setDestinations(safeGetBroker().getTopicViews());
+        return query.filter(page, pageSize);
+    }
+
+    /**
+     * Browses the messages of the named queue.
+     *
+     * @param queueName name of the queue to browse
+     * @return the result of the queue view's {@code browse()} call
+     * @throws NoSuchElementException if no queue view is registered under that name
+     * @throws OpenDataException as declared by the underlying browse operation
+     * @throws MalformedObjectNameException if the queue name cannot be turned into an MBean object name
+     */
+    @Override
+    public CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException {
+        DestinationView queueView = safeGetBroker().getQueueView(queueName);
+        if (queueView == null) {
+            // getQueueView() is a plain map lookup, so an unknown name yields null;
+            // report that explicitly instead of failing with a bare NullPointerException.
+            throw new NoSuchElementException("No queue MBean registered for: " + queueName);
+        }
+        return queueView.browse();
+    }
+
+    @Override
     public ObjectName[] getTemporaryTopics() {
         return safeGetBroker().getTemporaryTopicsNonSuppressed();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
index 9951803..4f80dc7 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
@@ -16,9 +16,13 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.io.IOException;
 import java.util.Map;
 
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
 
 import org.apache.activemq.Service;
 
@@ -178,6 +182,20 @@ public interface BrokerViewMBean extends Service {
     @MBeanInfo("Standard Queues containing AIE messages.")
     ObjectName[] getQueues();
 
+    /**
+     * Queue Query API, take a look at {@link DestinationsViewFilter} for more information.
+     *
+     * @param filter JSON representation of the query, parsed by {@link DestinationsViewFilter#create(String)}
+     * @param page number of the result page to return; pages are counted from 1
+     * @param pageSize maximum number of destinations per page
+     * @return JSON map with the selected queue MBeans in the "data" field and a total in the "count" field
+     * @throws IOException if the query cannot be parsed or the result cannot be written as JSON
+     */
+    @MBeanInfo("Query queues")
+    String queryQueues(String filter, int page, int pageSize) throws IOException;
+
+    /**
+     * Topic Query API, take a look at {@link DestinationsViewFilter} for more information.
+     *
+     * @param filter JSON representation of the query, parsed by {@link DestinationsViewFilter#create(String)}
+     * @param page number of the result page to return; pages are counted from 1
+     * @param pageSize maximum number of destinations per page
+     * @return JSON map with the selected topic MBeans in the "data" field and a total in the "count" field
+     * @throws IOException if the query cannot be parsed or the result cannot be written as JSON
+     */
+    @MBeanInfo("Query topics")
+    String queryTopics(String filter, int page, int pageSize) throws IOException;
+
+    /**
+     * Browses the messages of the given queue.
+     *
+     * @param queueName name of the queue to browse
+     * @return the browsed messages as open-type composite data
+     * @throws OpenDataException as declared by the underlying browse operation
+     * @throws MalformedObjectNameException if the queue name cannot be turned into an MBean object name
+     */
+    @MBeanInfo("Browse the messages of a queue")
+    CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException;
+
     @MBeanInfo("Temporary Topics; generally unused.")
     ObjectName[] getTemporaryTopics();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationsViewFilter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationsViewFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationsViewFilter.java
new file mode 100644
index 0000000..bd9a27f
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationsViewFilter.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.activemq.util.IntrospectionSupport.*;
+
+/**
+ * Defines a query API for destinations MBeans
+ *
+ * Typical usage
+ *
+ *         return DestinationsViewFilter.create(filter)
+ *                .setDestinations(broker.getQueueViews())
+ *                .filter(page, pageSize);
+ *
+ * where 'filter' is the JSON representation of the query, like
+ *
+ * {"name": "77", "filter": "nonEmpty", "sortColumn": "queueSize", "sortOrder": "desc"}
+ *
+ * This returns a JSON map containing the filtered map of MBeans in the "data" field and the
+ * total number of destinations that match the criteria in the "count" field.
+ * The result will be properly paged, according to the 'page' and 'pageSize' parameters.
+ *
+ */
+public class DestinationsViewFilter implements Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DestinationsViewFilter.class);
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Name pattern used to filter destinations
+     */
+    String name;
+
+    /**
+     * Arbitrary filter key to be applied to the destinations. Currently only
+     * simple predefined filters have been implemented:
+     *
+     * empty - return only empty queues (queueSize = 0)
+     * nonEmpty - return only non-empty queues (queueSize != 0)
+     * noConsumer - return only destinations that don't have consumers
+     * nonAdvisory - return only non-Advisory topics
+     *
+     * For more implementation details see {@link DestinationsViewFilter#getPredicate()}
+     *
+     */
+    String filter;
+
+    /**
+     * Sort destinations by this {@link DestinationView} property
+     */
+    String sortColumn = "name";
+
+    /**
+     * Order of sorting - 'asc' or 'desc'
+     */
+    String sortOrder = "asc";
+
+    Map<ObjectName, DestinationView> destinations;
+
+
+    public DestinationsViewFilter() {
+    }
+
+    /**
+     * Builds a filter from its JSON representation.
+     *
+     * A null argument, or one that is empty or "{}" after trimming, yields a filter
+     * with default settings; anything else is handed to Jackson for mapping.
+     *
+     * @param json JSON representation of the filter, may be null
+     * @return the filter described by the JSON string
+     * @throws IOException if the JSON string cannot be mapped to a filter
+     */
+    public static DestinationsViewFilter create(String json) throws IOException {
+        final String query = (json == null) ? "" : json.trim();
+        if (query.isEmpty() || "{}".equals(query)) {
+            return new DestinationsViewFilter();
+        }
+        return new ObjectMapper().readerFor(DestinationsViewFilter.class).readValue(query);
+    }
+
+    /**
+     * Sets the destination MBeans to be queried.
+     *
+     * @param destinations destination views keyed by MBean object name; the map is stored as passed, not copied
+     * @return this filter, so that calls can be chained
+     */
+    public DestinationsViewFilter setDestinations(Map<ObjectName, DestinationView> destinations) {
+        this.destinations = destinations;
+        return this;
+    }
+
+    /**
+     * Filter, sort and page results.
+     *
+     * Returns a JSON map with the requested page of destination views in the "data" field
+     * and the total number of matched destinations (across all pages) in the "count" field.
+     *
+     * @param page - defines result page to be returned, counted from 1
+     * @param pageSize - defines page size to be used
+     * @return the JSON text of the result map
+     * @throws IOException if the result cannot be written as JSON
+     */
+    String filter(int page, int pageSize) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        Map<ObjectName, DestinationView> paged = getFilteredDestinations(page, pageSize);
+        // "count" must be the number of destinations matching the query, not the number
+        // of all destinations, so that clients can compute the number of pages.
+        int matched = Maps.filterValues(destinations, getPredicate()).size();
+        Map<String, Object> result = new HashMap<String, Object>();
+        result.put("data", paged);
+        result.put("count", matched);
+        StringWriter writer = new StringWriter();
+        mapper.writeValue(writer, result);
+        return writer.toString();
+    }
+
+    /**
+     * Applies the predicate and the ordering to the destinations and cuts out one page.
+     *
+     * @param page page to return, counted from 1
+     * @param pageSize number of entries per page
+     * @return immutable map holding the entries of the requested page in sort order
+     */
+    Map<ObjectName, DestinationView> getFilteredDestinations(int page, int pageSize) {
+        final Map<ObjectName, DestinationView> matching = Maps.filterValues(destinations, getPredicate());
+        final int first = (page - 1) * pageSize;
+        final int limit = Math.min(page * pageSize, matching.size());
+
+        final ImmutableMap.Builder<ObjectName, DestinationView> pageBuilder = ImmutableMap.builder();
+        int position = 0;
+        for (Map.Entry<ObjectName, DestinationView> candidate : getOrdering().sortedCopy(matching.entrySet())) {
+            if (position >= limit) {
+                // everything from here on lies behind the requested page
+                break;
+            }
+            if (position >= first) {
+                pageBuilder.put(candidate.getKey(), candidate.getValue());
+            }
+            position++;
+        }
+        return pageBuilder.build();
+    }
+
+    /**
+     * Builds the predicate that decides whether a destination matches this query.
+     *
+     * A destination matches when its name contains the configured name pattern (if any)
+     * and it satisfies the configured filter key (if any). An unset or unknown filter key
+     * places no further restriction on the destination.
+     *
+     * @return predicate over destination views
+     */
+    Predicate<DestinationView> getPredicate() {
+        return new Predicate<DestinationView>() {
+            @Override
+            public boolean apply(DestinationView input) {
+                String namePattern = getName();
+                if (namePattern != null && !namePattern.isEmpty()
+                        && !input.getName().contains(namePattern)) {
+                    return false;
+                }
+
+                // Constant-first equals(): the filter key is null unless the query sets it,
+                // and a missing key must not fail with a NullPointerException.
+                String key = getFilter();
+                if ("empty".equals(key)) {
+                    return input.getQueueSize() == 0;
+                }
+                if ("nonEmpty".equals(key)) {
+                    return input.getQueueSize() != 0;
+                }
+                if ("noConsumer".equals(key)) {
+                    return input.getConsumerCount() == 0;
+                }
+                if ("nonAdvisory".equals(key)) {
+                    return !(input instanceof TopicView
+                            && AdvisorySupport.isAdvisoryTopic(new ActiveMQTopic(input.getName())));
+                }
+                return true;
+            }
+        };
+    }
+
+    /**
+     * Builds the ordering used to sort matching destinations.
+     *
+     * Entries are compared by the {@link DestinationView} property named by the sort column,
+     * descending when the sort order is "desc" (in any letter case), ascending otherwise.
+     * Entries compare as equal when the sort column is unset, names no getter, yields
+     * non-comparable values, or when reading or comparing the values fails.
+     *
+     * @return ordering over destination map entries
+     */
+    Ordering<Map.Entry<ObjectName, DestinationView>> getOrdering() {
+        final String column = getSortColumn();
+        // findGetterMethod() looks at the first character of the name, so an unset or
+        // empty sort column has to be kept away from it.
+        final Method getter = (column == null || column.isEmpty())
+                ? null
+                : findGetterMethod(DestinationView.class, column);
+        // Null-safe and locale-independent, unlike getSortOrder().toLowerCase().equals("desc").
+        final boolean descending = "desc".equalsIgnoreCase(getSortOrder());
+
+        return new Ordering<Map.Entry<ObjectName, DestinationView>>() {
+            @Override
+            @SuppressWarnings({"unchecked", "rawtypes"})
+            public int compare(Map.Entry<ObjectName, DestinationView> left, Map.Entry<ObjectName, DestinationView> right) {
+                if (getter == null) {
+                    return 0;
+                }
+                try {
+                    Object leftValue = getter.invoke(left.getValue());
+                    Object rightValue = getter.invoke(right.getValue());
+                    if (leftValue instanceof Comparable && rightValue instanceof Comparable) {
+                        return descending
+                                ? ((Comparable) rightValue).compareTo(leftValue)
+                                : ((Comparable) leftValue).compareTo(rightValue);
+                    }
+                    return 0;
+                } catch (Exception e) {
+                    // Best effort: a property that cannot be read or compared leaves the entries unordered.
+                    LOG.info("Exception sorting destinations", e);
+                    return 0;
+                }
+            }
+        };
+    }
+
+    /**
+     * @return the destination MBeans to be queried, as passed to {@link #setDestinations(Map)}
+     */
+    public Map<ObjectName, DestinationView> getDestinations() {
+        return destinations;
+    }
+
+    /**
+     * @return the text a destination name has to contain in order to match
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @param name the text a destination name has to contain in order to match
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the key of the predefined filter to apply
+     */
+    public String getFilter() {
+        return filter;
+    }
+
+    /**
+     * @param filter the key of the predefined filter to apply
+     */
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    /**
+     * @return the order of sorting; "asc" unless changed
+     */
+    public String getSortOrder() {
+        return sortOrder;
+    }
+
+    /**
+     * @param sortOrder the order of sorting - 'asc' or 'desc'
+     */
+    public void setSortOrder(String sortOrder) {
+        this.sortOrder = sortOrder;
+    }
+
+    /**
+     * @return the {@link DestinationView} property to sort by; "name" unless changed
+     */
+    public String getSortColumn() {
+        return sortColumn;
+    }
+
+    /**
+     * @param sortColumn the {@link DestinationView} property to sort by
+     */
+    public void setSortColumn(String sortColumn) {
+        this.sortColumn = sortColumn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index ddd1a17..fa9584d 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -828,4 +828,13 @@ public class ManagedRegionBroker extends RegionBroker {
     public Map<ObjectName, DestinationView> getQueueViews() {
         return queues;
     }
+
+    /**
+     * @return the topic destination views keyed by MBean object name; this is the
+     *         {@code topics} map itself, not a copy
+     */
+    public Map<ObjectName, DestinationView> getTopicViews() {
+        return topics;
+    }
+
+    /**
+     * Looks up the view of a queue by the queue's name.
+     *
+     * @param queueName name of the queue
+     * @return the view stored under the queue's MBean object name, or null if the
+     *         {@code queues} map holds none
+     * @throws MalformedObjectNameException if no valid object name can be built from the queue name
+     */
+    public DestinationView getQueueView(String queueName) throws MalformedObjectNameException {
+        final String brokerName = brokerObjectName.toString();
+        return queues.get(BrokerMBeanSupport.createDestinationName(brokerName, "Queue", queueName));
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/activemq-client/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
index a13e7b4..37835c1 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
@@ -255,7 +255,7 @@ public final class IntrospectionSupport {
         }
     }
 
-    private static Method findSetterMethod(Class clazz, String name) {
+    public static Method findSetterMethod(Class clazz, String name) {
         // Build the method name.
         name = "set" + Character.toUpperCase(name.charAt(0)) + 
name.substring(1);
         Method[] methods = clazz.getMethods();
@@ -268,6 +268,19 @@ public final class IntrospectionSupport {
         return null;
     }
 
+    /**
+     * Finds the public no-argument getter for a property.
+     *
+     * @param clazz class whose public methods are searched
+     * @param name property name, e.g. "queueSize" for {@code getQueueSize()}
+     * @return the matching method, or null if the name is null or empty or no such getter exists
+     */
+    public static Method findGetterMethod(Class<?> clazz, String name) {
+        if (name == null || name.isEmpty()) {
+            // There is no first character to capitalize, hence no getter to look for.
+            return null;
+        }
+        // Build the method name.
+        String getterName = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+        for (Method method : clazz.getMethods()) {
+            if (method.getName().equals(getterName) && method.getParameterTypes().length == 0) {
+                return method;
+            }
+        }
+        return null;
+    }
+
     public static String toString(Object target) {
         return toString(target, Object.class, null);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 95c7693..7147587 100755
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -340,6 +340,11 @@
       <artifactId>derbynet</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava-version}</version>
+    </dependency>
 
     <!-- copied dependencies from activemq-web-console -->
     <!-- enable commons-logging when inside jetty6:run -->

http://git-wip-us.apache.org/repos/asf/activemq/blob/6630e813/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f27343f..3fda867 100755
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
     <directory-version>2.0.0-M6</directory-version>
     <ftpserver-version>1.0.6</ftpserver-version>
     <geronimo-version>1.0</geronimo-version>
+    <guava-version>19.0</guava-version>
     <hadoop-version>1.0.0</hadoop-version>
     <hawtbuf-version>1.11</hawtbuf-version>
     <hawtdispatch-version>1.22</hawtdispatch-version>

Reply via email to