Added: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java?rev=1465662&view=auto
==============================================================================
--- 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
 (added)
+++ 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
 Mon Apr  8 15:19:04 2013
@@ -0,0 +1,358 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.MethodResult;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+// Reuse this class as it provides a handy mechanism to parse an args String 
into a Map
+import org.apache.qpid.messaging.util.AddressParser;
+
+/**
+ * A tool to allow QMF2 methods to be invoked from the command line.
+ * <pre>
+ * Usage: QpidCtrl [options] command [args]
+ * The args need to be in a Stringified Map format (similar to an Address 
String)
+ * e.g. to set broker log level: QpidCtrl setLogLevel 
"{level:\"debug+:Broker\"}"
+ * The listValues command lists property names and values of the specified 
object.
+ * The listObjects command lists all objects of the specified package and 
class.
+ * 
+ * Options:
+ *   -h, --help            show this help message and exit
+ *   -v                    enable logging
+ *   -a &lt;address&gt;, --broker-address=&lt;address&gt;
+ *                         broker-addr is in the form:  [username/password@]
+ *                         hostname | ip-address [:&lt;port&gt;]   ex:  
localhost,
+ *                         10.1.1.7:10000, broker-host:10000,
+ *                         guest/guest@localhost
+ *   -c &lt;class&gt;, --class=&lt;class&gt;
+ *                         class of object on which command is being invoked
+ *                         (default broker)
+ *   -p &lt;package&gt;, --package=&lt;package&gt;
+ *                         package of object on which command is being invoked
+ *                         (default org.apache.qpid.broker)
+ *   -i &lt;id&gt;, --id=&lt;id&gt;    identifier of object on which command 
is being invoked
+ *                         (default amqp-broker)
+ *   --agent=&lt;agent name&gt;
+ *                         The name of the Agent to which commands will be sent
+ *                         This will try to match &lt;agent name&gt; against 
the Agent name,
+ *                         the Agent product name and will also check if the 
Agent name
+ *                         contains the &lt;agent name&gt; String
+ *                         (default qpidd)
+ *   --sasl-mechanism=&lt;mech&gt;
+ *                         SASL mechanism for authentication (e.g. EXTERNAL,
+ *                         ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ *                         automatically picks the most secure available
+ *                         mechanism - use this option to override.
+ * </pre>
+ * Examples (Note the quotes and escaped quotes are significant!):
+ * <p>
+ * Get the current broker log level:
+ * <pre>QpidCtrl getLogLevel</pre>
+ *
+ * Set the current broker log level to notice+:
+ * <pre>QpidCtrl setLogLevel "{level:\"notice+\"}"</pre>
+ * 
+ * Set the current broker log level to debug+ for all Management Objects:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+\"}"</pre>
+ *
+ * Set the current broker log level to debug+ for just the Broker Management 
Object:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+:Broker\"}"</pre>
+ *
+ * List the properties of the qmf.default.direct exchange:
+ * <pre>QpidCtrl -c exchange -i qmf.default.direct listValues</pre>
+ *
+ * Create a queue called test with a ring limit policy:
+ * <pre>QpidCtrl create 
"{type:queue,name:test,properties:{'qpid.policy_type':ring}}"</pre>
+ *
+ * Delete a queue called test:
+ * <pre>QpidCtrl delete "{type:queue,name:test}"</pre>
+ *
+ * Create a binding called bind1 between the amq.match exchange and the test 
queue matching the headers name=fadams
+ * and gender=male:
+ * <pre>QpidCtrl create 
"{type:binding,name:'amq.match/test/bind1',properties:{x-match:all,name:fadams,gender:male}}"</pre>
+ *
+ * Delete the binding called bind1 between the amq.match exchange and the test 
queue:
+ * <pre>QpidCtrl delete "{type:binding,name:'amq.match/test/bind1'}"</pre>
+ *
+ * Get the broker to echo a message:
+ * <pre>QpidCtrl echo "{sequence:1234,body:'Peaches En Regalia'}"</pre>
+ *
+ * Invoke the event method on the gizmo Agent (launch gizmo Agent via 
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL 
--agent=gizmo event</pre>
+ *
+ * Invoke the create_child method on the gizmo Agent (launch gizmo Agent via 
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL 
--agent=gizmo create_child "{name:monkeyBoy}"</pre>
+ *
+ * Invoke the stop method on the gizmo Agent (launch gizmo Agent via 
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL 
--agent=gizmo stop "{message:'Will I dream?'}"</pre>
+ *
+ * @author Fraser Adams
+ */
+public final class QpidCtrl
+{
+    /**
+     * Summary of the command line syntax, printed for --help and on usage errors.
+     */
+    private static final String _usage =
+    "Usage: QpidCtrl [options] command [args]\n" +
+    "The args need to be in a Stringified Map format (similar to an Address String)\n" +
+    "e.g. to set broker log level: QpidCtrl setLogLevel \"{level:\\\"debug+:Broker\\\"}\"\n" +
+    "The listValues command lists property names and values of the specified object.\n" +
+    "The listObjects command lists all objects of the specified package and class.\n";
+
+    /**
+     * Description of the supported options, printed after the usage text for --help.
+     * The SASL mechanism list names CRAM-MD5 (the registered mechanism name).
+     */
+    private static final String _options =
+    "Options:\n" +
+    "  -h, --help            show this help message and exit\n" +
+    "  -v                    enable logging\n" +
+    "  -a <address>, --broker-address=<address>\n" +
+    "                        broker-addr is in the form:  [username/password@]\n" +
+    "                        hostname | ip-address [:<port>]   ex:  localhost,\n" +
+    "                        10.1.1.7:10000, broker-host:10000,\n" +
+    "                        guest/guest@localhost\n" +
+    "  -c <class>, --class=<class>\n" +
+    "                        class of object on which command is being invoked\n" +
+    "                        (default broker)\n" +
+    "  -p <package>, --package=<package>\n" +
+    "                        package of object on which command is being invoked\n" +
+    "                        (default org.apache.qpid.broker)\n" +
+    "  -i <id>, --id=<id>    identifier of object on which command is being invoked\n" +
+    "                        (default amqp-broker)\n" +
+    "  --agent=<agent name>\n" +
+    "                        The name of the Agent to which commands will be sent\n" +
+    "                        This will try to match <agent name> against the Agent name,\n" +
+    "                        the Agent product name and will also check if the Agent name\n" +
+    "                        contains the <agent name> String\n" +
+    "                        (default qpidd)\n" +
+    "  --sasl-mechanism=<mech>\n" +
+    "                        SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+    "                        ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+    "                        automatically picks the most secure available\n" +
+    "                        mechanism - use this option to override.\n";
+
+    /** The QMF2 Console used to find the Agent and its objects. */
+    private Console _console;
+
+    /**
+     * Connects to the broker, finds the requested Agent and object and runs the
+     * requested command, printing the outcome to stdout/stderr.
+     * <p>
+     * Two pseudo-commands are handled locally rather than being sent to the Agent:
+     * listObjects prints the name of every object of the given package and class,
+     * and listValues prints the values of the first object whose name contains id.
+     * Any other command is invoked as a QMF2 method on that object.
+     * <p>
+     * Note that this constructor calls System.exit(): with status 1 if the Agent
+     * cannot be found and with status 0 once listValues has completed.
+     *
+     * @param url the Connection URL.
+     * @param connectionOptions the connection options String to pass to ConnectionHelper.
+     * @param pkg the package name of the object we're invoking the method on.
+     * @param cls the class name of the object we're invoking the method on.
+     * @param id the ObjectId name of the object we're invoking the method on.
+     * @param agentName the name of the Agent to invoke the QMF method on.
+     * @param command the QMF method we're invoking.
+     * @param args the Stringified Map form of the method arguments, may be null.
+     */
+    public QpidCtrl(final String url, final String connectionOptions,
+                    final String pkg, final String cls, final String id,
+                    final String agentName, final String command, final String args)
+    {
+        try
+        {
+            Connection connection = ConnectionHelper.createConnection(url, connectionOptions);
+            _console = new Console();
+            _console.addConnection(connection);
+
+            // Find the specified Agent
+            Agent agent = _console.findAgent(agentName);
+            if (agent == null)
+            {
+                System.out.println("Agent " + agentName + " not found");
+                System.exit(1);
+            }
+
+            List<Agent> agentList = Arrays.asList(new Agent[] {agent});
+            List<QmfConsoleData> objects = _console.getObjects(pkg, cls, agentList);
+
+            // Parse the args String
+            QmfData inArgs = (args == null) ? new QmfData()
+                                            : new QmfData(new AddressParser(args).map());
+
+            if (objects.isEmpty())
+            {
+                System.out.println("getObjects(" + pkg + ", " + cls + ", " + agentName +
+                                   ") returned no objects.");
+                return;
+            }
+
+            if (command.equals("listObjects"))
+            {
+                for (QmfConsoleData object : objects)
+                {
+                    System.out.println(object.getObjectId().getObjectName());
+                }
+                // Listing is complete; returning here avoids the misleading
+                // "Id ... not found" message that is only relevant to id lookups.
+                return;
+            }
+
+            // Find the required QmfConsoleData object. Use contains as ObjectNames may
+            // comprise other identifiers that make using equals impractical.
+            QmfConsoleData target = null;
+            for (QmfConsoleData object : objects)
+            {
+                if (object.getObjectId().getObjectName().contains(id))
+                {
+                    target = object;
+                    break;
+                }
+            }
+
+            if (target == null)
+            {
+                System.out.println("Id " + id + " not found in " + pkg + ":" + cls);
+                return;
+            }
+
+            if (command.equals("listValues"))
+            {
+                target.listValues();
+                System.exit(0); // The values were listed successfully, so report success.
+            }
+            else
+            {
+                MethodResult results = target.invokeMethod(command, inArgs);
+                if (results == null)
+                {
+                    System.err.println ("No result returned from " + command + " method");
+                }
+                else if (results.succeeded())
+                {
+                    results.listValues();
+                }
+                else
+                {
+                    System.err.println ("QmfException " + results.getQmfException().getMessage() +
+                                        " returned from " + command + " method");
+                }
+            }
+        }
+        catch (QmfException qmfe)
+        {
+            System.err.println ("QmfException " + qmfe.getMessage() +
+                                " caught in QpidCtrl constructor");
+        }
+    }
+
+    /**
+     * Runs QpidCtrl.
+     * @param args the command line arguments.
+     */
+    public static void main(final String[] args)
+    {
+        String logLevel = System.getProperty("amqj.logging.level");
+        logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log 
level to FATAL rather than DEBUG.
+        System.setProperty("amqj.logging.level", logLevel);
+
+        // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the 
JVM gets prevented from exiting.
+        // Setting the following property to true makes it a Daemon Thread.
+        System.setProperty("qpid.jms.daemon.dispatcher", "true");
+
+        String[] longOpts = {"help", "broker-address=", "class=", "package=", 
"id=", "agent=", "sasl-mechanism="};
+        try
+        {
+            String host = "localhost";
+            String connectionOptions = "{reconnect: true}";
+            String cls = "broker";
+            String pkg = "org.apache.qpid.broker";
+            String id = "amqp-broker";
+            String agentName = "qpidd";
+            String command = null;
+            String arg = null;
+
+            GetOpt getopt = new GetOpt(args, "ha:c:p:i:v", longOpts);
+            List<String[]> optList = getopt.getOptList();
+            String[] cargs = {};
+            cargs = getopt.getEncArgs().toArray(cargs);
+
+            for (String[] opt : optList)
+            {
+                if (opt[0].equals("-h") || opt[0].equals("--help"))
+                {
+                    System.out.println(_usage);
+                    System.out.println(_options);
+                    System.exit(1);
+                }
+                else if (opt[0].equals("-a") || 
opt[0].equals("--broker-address"))
+                {
+                    host = opt[1];
+                }
+                else if (opt[0].equals("-c") || opt[0].equals("--class"))
+                {
+                    cls = opt[1];
+                }
+                else if (opt[0].equals("-p") || opt[0].equals("--package"))
+                {
+                    pkg = opt[1];
+                }
+                else if (opt[0].equals("-i") || opt[0].equals("--id"))
+                {
+                    id = opt[1];
+                }
+                else if (opt[0].equals("--agent"))
+                {
+                    agentName = opt[1];
+                }
+                else if (opt[0].equals("-v"))
+                {
+                    System.setProperty("amqj.logging.level", "DEBUG");
+                }
+                else if (opt[0].equals("--sasl-mechanism"))
+                {
+                    connectionOptions = "{reconnect: true, sasl_mechs: " + 
opt[1] + "}";
+                }
+            }
+
+            if (cargs.length < 1 || cargs.length > 2)
+            {
+                System.out.println(Arrays.asList(cargs));
+                System.out.println(_usage);
+                System.exit(1);
+            }
+    
+            command = cargs[0];
+
+            if (cargs.length == 2)
+            {
+                arg = cargs[1];
+                if (!arg.startsWith("{") || !arg.endsWith("}"))
+                {
+                    System.out.println("Incorrect format for args.");
+                    System.out.println("This needs to be in a Stringified Map 
format similar to an Address String");
+                    System.exit(1);
+                }
+            }
+
+            QpidCtrl qpidCtrl = new QpidCtrl(host, connectionOptions, pkg, 
cls, id, agentName, command, arg);
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(_usage);
+            System.out.println(e.getMessage());
+            System.exit(1);
+        }
+    }
+}

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java?rev=1465662&view=auto
==============================================================================
--- 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
 (added)
+++ 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
 Mon Apr  8 15:19:04 2013
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.*;
+
+/**
+ * Collect and print events from one or more Qpid message brokers.
+ * <pre>
+ * If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.
+ * 
+ * [broker-addr] syntax:
+ * 
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ * 
+ * Examples:
+ * 
+ * $ QpidPrintEvents localhost:5672
+ * $ QpidPrintEvents 10.1.1.7:10000
+ * $ QpidPrintEvents guest/guest@broker-host:10000
+ * 
+ * Options:
+ *   -h, --help            show this help message and exit
+ *   --heartbeats          Use heartbeats.
+ *   --sasl-mechanism=&lt;mech&gt;
+ *                         SASL mechanism for authentication (e.g. EXTERNAL,
+ *                         ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ *                         automatically picks the most secure available
+ *                         mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidPrintEvents implements QmfEventListener
+{
+    /** One line summary of the command line syntax. */
+    private static final String _usage =
+    "Usage: QpidPrintEvents [options] [broker-addr]...\n";
+
+    /** Longer description of the tool and of the broker-addr syntax, printed for --help. */
+    private static final String _description =
+    "Collect and print events from one or more Qpid message brokers.\n" +
+    "\n" +
+    "If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.\n" +
+    "\n" +
+    "[broker-addr] syntax:\n" +
+    "\n" +
+    "[username/password@] hostname\n" +
+    "ip-address [:<port>]\n" +
+    "\n" +
+    "Examples:\n" +
+    "\n" +
+    "$ QpidPrintEvents localhost:5672\n" +
+    "$ QpidPrintEvents 10.1.1.7:10000\n" +
+    "$ QpidPrintEvents guest/guest@broker-host:10000\n";
+
+    /**
+     * Description of the supported options, printed for --help.
+     * The SASL mechanism list names CRAM-MD5 (the registered mechanism name).
+     */
+    private static final String _options =
+    "Options:\n" +
+    "  -h, --help            show this help message and exit\n" +
+    "  --heartbeats          Use heartbeats.\n" +
+    "  --sasl-mechanism=<mech>\n" +
+    "                        SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+    "                        ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+    "                        automatically picks the most secure available\n" +
+    "                        mechanism - use this option to override.\n";
+
+    /** The connection URL, appended to every printed event. */
+    private final String _url;
+    /** The QMF2 Console that delivers WorkItems to this listener. */
+    private Console _console;
+
+    /**
+     * Basic constructor. Creates JMS Session, Initialises Destinations, 
Producers & Consumers and starts connection.
+     * @param url the connection URL.
+     * @param connectionOptions the options String to pass to ConnectionHelper.
+     */
+    public QpidPrintEvents(final String url, final String connectionOptions)
+    {
+        System.out.println("Connecting to " + url);
+        _url = url;
+        try
+        {
+            Connection connection = ConnectionHelper.createConnection(url, 
connectionOptions);        
+            _console = new Console(this);
+            _console.addConnection(connection);
+        }
+        catch (QmfException qmfe)
+        {
+            System.err.println ("QmfException " + qmfe.getMessage() + " caught 
in QpidPrintEvents constructor");
+        }
+    }
+
+    /**
+     * Prints the QmfEvent carried by an EventReceivedWorkItem, followed by the URL
+     * of the broker connection. Any other kind of WorkItem is ignored.
+     * @param wi a QMF2 WorkItem object
+     */
+    public void onEvent(final WorkItem wi)
+    {
+        if (!(wi instanceof EventReceivedWorkItem))
+        {
+            return; // Only received events are of interest here.
+        }
+
+        QmfEvent received = ((EventReceivedWorkItem)wi).getEvent();
+        System.out.println(received + " broker=" + _url);
+    }
+
+    /**
+     * Runs QpidPrintEvents.
+     * @param args the command line arguments.
+     */
+    public static void main(final String[] args)
+    {
+        String logLevel = System.getProperty("amqj.logging.level");
+        logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log 
level to FATAL rather than DEBUG.
+        System.setProperty("amqj.logging.level", logLevel);
+
+        String[] longOpts = {"help", "heartbeats", "sasl-mechanism="};
+        try
+        {
+            String connectionOptions = "{reconnect: true}";
+            GetOpt getopt = new GetOpt(args, "h", longOpts);
+            List<String[]> optList = getopt.getOptList();
+            String[] cargs = {};
+            cargs = getopt.getEncArgs().toArray(cargs);
+            for (String[] opt : optList)
+            {
+                if (opt[0].equals("-h") || opt[0].equals("--help"))
+                {
+                    System.out.println(_usage);
+                    System.out.println(_description);
+                    System.out.println(_options);
+                    System.exit(1);
+                }
+                else if (opt[0].equals("--heartbeats"))
+                {
+                    // Ignore Java uses heartbeats by default
+                }
+                else if (opt[0].equals("--sasl-mechanism"))
+                {
+                    connectionOptions = "{reconnect: true, sasl_mechs: " + 
opt[1] + "}";
+                }
+            }
+
+            int nargs = cargs.length;
+            if (nargs == 0)
+            {
+                cargs = new String[] {"localhost"};
+            }
+
+            for (String url : cargs)
+            {
+                QpidPrintEvents eventPrinter = new QpidPrintEvents(url, 
connectionOptions);
+            }
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(_usage);
+            System.exit(1);
+        }
+
+        BufferedReader commandLine = new BufferedReader(new 
InputStreamReader(System.in));
+        try
+        { // Blocks here until return is pressed
+            System.out.println("Hit Return to exit");
+            String s = commandLine.readLine();
+            System.exit(0);
+        }
+        catch (IOException e)
+        {
+            System.out.println ("QpidPrintEvents main(): IOException: " + 
e.getMessage());
+        }
+    }
+}

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java?rev=1465662&view=auto
==============================================================================
--- 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
 (added)
+++ 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
 Mon Apr  8 15:19:04 2013
@@ -0,0 +1,374 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.QmfQuery;
+import org.apache.qpid.qmf2.common.QmfQueryTarget;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem;
+import org.apache.qpid.qmf2.console.AgentRestartedWorkItem;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.console.SubscribeIndication;
+import org.apache.qpid.qmf2.console.SubscribeParams;
+import org.apache.qpid.qmf2.console.SubscriptionIndicationWorkItem;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * Collect and print queue statistics.
+ * <pre>
+ * Usage: QpidQueueStats [options]
+ * 
+ * Options:
+ *   -h, --help            show this help message and exit
+ *   -a &lt;address&gt;, --broker-address=&lt;address&gt;
+ *                         broker-addr is in the form:  [username/password@]
+ *                         hostname | ip-address [:&lt;port&gt;]   ex:  
localhost,
+ *                         10.1.1.7:10000, broker-host:10000,
+ *                         guest/guest@localhost
+ *   -f &lt;filter&gt;, --filter=&lt;filter&gt;
+ *                         a list of comma separated queue names (regex are
+ *                         accepted) to show
+ *   --sasl-mechanism=&lt;mech&gt;
+ *                         SASL mechanism for authentication (e.g. EXTERNAL,
+ *                         ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ *                         automatically picks the most secure available
+ *                         mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidQueueStats implements QmfEventListener
+{
+    /**
+     * Holder pairing a fixed name with a replaceable QmfConsoleData value.
+     */
+    private final class Stats
+    {
+        private final String _name;
+        private QmfConsoleData _data;
+
+        /**
+         * @param name the name to associate with the data.
+         * @param data the initial QmfConsoleData.
+         */
+        public Stats(final String name, final QmfConsoleData data)
+        {
+            this._name = name;
+            this._data = data;
+        }
+
+        /**
+         * @return the name supplied at construction.
+         */
+        public String getName()
+        {
+            return this._name;
+        }
+
+        /**
+         * @return the QmfConsoleData currently held.
+         */
+        public QmfConsoleData getData()
+        {
+            return this._data;
+        }
+
+        /**
+         * Replaces the QmfConsoleData currently held.
+         * @param data the new QmfConsoleData.
+         */
+        public void setData(final QmfConsoleData data)
+        {
+            this._data = data;
+        }
+    }
+
+    /** One line summary of the command line syntax. */
+    private static final String _usage =
+    "Usage: QpidQueueStats [options]\n";
+
+    /**
+     * Description of the supported options.
+     * The SASL mechanism list names CRAM-MD5 (the registered mechanism name).
+     */
+    private static final String _options =
+    "Options:\n" +
+    "  -h, --help            show this help message and exit\n" +
+    "  -a <address>, --broker-address=<address>\n" +
+    "                        broker-addr is in the form:  [username/password@]\n" +
+    "                        hostname | ip-address [:<port>]   ex:  localhost,\n" +
+    "                        10.1.1.7:10000, broker-host:10000,\n" +
+    "                        guest/guest@localhost\n" +
+    "  -f <filter>, --filter=<filter>\n" +
+    "                        a list of comma separated queue names (regex are\n" +
+    "                        accepted) to show\n" +
+    "  --sasl-mechanism=<mech>\n" +
+    "                        SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+    "                        ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+    "                        automatically picks the most secure available\n" +
+    "                        mechanism - use this option to override.\n";
+
+    private final String _url;
+    private final List<Pattern> _filter;
+    private Agent _broker;
+    private Console _console;
+    private Map<ObjectId, Stats> _objects = new HashMap<ObjectId, Stats>();
+    private String _subscriptionId = null;
+    private long _subscriptionDuration;
+    private long _startTime;
+
+    /**
+     * Announces the connection attempt (and the filter, if it is not empty), then
+     * creates a Connection to the given URL and attaches it to a new Console that
+     * has this object registered as its event listener. If the "broker" Agent is
+     * found a queue subscription is created, after which the column headings for
+     * the statistics are printed. A QmfException is reported on stderr rather
+     * than propagated.
+     * @param url the connection URL.
+     * @param connectionOptions the options String to pass to ConnectionHelper.
+     * @param filter a list of regex Patterns used to choose the queues we wish to
+     * display; must not be null as its size is read immediately.
+     */
+    public QpidQueueStats(final String url, final String connectionOptions, 
final List<Pattern> filter)
+    {
+        System.out.println("Connecting to " + url);
+        if (filter.size() > 0)
+        {
+            System.out.println("Filter = " + filter);
+        }
+        _url = url;
+        _filter = filter;
+        try
+        {
+            Connection connection = ConnectionHelper.createConnection(url, 
connectionOptions);        
+            _console = new Console(this);
+            _console.addConnection(connection);
+
+            // Wait until the broker Agent has been discovered
+            // NOTE(review): presumably findAgent() blocks for a while and returns
+            // null if no Agent is discovered - confirm against Console. When it is
+            // null no subscription is created here.
+            _broker = _console.findAgent("broker");
+            if (_broker != null)
+            {
+                createQueueSubscription();
+            }
+
+            // The headings are printed whether or not the subscription was created.
+            System.out.println("Hit Return to exit");
+            System.out.println(
+                "Queue Name                                          Sec       
Depth     Enq Rate     Deq Rate");
+            System.out.println(
+                
"=============================================================================================");
+        }
+        catch (QmfException qmfe)
+        {
+            System.err.println ("QmfException " + qmfe.getMessage() + " caught 
in QpidQueueStats constructor");
+        }
+    }
+
+    /**
+     * Create a Subscription to query for all queue objects.
+     * <p>
+     * On success the Subscription ID, the refresh interval (the Subscription lifetime minus 10) and the
+     * start time are recorded so that onEvent() can refresh the Subscription before it times out.
+     * On failure the QmfException is reported on stderr and the previously recorded values are left untouched.
+     */
+    private void createQueueSubscription()
+    {
+        try
+        {   // This QmfQuery simply does an ID query for objects with the className "queue"
+            QmfQuery query = new QmfQuery(QmfQueryTarget.OBJECT, new SchemaClassId("queue"));
+            SubscribeParams params = _console.createSubscription(_broker, query, "queueStatsHandle");
+            _subscriptionId = params.getSubscriptionId();
+            _subscriptionDuration = params.getLifetime() - 10; // Subtract 10 as we want to refresh before it times out
+            _startTime = System.currentTimeMillis();
+        }
+        catch (QmfException qmfe)
+        { // Report the failure instead of silently leaving the tool without a Subscription.
+            System.err.println ("QmfException " + qmfe.getMessage() +
+                                " caught in QpidQueueStats createQueueSubscription");
+        }
+    }
+
+    /**
+     * Main Event handler. Checks if the WorkItem is a SubscriptionIndicationWorkItem, if it is it stores the object
+     * in a Map and uses this to maintain state so we can record deltas such as enqueue and dequeue rates.
+     * <p>
+     * The AgentHeartbeatWorkItem is used to periodically compare the elapsed time against the Subscription duration
+     * so that we can refresh the Subscription (or create a new one if the refresh fails) in order to continue
+     * receiving queue Management Object data from the broker.
+     * <p>
+     * When the AgentRestartedWorkItem is received we clear the state to remove any stale queue Management Objects.
+     * @param wi a QMF2 WorkItem object
+     */
+    public void onEvent(final WorkItem wi)
+    {
+        if (wi instanceof AgentHeartbeatWorkItem && _subscriptionId != null)
+        {
+            // Elapsed time since the Subscription was created or last refreshed, rounded to whole seconds.
+            long elapsed = Math.round((System.currentTimeMillis() - _startTime)/1000.0f);
+            if (elapsed > _subscriptionDuration)
+            {
+                try
+                {
+                    _console.refreshSubscription(_subscriptionId);
+                    _startTime = System.currentTimeMillis();
+                }
+                catch (QmfException qmfe)
+                {
+                    System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats onEvent");
+                    createQueueSubscription();
+                }
+            }
+        }
+        else if (wi instanceof AgentRestartedWorkItem)
+        {
+            _objects.clear();
+        }
+        else if (wi instanceof SubscriptionIndicationWorkItem)
+        {
+            SubscribeIndication indication = ((SubscriptionIndicationWorkItem)wi).getSubscribeIndication();
+            // Constant-first comparison so that an indication without a console handle is ignored, not an NPE.
+            if ("queueStatsHandle".equals(indication.getConsoleHandle()))
+            { // If it is (and it should be!!) then it's our queue object Subscription
+                for (QmfConsoleData record : indication.getData())
+                {
+                    handleQueueRecord(record);
+                }
+            }
+        }
+    }
+
+    /**
+     * Processes a single queue Management Object delivered by our Subscription: removes deleted objects,
+     * caches objects seen for the first time and prints statistics for objects that are already cached.
+     * @param record the queue Management Object data pushed by the broker
+     */
+    private void handleQueueRecord(final QmfConsoleData record)
+    {
+        ObjectId id = record.getObjectId();
+        if (record.isDeleted())
+        { // If the object was deleted by the Agent we remove it from our Map
+            _objects.remove(id);
+            return;
+        }
+
+        Stats stats = _objects.get(id); // Values are never null, so null means the object isn't in the Map.
+        if (stats == null)
+        { // If the object isn't in the Map it's likely to be a properties push from the broker.
+            if (!record.hasValue("name"))
+            { // This probably won't happen, but if it does we refresh the object to get its full state.
+                try
+                {
+                    record.refresh();
+                }
+                catch (QmfException qmfe)
+                {
+                    System.err.println ("QmfException " + qmfe.getMessage() + " caught in QpidQueueStats onEvent");
+                }
+            }
+
+            if (record.hasValue("name"))
+            { // Only cache objects whose name is known; a nameless one is retried on the next push.
+                _objects.put(id, new Stats(record.getStringValue("name"), record));
+            }
+            return;
+        }
+
+        // If the object is already in the Map it's likely to be a statistics push from the broker.
+        String name = stats.getName();
+        if (!matchesFilter(name))
+        {
+            return;
+        }
+
+        QmfConsoleData lastSample = stats.getData();
+        stats.setData(record);
+
+        // The division by 1000000000 below is what yields the value shown in the "Sec" column.
+        float deltaTime = record.getUpdateTime() - lastSample.getUpdateTime();
+        if (deltaTime > 1000000000.0f)
+        {
+            float deltaEnqueues = record.getLongValue("msgTotalEnqueues") - lastSample.getLongValue("msgTotalEnqueues");
+            float deltaDequeues = record.getLongValue("msgTotalDequeues") - lastSample.getLongValue("msgTotalDequeues");
+            long msgDepth = record.getLongValue("msgDepth");
+            float enqueueRate = deltaEnqueues/(deltaTime/1000000000.0f);
+            float dequeueRate = deltaDequeues/(deltaTime/1000000000.0f);
+
+            System.out.printf("%-46s%10.2f%11d%13.2f%13.2f\n",
+                              name, deltaTime/1000000000, msgDepth, enqueueRate, dequeueRate);
+        }
+    }
+
+    /**
+     * Checks a queue name against the regexes in the filter List.
+     * @param name the queue name to check
+     * @return true if no filter is enabled or if any Pattern in the filter is found in the name
+     */
+    private boolean matchesFilter(final String name)
+    {
+        if (_filter.isEmpty())
+        {
+            return true;
+        }
+
+        for (Pattern pattern : _filter)
+        {
+            if (pattern.matcher(name).find())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Command line entry point for QpidQueueStats.
+     * <p>
+     * Parses the options, creates a QpidQueueStats for the selected broker and then blocks reading
+     * standard input until Return is pressed, at which point the JVM exits.
+     * @param args the command line arguments.
+     */
+    public static void main(final String[] args)
+    {
+        // Default the log level to FATAL rather than DEBUG unless one has already been configured.
+        final String configuredLevel = System.getProperty("amqj.logging.level");
+        System.setProperty("amqj.logging.level", (configuredLevel == null) ? "FATAL" : configuredLevel);
+
+        final String[] longOpts = {"help", "broker-address=", "filter=", "sasl-mechanism="};
+        try
+        {
+            String brokerAddress = "localhost";
+            String connectionOptions = "{reconnect: true}";
+            final List<Pattern> patterns = new ArrayList<Pattern>();
+            final GetOpt getopt = new GetOpt(args, "ha:f:", longOpts);
+
+            for (String[] opt : getopt.getOptList())
+            {
+                final String flag = opt[0];
+                if (flag.equals("-h") || flag.equals("--help"))
+                {
+                    System.out.println(_usage);
+                    System.out.println(_options);
+                    System.exit(1);
+                }
+                else if (flag.equals("-a") || flag.equals("--broker-address"))
+                {
+                    brokerAddress = opt[1];
+                }
+                else if (flag.equals("-f") || flag.equals("--filter"))
+                { // The filter is a comma separated list of regexes, each compiled into its own Pattern.
+                    for (String regex : opt[1].split(","))
+                    {
+                        patterns.add(Pattern.compile(regex));
+                    }
+                }
+                else if (flag.equals("--sasl-mechanism"))
+                {
+                    connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+                }
+            }
+
+            // The instance registers itself as the Console's listener, so no reference needs to be kept here.
+            new QpidQueueStats(brokerAddress, connectionOptions, patterns);
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(_usage);
+            System.out.println(e.getMessage());
+            System.exit(1);
+        }
+
+        final BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
+        try
+        { // Blocks here until return is pressed
+            commandLine.readLine();
+            System.exit(0);
+        }
+        catch (IOException e)
+        {
+            System.out.println ("QpidQueueStats main(): IOException: " + e.getMessage());
+        }
+    }
+}

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java?rev=1465662&view=auto
==============================================================================
--- 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
 (added)
+++ 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
 Mon Apr  8 15:19:04 2013
@@ -0,0 +1,364 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * QueueFuse provides protection to message producers from consumers who can't 
consume messages fast enough.
+ * <p>
+ * With the default "reject" limit policy when a queue exceeds its capacity an 
exception is thrown to the
+ * producer. This behaviour is unfortunate, because if there happen to be 
multiple consumers consuming
+ * messages from a given producer it is possible for a single slow consumer to 
cause message flow to be
+ * stopped to <u>all</u> consumers, in other words a de-facto denial of 
service may take place.
+ * <p>
+ * In an Enterprise environment it is likely that this sort of behaviour is 
unwelcome, so QueueFuse makes it
+ * possible for queueThresholdExceeded Events to be detected and for the 
offending queues to have messages
+ * purged, thus protecting the other consumers by preventing an exception 
being thrown to the message producer.
+ * <p>
+ * The original intention with this class was to unbind bindings to queues 
that exceed the threshold. This method
+ * works, but it has a number of disadvantages. In particular there is no way 
to unbind from (and thus protect)
+ * queues bound to the default direct exchange, in addition in order to unbind 
it is necessary to retrieve
+ * binding and exchange information, both of which require further exchanges 
with the broker (which is not
+ * desirable as when the queueThresholdExceeded occurs we need to act pretty 
quickly). Finally as it happens
+ * it is also necessary to purge some messages after unbinding anyway as if 
this is not done the queue remains
+ * in the flowStopped state and producers will eventually time out and throw 
an exception if this is not cleared.
+ * So all in all simply purging each time we cross the threshold is simpler 
and has the additional advantage that
+ * if and when the consumer speeds up message delivery will eventually return 
to normal. 
+ * 
+ * <pre>
+ * Usage: QueueFuse [options] [broker-addr]...
+ * 
+ * Monitors one or more Qpid message brokers for queueThresholdExceeded Events.
+ *
+ * If a queueThresholdExceeded Event occurs messages are purged from the queue,
+ * in other words this class behaves rather like a fuse 'blowing' if the
+ * threshold gets exceeded.
+ *
+ * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:&lt;port&gt;]
+ * 
+ * Examples:
+ * 
+ * $ QueueFuse localhost:5672
+ * $ QueueFuse 10.1.1.7:10000
+ * $ QueueFuse guest/guest@broker-host:10000
+ *
+ * Options:
+ *   -h, --help            show this help message and exit
+ *   -f &lt;filter&gt;, --filter=&lt;filter&gt;
+ *                         a list of comma separated queue names (regex are
+ *                         accepted) to protect (default is to protect all).
+ *   -p &lt;PERCENT&gt;, --purge=&lt;PERCENT&gt;
+ *                         The percentage of messages to purge when the queue
+ *                         threshold gets exceeded (default = 20%).
+ *                         N.B. if this gets set too low the fuse may not blow.
+ *   --sasl-mechanism=&lt;mech&gt;
+ *                         SASL mechanism for authentication (e.g. EXTERNAL,
+ *                         ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL
+ *                         automatically picks the most secure available
+ *                         mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QueueFuse implements QmfEventListener
+{
+    /** One-line usage synopsis printed by main() for --help and for invalid arguments. */
+    private static final String _usage =
+    "Usage: QueueFuse [options] [broker-addr]...\n";
+
+    /** Longer description of the tool and of the broker-addr syntax, printed by main() for --help. */
+    private static final String _description =
+    "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n" +
+    "\n" +
+    "If a queueThresholdExceeded Event occurs messages are purged from the queue,\n" +
+    "in other words this class behaves rather like a fuse 'blowing' if the\n" +
+    "threshold gets exceeded.\n" +
+    "\n" +
+    "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n" +
+    "\n" +
+    "[broker-addr] syntax:\n" +
+    "\n" +
+    "[username/password@] hostname\n" +
+    "ip-address [:<port>]\n" +
+    "\n" +
+    "Examples:\n" +
+    "\n" +
+    "$ QueueFuse localhost:5672\n" +
+    "$ QueueFuse 10.1.1.7:10000\n" +
+    "$ QueueFuse guest/guest@broker-host:10000\n";
+
+    /** Description of the supported command line options, printed by main() for --help. */
+    private static final String _options =
+    "Options:\n" +
+    "  -h, --help            show this help message and exit\n" +
+    "  -f <filter>, --filter=<filter>\n" +
+    "                        a list of comma separated queue names (regex are\n" +
+    "                        accepted) to protect (default is to protect all).\n" +
+    "  -p <PERCENT>, --purge=<PERCENT>\n" +
+    "                        The percentage of messages to purge when the queue\n" +
+    "                        threshold gets exceeded (default = 20%).\n" +
+    "                        N.B. if this gets set too low the fuse may not blow.\n" +
+    "  --sasl-mechanism=<mech>\n" +
+    "                        SASL mechanism for authentication (e.g. EXTERNAL,\n" +
+    "                        ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n" +
+    "                        automatically picks the most secure available\n" +
+    "                        mechanism - use this option to override.\n";
+
+    // The connection URL that was passed to the constructor.
+    private final String _url;
+    // Regex Patterns matched against queue names in onEvent(); when the List is empty every queue is protected.
+    private final List<Pattern> _filter;
+    // Ratio of the reported msgDepth that purgeQueue() asks the broker to purge (main() passes percent/100.0f).
+    private final float _purge;
+    // Queue Management Objects keyed by queue name; rebuilt from scratch by updateQueueCache().
+    private Map<String, QmfConsoleData> _queueCache = new HashMap<String, 
QmfConsoleData>(50);
+    // The QMF2 Console created in the constructor with this object as its event listener.
+    private Console _console;
+
+    /**
+     * Basic constructor. Creates JMS Session, Initialises Destinations, 
Producers & Consumers and starts connection.
+     * @param url the connection URL.
+     * @param connectionOptions the options String to pass to ConnectionHelper.
+     * @param filter a list of regex Patterns used to choose the queues we 
wish to protect.
+     * @param purge the ratio of messages that we wish to purge if the 
threshold gets exceeded.
+     */
+    public QueueFuse(final String url, final String connectionOptions, final 
List<Pattern> filter, final float purge)
+    {
+        System.out.println("QueueFuse Connecting to " + url);
+        if (filter.size() > 0)
+        {
+            System.out.println("Filter = " + filter);
+        }
+        _url = url;
+        _filter = filter;
+        _purge = purge;
+        try
+        {
+            Connection connection = ConnectionHelper.createConnection(url, 
connectionOptions);        
+            _console = new Console(this);
+            _console.addConnection(connection);
+            updateQueueCache();
+        }
+        catch (QmfException qmfe)
+        {
+            System.err.println("QmfException " + qmfe.getMessage() + " caught 
in QueueFuse constructor");
+        }
+    }
+
+    /**
+     * Rebuilds _queueCache: empties it, then fetches the "queue" objects of the "org.apache.qpid.broker"
+     * package from the Console and stores each one under its "name" value.
+     */
+    private void updateQueueCache()
+    {
+        _queueCache.clear();
+        for (QmfConsoleData queueObject : _console.getObjects("org.apache.qpid.broker", "queue"))
+        {
+            _queueCache.put(queueObject.getStringValue("name"), queueObject);
+        }
+    }
+
+    /**
+     * Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method.
+     * <p>
+     * The number of messages requested is the purge ratio multiplied by msgDepth, truncated to a whole number.
+     * Nothing is purged if the queue is not in the cache, if its "qpid.policy_type" argument is "ring" or if
+     * the computed number of messages is zero.
+     * @param queueName the name of the queue to purge
+     * @param msgDepth the number of messages on the queue, used to determine how many messages to purge.
+     */
+    private void purgeQueue(final String queueName, long msgDepth)
+    {
+        QmfConsoleData queue = _queueCache.get(queueName);
+        if (queue == null)
+        {
+            System.out.printf("%s ERROR QueueFuse.purgeQueue() %s reference couldn't be found\n",
+                              new Date().toString(), queueName);
+            return;
+        }
+
+        // Ring queues are skipped. The instanceof test also covers a queue with no "arguments" Map at all.
+        Object queueArguments = queue.getValue("arguments");
+        if (queueArguments instanceof Map)
+        {
+            Object policyType = ((Map<?, ?>)queueArguments).get("qpid.policy_type");
+            if ("ring".equals(policyType))
+            {  // If qpid.policy_type=ring we return.
+                return;
+            }
+        }
+
+        long request = (long)(_purge*msgDepth);
+        if (request <= 0)
+        { // NOTE(review): a purge request of 0 is understood to mean "purge ALL messages" in the broker
+          // management schema, so a count that truncates to zero is skipped rather than emptying the queue - confirm.
+            return;
+        }
+
+        try
+        {
+            QmfData arguments = new QmfData();
+            arguments.setValue("request", request);
+            queue.invokeMethod("purge", arguments);
+        }
+        catch (QmfException e)
+        {
+            System.out.println(e.getMessage());
+        }
+    }
+
+    /**
+     * Main Event handler.
+     * <p>
+     * Only EventReceivedWorkItems are of interest: a "queueDeclare" Event causes the queue cache to be rebuilt
+     * and a "queueThresholdExceeded" Event causes the named queue to be purged, provided that no filter is
+     * enabled or that the queue name matches one of the filter's regexes.
+     * @param wi a QMF2 WorkItem object
+     */
+    public void onEvent(final WorkItem wi)
+    {
+        if (!(wi instanceof EventReceivedWorkItem))
+        {
+            return;
+        }
+
+        QmfEvent event = ((EventReceivedWorkItem)wi).getEvent();
+        String className = event.getSchemaClassId().getClassName();
+
+        // Constant-first comparisons so that an Event without a class name is ignored rather than causing an NPE.
+        if ("queueDeclare".equals(className))
+        {
+            updateQueueCache();
+        }
+        else if ("queueThresholdExceeded".equals(className))
+        {
+            String queueName = event.getStringValue("qName");
+
+            boolean matches = _filter.isEmpty();
+            for (Pattern x : _filter)
+            { // Check the queue name against the regexes in the filter List (if any)
+                if (x.matcher(queueName).find())
+                {
+                    matches = true;
+                    break;
+                }
+            }
+
+            if (matches)
+            { // If there's no filter enabled or the filter matches the queue name we call purgeQueue().
+                purgeQueue(queueName, event.getLongValue("msgDepth"));
+            }
+        }
+    }
+
+    /**
+     * Runs QueueFuse.
+     * <p>
+     * Parses the options, creates one QueueFuse per broker address given on the command line (or one for
+     * "localhost" if none was given) and then blocks the main thread indefinitely.
+     * Invalid options, including a purge percentage outside 0..100, print the usage text and exit with status 1.
+     * @param args the command line arguments.
+     */
+    public static void main(final String[] args)
+    {
+        String logLevel = System.getProperty("amqj.logging.level");
+        logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log level to FATAL rather than DEBUG.
+        System.setProperty("amqj.logging.level", logLevel);
+
+        String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="};
+        try
+        {
+            String connectionOptions = "{reconnect: true}";
+            List<Pattern> filter = new ArrayList<Pattern>();
+            float purge = 0.2f;
+            GetOpt getopt = new GetOpt(args, "hf:p:", longOpts);
+            List<String[]> optList = getopt.getOptList();
+            String[] cargs = getopt.getEncArgs().toArray(new String[0]);
+            for (String[] opt : optList)
+            {
+                if (opt[0].equals("-h") || opt[0].equals("--help"))
+                {
+                    System.out.println(_usage);
+                    System.out.println(_description);
+                    System.out.println(_options);
+                    System.exit(1);
+                }
+                else if (opt[0].equals("-f") || opt[0].equals("--filter"))
+                {
+                    for (String s : opt[1].split(","))
+                    {
+                        filter.add(Pattern.compile(s));
+                    }
+                }
+                else if (opt[0].equals("-p") || opt[0].equals("--purge"))
+                {
+                    int percent = Integer.parseInt(opt[1]);
+                    if (percent < 0 || percent > 100)
+                    { // Handled by the catch block below, which prints the usage plus this explanation and exits.
+                        throw new IllegalArgumentException("Purge percentage must be in the range 0 to 100, got " +
+                                                           percent);
+                    }
+                    purge = percent/100.0f;
+                }
+                else if (opt[0].equals("--sasl-mechanism"))
+                {
+                    connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
+                }
+            }
+
+            if (cargs.length == 0)
+            {
+                cargs = new String[] {"localhost"};
+            }
+
+            for (String url : cargs)
+            { // Each instance registers itself as its Console's listener, so no reference needs to be kept here.
+                new QueueFuse(url, connectionOptions, filter, purge);
+            }
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(_usage);
+            System.out.println(e.getMessage());
+            System.exit(1);
+        }
+
+        try
+        {   // Block here
+            Thread.currentThread().join();
+        }
+        catch (InterruptedException ie)
+        { // Restore the interrupt status so that it isn't lost as main() returns.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to