Added:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java?rev=1465662&view=auto
==============================================================================
---
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
(added)
+++
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
Mon Apr 8 15:19:04 2013
@@ -0,0 +1,358 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.MethodResult;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+// Reuse this class as it provides a handy mechanism to parse an args String
into a Map
+import org.apache.qpid.messaging.util.AddressParser;
+
+/**
+ * A tool to allow QMF2 methods to be invoked from the command line.
+ * <pre>
+ * Usage: QpidCtrl [options] command [args]
+ * The args need to be in a Stringified Map format (similar to an Address
String)
+ * e.g. to set broker log level: QpidCtrl setLogLevel
"{level:\"debug+:Broker\"}"
+ * The listValues command lists property names and values of the specified
object.
+ * The listObjects command lists all objects of the specified package and
class.
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -v enable logging
+ * -a <address>, --broker-address=<address>
+ * broker-addr is in the form: [username/password@]
+ * hostname | ip-address [:<port>] ex:
localhost,
+ * 10.1.1.7:10000, broker-host:10000,
+ * guest/guest@localhost
+ * -c <class>, --class=<class>
+ * class of object on which command is being invoked
+ * (default broker)
+ * -p <package>, --package=<package>
+ * package of object on which command is being invoked
+ * (default org.apache.qpid.broker)
+ * -i <id>, --id=<id> identifier of object on which command
is being invoked
+ * (default amqp-broker)
+ * --agent=<agent name>
+ * The name of the Agent to which commands will be sent
+ * This will try to match <agent name> against
the Agent name
+ * the Agent product name and will also check if the
Agent name
+ * contains the <agent name> String
+ * (default qpidd)
+ * --sasl-mechanism=<mech>
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * Examples (Note the quotes and escaped quotes are significant!):
+ * <p>
+ * Get the current broker log level:
+ * <pre>QpidCtrl getLogLevel</pre>
+ *
+ * Set the current broker log level to notice+:
+ * <pre>QpidCtrl setLogLevel "{level:\"notice+\"}"</pre>
+ *
+ * Set the current broker log level to debug+ for all Management Objects:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+\"}"</pre>
+ *
+ * Set the current broker log level to debug+ for just the Broker Management
Object:
+ * <pre>QpidCtrl setLogLevel "{level:\"debug+:Broker\"}"</pre>
+ *
+ * List the properties of the qmf.default.direct exchange:
+ * <pre>QpidCtrl -c exchange -i qmf.default.direct listValues</pre>
+ *
+ * Create a queue called test with a flow-to-disk limit policy:
+ * <pre>QpidCtrl create
"{type:queue,name:test,properties:{'qpid.policy_type':ring}}"</pre>
+ *
+ * Delete a queue called test:
+ * <pre>QpidCtrl delete "{type:queue,name:test}"</pre>
+ *
+ * Create a binding called bind1 between the amq.match exchange and the test
queue matching the headers name=fadams
+ * and gender=male:
+ * <pre>QpidCtrl create
"{type:binding,name:'amq.match/test/bind1',properties:{x-match:all,name:fadams,gender:male}}"</pre>
+ *
+ * Delete the binding called bind1 between the amq.match exchange and the test
queue:
+ * <pre>QpidCtrl delete "{type:binding,name:'amq.match/test/bind1'}"</pre>
+ *
+ * Get the broker to echo a message:
+ * <pre>QpidCtrl echo "{sequence:1234,body:'Peaches En Regalia'}"</pre>
+ *
+ * Invoke the event method on the gizmo Agent (launch gizmo Agent via
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL
--agent=gizmo event</pre>
+ *
+ * Invoke the create_child method on the gizmo Agent (launch gizmo Agent via
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL
--agent=gizmo create_child "{name:monkeyBoy}"</pre>
+ *
+ * Invoke the stop method on the gizmo Agent (launch gizmo Agent via
AgentTest):
+ * <pre>QpidCtrl -p com.profitron.gizmo -c control -i OPERATIONAL
--agent=gizmo stop "{message:'Will I dream?'}"</pre>
+ *
+ * @author Fraser Adams
+ */
+public final class QpidCtrl
+{
+ private static final String _usage =
+ "Usage: QpidCtrl [options] command [args]\n" +
+ "The args need to be in a Stringified Map format (similar to an Address
String)\n" +
+ "e.g. to set broker log level: QpidCtrl setLogLevel
\"{level:\\\"debug+:Broker\\\"}\"\n" +
+ "The listValues command lists property names and values of the specified
object.\n" +
+ "The listObjects command lists all objects of the specified package and
class.\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -v enable logging\n" +
+ " -a <address>, --broker-address=<address>\n" +
+ " broker-addr is in the form:
[username/password@]\n" +
+ " hostname | ip-address [:<port>] ex:
localhost,\n" +
+ " 10.1.1.7:10000, broker-host:10000,\n" +
+ " guest/guest@localhost\n" +
+ " -c <class>, --class=<class>\n" +
+ " class of object on which command is being
invoked\n" +
+ " (default broker)\n" +
+ " -p <package>, --package=<package>\n" +
+ " package of object on which command is being
invoked\n" +
+ " (default org.apache.qpid.broker)\n" +
+ " -i <id>, --id=<id> identifier of object on which command is being
invoked\n" +
+ " (default amqp-broker)\n" +
+ " --agent=<agent name>\n" +
+ " The name of the Agent to which commands will be
sent\n" +
+ " This will try to match <agent name> against the
Agent name,\n" +
+ " the Agent product name and will also check if the
Agent name\n" +
+ " contains the <agent name> String\n" +
+ " (default qpidd)\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g.
EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI).
SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations,
Producers & Consumers and starts connection.
+ * @param url the Connection URL.
+ * @param connectionOptions the connection options String to pass to
ConnectionHelper.
+ * @param pkg the package name of the object we're invoking the method on.
+ * @param cls the class name of the object we're invoking the method on.
+ * @param id the ObjectId name of the object we're invoking the method on.
+ * @param agentName the name of the Agent to invoke the QMF method on.
+ * @param command the QMF method we're invoking.
+ * @param args the Stringified Map form of the method arguments.
+ */
+ public QpidCtrl(final String url, final String connectionOptions, final
String pkg, final String cls,
+ final String id, final String agentName, final String
command, final String args)
+ {
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url,
connectionOptions);
+ _console = new Console();
+ _console.addConnection(connection);
+
+ // Find the specified Agent
+ Agent agent = _console.findAgent(agentName);
+ if (agent == null)
+ {
+ System.out.println("Agent " + agentName + " not found");
+ System.exit(1);
+ }
+
+ List<Agent> agentList = Arrays.asList(new Agent[] {agent});
+ List<QmfConsoleData> objects = _console.getObjects(pkg, cls,
agentList);
+
+ // Parse the args String
+ QmfData inArgs = (args == null) ? new QmfData() : new QmfData(new
AddressParser(args).map());
+
+ // Find the required QmfConsoleData object and invoke the
specified command
+ MethodResult results = null;
+ for (QmfConsoleData object : objects)
+ {
+ String objectName = object.getObjectId().getObjectName();
+ if (command.equals("listObjects"))
+ {
+ System.out.println(objectName);
+ }
+ else
+ {
+ if (objectName.contains(id))
+ { // Use contains as ObjectNames may comprise other
identifiers tha make using equals impractical
+ if (command.equals("listValues"))
+ {
+ object.listValues();
+ System.exit(1);
+ }
+ else
+ {
+ results = object.invokeMethod(command, inArgs);
+ }
+ break;
+ }
+ }
+ }
+
+ if (results == null)
+ {
+ if (objects.size() == 0)
+ {
+ System.out.println("getObjects(" + pkg + ", " + cls + ", "
+ agentName + ") returned no objects.");
+ }
+ else
+ {
+ System.out.println("Id " + id + " not found in " + pkg +
":" + cls);
+ }
+ }
+ else
+ {
+ if (results.succeeded())
+ {
+ results.listValues();
+ }
+ else
+ {
+ System.err.println ("QmfException " +
results.getQmfException().getMessage() +
+ " returned from " + command + "
method");
+ }
+ }
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught
in QpidCtrl constructor");
+ }
+ }
+
+ /**
+ * Runs QpidCtrl.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log
level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ // As of Qpid 0.16 the Session Dispatcher Thread is non-Daemon so the
JVM gets prevented from exiting.
+ // Setting the following property to true makes it a Daemon Thread.
+ System.setProperty("qpid.jms.daemon.dispatcher", "true");
+
+ String[] longOpts = {"help", "broker-address=", "class=", "package=",
"id=", "agent=", "sasl-mechanism="};
+ try
+ {
+ String host = "localhost";
+ String connectionOptions = "{reconnect: true}";
+ String cls = "broker";
+ String pkg = "org.apache.qpid.broker";
+ String id = "amqp-broker";
+ String agentName = "qpidd";
+ String command = null;
+ String arg = null;
+
+ GetOpt getopt = new GetOpt(args, "ha:c:p:i:v", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-a") ||
opt[0].equals("--broker-address"))
+ {
+ host = opt[1];
+ }
+ else if (opt[0].equals("-c") || opt[0].equals("--class"))
+ {
+ cls = opt[1];
+ }
+ else if (opt[0].equals("-p") || opt[0].equals("--package"))
+ {
+ pkg = opt[1];
+ }
+ else if (opt[0].equals("-i") || opt[0].equals("--id"))
+ {
+ id = opt[1];
+ }
+ else if (opt[0].equals("--agent"))
+ {
+ agentName = opt[1];
+ }
+ else if (opt[0].equals("-v"))
+ {
+ System.setProperty("amqj.logging.level", "DEBUG");
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " +
opt[1] + "}";
+ }
+ }
+
+ if (cargs.length < 1 || cargs.length > 2)
+ {
+ System.out.println(Arrays.asList(cargs));
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ command = cargs[0];
+
+ if (cargs.length == 2)
+ {
+ arg = cargs[1];
+ if (!arg.startsWith("{") || !arg.endsWith("}"))
+ {
+ System.out.println("Incorrect format for args.");
+ System.out.println("This needs to be in a Stringified Map
format similar to an Address String");
+ System.exit(1);
+ }
+ }
+
+ QpidCtrl qpidCtrl = new QpidCtrl(host, connectionOptions, pkg,
cls, id, agentName, command, arg);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+ }
+}
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidCtrl.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java?rev=1465662&view=auto
==============================================================================
---
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
(added)
+++
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
Mon Apr 8 15:19:04 2013
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+import static org.apache.qpid.qmf2.common.WorkItem.WorkItemType.*;
+
+/**
+ * Collect and print events from one or more Qpid message brokers.
+ * <pre>
+ * If no broker-addr is supplied, QpidPrintEvents connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:<port>]
+ *
+ * Examples:
+ *
+ * $ QpidPrintEvents localhost:5672
+ * $ QpidPrintEvents 10.1.1.7:10000
+ * $ QpidPrintEvents guest/guest@broker-host:10000
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * --heartbeats Use heartbeats.
+ * --sasl-mechanism=<mech>
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidPrintEvents implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: QpidPrintEvents [options] [broker-addr]...\n";
+
+ private static final String _description =
+ "Collect and print events from one or more Qpid message brokers.\n" +
+ "\n" +
+ "If no broker-addr is supplied, QpidPrintEvents connects to
'localhost:5672'.\n" +
+ "\n" +
+ "[broker-addr] syntax:\n" +
+ "\n" +
+ "[username/password@] hostname\n" +
+ "ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ QpidPrintEvents localhost:5672\n" +
+ "$ QpidPrintEvents 10.1.1.7:10000\n" +
+ "$ QpidPrintEvents guest/guest@broker-host:10000\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " --heartbeats Use heartbeats.\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g.
EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI).
SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations,
Producers & Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ */
+ public QpidPrintEvents(final String url, final String connectionOptions)
+ {
+ System.out.println("Connecting to " + url);
+ _url = url;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url,
connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught
in QpidPrintEvents constructor");
+ }
+ }
+
+ /**
+ * Checks if the WorkItem is an EventReceivedWorkItem and if it is
extracts and renders the QmfEvent.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ QmfEvent event = item.getEvent();
+ System.out.println(event + " broker=" + _url);
+ }
+ }
+
+ /**
+ * Runs QpidPrintEvents.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log
level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "heartbeats", "sasl-mechanism="};
+ try
+ {
+ String connectionOptions = "{reconnect: true}";
+ GetOpt getopt = new GetOpt(args, "h", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("--heartbeats"))
+ {
+ // Ignore Java uses heartbeats by default
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " +
opt[1] + "}";
+ }
+ }
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ cargs = new String[] {"localhost"};
+ }
+
+ for (String url : cargs)
+ {
+ QpidPrintEvents eventPrinter = new QpidPrintEvents(url,
connectionOptions);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+
+ BufferedReader commandLine = new BufferedReader(new
InputStreamReader(System.in));
+ try
+ { // Blocks here until return is pressed
+ System.out.println("Hit Return to exit");
+ String s = commandLine.readLine();
+ System.exit(0);
+ }
+ catch (IOException e)
+ {
+ System.out.println ("QpidPrintEvents main(): IOException: " +
e.getMessage());
+ }
+ }
+}
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidPrintEvents.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java?rev=1465662&view=auto
==============================================================================
---
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
(added)
+++
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
Mon Apr 8 15:19:04 2013
@@ -0,0 +1,374 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.QmfQuery;
+import org.apache.qpid.qmf2.common.QmfQueryTarget;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.AgentHeartbeatWorkItem;
+import org.apache.qpid.qmf2.console.AgentRestartedWorkItem;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.console.SubscribeIndication;
+import org.apache.qpid.qmf2.console.SubscribeParams;
+import org.apache.qpid.qmf2.console.SubscriptionIndicationWorkItem;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * Collect and print queue statistics.
+ * <pre>
+ * Usage: QpidQueueStats [options]
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -a <address>, --broker-address=<address>
+ * broker-addr is in the form: [username/password@]
+ * hostname | ip-address [:<port>] ex:
localhost,
+ * 10.1.1.7:10000, broker-host:10000,
+ * guest/guest@localhost
+ * -f <filter>, --filter=<filter>
+ * a list of comma separated queue names (regex are
+ * accepted) to show
+ * --sasl-mechanism=<mech>
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QpidQueueStats implements QmfEventListener
+{
+ private final class Stats
+ {
+ private final String _name;
+ private QmfConsoleData _data;
+
+ public Stats(final String name, final QmfConsoleData data)
+ {
+ _name = name;
+ _data = data;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public QmfConsoleData getData()
+ {
+ return _data;
+ }
+
+ public void setData(final QmfConsoleData data)
+ {
+ _data = data;
+ }
+ }
+
+ private static final String _usage =
+ "Usage: QpidQueueStats [options]\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -a <address>, --broker-address=<address>\n" +
+ " broker-addr is in the form:
[username/password@]\n" +
+ " hostname | ip-address [:<port>] ex:
localhost,\n" +
+ " 10.1.1.7:10000, broker-host:10000,\n" +
+ " guest/guest@localhost\n" +
+ " -f <filter>, --filter=<filter>\n" +
+ " a list of comma separated queue names (regex
are\n" +
+ " accepted) to show\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g.
EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI).
SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private final List<Pattern> _filter;
+ private Agent _broker;
+ private Console _console;
+ private Map<ObjectId, Stats> _objects = new HashMap<ObjectId, Stats>();
+ private String _subscriptionId = null;
+ private long _subscriptionDuration;
+ private long _startTime;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations,
Producers & Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param filter a list of regex Patterns used to choose the queues we
wish to display.
+ */
+ public QpidQueueStats(final String url, final String connectionOptions,
final List<Pattern> filter)
+ {
+ System.out.println("Connecting to " + url);
+ if (filter.size() > 0)
+ {
+ System.out.println("Filter = " + filter);
+ }
+ _url = url;
+ _filter = filter;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url,
connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+
+ // Wait until the broker Agent has been discovered
+ _broker = _console.findAgent("broker");
+ if (_broker != null)
+ {
+ createQueueSubscription();
+ }
+
+ System.out.println("Hit Return to exit");
+ System.out.println(
+ "Queue Name Sec
Depth Enq Rate Deq Rate");
+ System.out.println(
+
"=============================================================================================");
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() + " caught
in QpidQueueStats constructor");
+ }
+ }
+
+ /**
+ * Create a Subscription to query for all queue objects
+ */
+ private void createQueueSubscription()
+ {
+ try
+ { // This QmfQuery simply does an ID query for objects with the
className "queue"
+ QmfQuery query = new QmfQuery(QmfQueryTarget.OBJECT, new
SchemaClassId("queue"));
+ SubscribeParams params = _console.createSubscription(_broker,
query, "queueStatsHandle");
+ _subscriptionId = params.getSubscriptionId();
+ _subscriptionDuration = params.getLifetime() - 10; // Subtract 10
as we want to refresh before it times out
+ _startTime = System.currentTimeMillis();
+ }
+ catch (QmfException qmfe)
+ {
+ }
+ }
+
+ /**
+ * Main Event handler. Checks if the WorkItem is a
SubscriptionIndicationWorkItem, if it is it stores the object
+ * in a Map and uses this to maintain state so we can record deltas such
as enqueue and dequeue rates.
+ * <p>
+ * The AgentHeartbeatWorkItem is used to periodically compare the elapsed
time against the Subscription duration
+ * so that we can refresh the Subscription (or create a new one if
necessary) in order to continue receiving
+ * queue Management Object data from the broker.
+ * <p>
+ * When the AgentRestartedWorkItem is received we clear the state to
remove any stale queue Management Objects.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof AgentHeartbeatWorkItem && _subscriptionId != null)
+ {
+ long elapsed = (long)Math.round((System.currentTimeMillis() -
_startTime)/1000.0f);
+ if (elapsed > _subscriptionDuration)
+ {
+ try
+ {
+ _console.refreshSubscription(_subscriptionId);
+ _startTime = System.currentTimeMillis();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println ("QmfException " + qmfe.getMessage() +
" caught in QpidQueueStats onEvent");
+ createQueueSubscription();
+ }
+ }
+ }
+ else if (wi instanceof AgentRestartedWorkItem)
+ {
+ _objects.clear();
+ }
+ else if (wi instanceof SubscriptionIndicationWorkItem)
+ {
+ SubscriptionIndicationWorkItem item =
(SubscriptionIndicationWorkItem)wi;
+ SubscribeIndication indication = item.getSubscribeIndication();
+ String correlationId = indication.getConsoleHandle();
+ if (correlationId.equals("queueStatsHandle"))
+ { // If it is (and it should be!!) then it's our queue object
Subscription
+ List<QmfConsoleData> data = indication.getData();
+ for (QmfConsoleData record : data)
+ {
+ ObjectId id = record.getObjectId();
+ if (record.isDeleted())
+ { // If the object was deleted by the Agent we remove it
from out Map
+ _objects.remove(id);
+ }
+ else
+ {
+ if (_objects.containsKey(id))
+ { // If the object is already in the Map it's likely
to be a statistics push from the broker.
+ Stats stats = _objects.get(id);
+ String name = stats.getName();
+
+ boolean matches = false;
+ for (Pattern x : _filter)
+ { // Check the queue name against the regexes in
the filter List (if any)
+ Matcher m = x.matcher(name);
+ if (m.find())
+ {
+ matches = true;
+ break;
+ }
+ }
+
+ if (_filter.isEmpty() || matches)
+ { // If there's no filter enabled or the filter
matches the queue name we display statistics.
+ QmfConsoleData lastSample = stats.getData();
+ stats.setData(record);
+
+ float deltaTime = record.getUpdateTime() -
lastSample.getUpdateTime();
+ if (deltaTime > 1000000000.0f)
+ {
+ float deltaEnqueues =
record.getLongValue("msgTotalEnqueues") -
+
lastSample.getLongValue("msgTotalEnqueues");
+ float deltaDequeues =
record.getLongValue("msgTotalDequeues") -
+
lastSample.getLongValue("msgTotalDequeues");
+ long msgDepth =
record.getLongValue("msgDepth");
+ float enqueueRate =
deltaEnqueues/(deltaTime/1000000000.0f);
+ float dequeueRate =
deltaDequeues/(deltaTime/1000000000.0f);
+
+
System.out.printf("%-46s%10.2f%11d%13.2f%13.2f\n",
+ name,
deltaTime/1000000000, msgDepth, enqueueRate, dequeueRate);
+ }
+ }
+ }
+ else
+ { // If the object isn't in the Map it's likely to be
a properties push from the broker.
+ if (!record.hasValue("name"))
+ { // This probably won't happen, but if it does we
refresh the object to get its full state.
+ try
+ {
+ record.refresh();
+ }
+ catch (QmfException qmfe)
+ {
+ }
+ }
+ String queueName = record.getStringValue("name");
+ _objects.put(id, new Stats(queueName, record));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs QpidQueueStats.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log
level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "broker-address=", "filter=",
"sasl-mechanism="};
+ try
+ {
+ String host = "localhost";
+ String connectionOptions = "{reconnect: true}";
+ List<Pattern> filter = new ArrayList<Pattern>();
+ GetOpt getopt = new GetOpt(args, "ha:f:", longOpts);
+ List<String[]> optList = getopt.getOptList();
+
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-a") ||
opt[0].equals("--broker-address"))
+ {
+ host = opt[1];
+ }
+ else if (opt[0].equals("-f") || opt[0].equals("--filter"))
+ {
+ String[] split = opt[1].split(",");
+ for (String s : split)
+ {
+ Pattern p = Pattern.compile(s);
+ filter.add(p);
+ }
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " +
opt[1] + "}";
+ }
+ }
+
+ QpidQueueStats queueStats = new QpidQueueStats(host,
connectionOptions, filter);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+
+ BufferedReader commandLine = new BufferedReader(new
InputStreamReader(System.in));
+ try
+ { // Blocks here until return is pressed
+ String s = commandLine.readLine();
+ System.exit(0);
+ }
+ catch (IOException e)
+ {
+ System.out.println ("QpidQueueStats main(): IOException: " +
e.getMessage());
+ }
+ }
+}
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QpidQueueStats.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java?rev=1465662&view=auto
==============================================================================
---
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
(added)
+++
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
Mon Apr 8 15:19:04 2013
@@ -0,0 +1,364 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.qmf2.tools;
+
+// JMS Imports
+import javax.jms.Connection;
+
+// Misc Imports
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// QMF2 Imports
+import org.apache.qpid.qmf2.common.ObjectId;
+import org.apache.qpid.qmf2.common.QmfData;
+import org.apache.qpid.qmf2.common.QmfEvent;
+import org.apache.qpid.qmf2.common.QmfEventListener;
+import org.apache.qpid.qmf2.common.QmfException;
+import org.apache.qpid.qmf2.common.SchemaClassId;
+import org.apache.qpid.qmf2.common.WorkItem;
+import org.apache.qpid.qmf2.console.Agent;
+import org.apache.qpid.qmf2.console.Console;
+import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
+import org.apache.qpid.qmf2.console.QmfConsoleData;
+
+import org.apache.qpid.qmf2.util.ConnectionHelper;
+import org.apache.qpid.qmf2.util.GetOpt;
+
+/**
+ * QueueFuse provides protection to message producers from consumers who can't
consume messages fast enough.
+ * <p>
+ * With the default "reject" limit policy when a queue exceeds its capacity an
exception is thrown to the
+ * producer. This behaviour is unfortunate, because if there happen to be
multiple consumers consuming
+ * messages from a given producer it is possible for a single slow consumer to
cause message flow to be
+ * stopped to <u>all</u> consumers, in other words a de-facto denial of
service may take place.
+ * <p>
+ * In an Enterprise environment it is likely that this sort of behaviour is
unwelcome, so QueueFuse makes it
+ * possible for queueThresholdExceeded Events to be detected and for the
offending queues to have messages
+ * purged, thus protecting the other consumers by preventing an exception
being thrown to the message producer.
+ * <p>
+ * The original intention with this class was to unbind bindings to queues
that exceed the threshold. This method
+ * works, but it has a number of disadvantages. In particular there is no way
to unbind from (and thus protect)
+ * queues bound to the default direct exchange, in addition in order to unbind
it is necessary to retrieve
+ * binding and exchange information, both of which require further exchanges
with the broker (which is not
+ * desirable as when the queueThresholdExceeded occurs we need to act pretty
quickly). Finally as it happens
+ * it is also necessary to purge some messages after unbinding anyway as if
this is not done the queue remains
+ * in the flowStopped state and producers will eventually time out and throw
an exception if this is not cleared.
+ * So all in all simply purging each time we cross the threshold is simpler
and has the additional advantage that
+ * if and when the consumer speeds up message delivery will eventually return
to normal.
+ *
+ * <pre>
+ * Usage: QueueFuse [options] [broker-addr]...
+ *
+ * Monitors one or more Qpid message brokers for queueThresholdExceeded Events.
+ *
+ * If a queueThresholdExceeded Event occurs messages are purged from the queue,
+ * in other words this class behaves rather like a fuse 'blowing' if the
+ * threshold gets exceeded.
+ *
+ * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.
+ *
+ * [broker-addr] syntax:
+ *
+ * [username/password@] hostname
+ * ip-address [:<port>]
+ *
+ * Examples:
+ *
+ * $ QueueFuse localhost:5672
+ * $ QueueFuse 10.1.1.7:10000
+ * $ QueueFuse guest/guest@broker-host:10000
+ *
+ * Options:
+ * -h, --help show this help message and exit
+ * -f <filter>, --filter=<filter>
+ * a list of comma separated queue names (regex are
+ * accepted) to protect (default is to protect all).
+ * -p <PERCENT>, --purge=<PERCENT>\n" +
+ * The percentage of messages to purge when the
queue\n" +
+ * threshold gets exceeded (default = 20%).\n" +
+ * N.B. if this gets set too low the fuse may not
blow.\n" +
+ * --sasl-mechanism=<mech>
+ * SASL mechanism for authentication (e.g. EXTERNAL,
+ * ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL
+ * automatically picks the most secure available
+ * mechanism - use this option to override.
+ * </pre>
+ * @author Fraser Adams
+ */
+public final class QueueFuse implements QmfEventListener
+{
+ private static final String _usage =
+ "Usage: QueueFuse [options] [broker-addr]...\n";
+
+ private static final String _description =
+ "Monitors one or more Qpid message brokers for queueThresholdExceeded
Events.\n" +
+ "\n" +
+ "If a queueThresholdExceeded Event occurs messages are purged from the
queue,\n" +
+ "in other words this class behaves rather like a fuse 'blowing' if the\n" +
+ "threshold gets exceeded.\n" +
+ "\n" +
+ "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n"
+
+ "\n" +
+ "[broker-addr] syntax:\n" +
+ "\n" +
+ "[username/password@] hostname\n" +
+ "ip-address [:<port>]\n" +
+ "\n" +
+ "Examples:\n" +
+ "\n" +
+ "$ QueueFuse localhost:5672\n" +
+ "$ QueueFuse 10.1.1.7:10000\n" +
+ "$ QueueFuse guest/guest@broker-host:10000\n";
+
+ private static final String _options =
+ "Options:\n" +
+ " -h, --help show this help message and exit\n" +
+ " -f <filter>, --filter=<filter>\n" +
+ " a list of comma separated queue names (regex
are\n" +
+ " accepted) to protect (default is to protect
all).\n" +
+ " -p <PERCENT>, --purge=<PERCENT>\n" +
+ " The percentage of messages to purge when the
queue\n" +
+ " threshold gets exceeded (default = 20%).\n" +
+ " N.B. if this gets set too low the fuse may not
blow.\n" +
+ " --sasl-mechanism=<mech>\n" +
+ " SASL mechanism for authentication (e.g.
EXTERNAL,\n" +
+ " ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI).
SASL\n" +
+ " automatically picks the most secure available\n" +
+ " mechanism - use this option to override.\n";
+
+ private final String _url;
+ private final List<Pattern> _filter;
+ private final float _purge;
+ private Map<String, QmfConsoleData> _queueCache = new HashMap<String,
QmfConsoleData>(50);
+ private Console _console;
+
+ /**
+ * Basic constructor. Creates JMS Session, Initialises Destinations,
Producers & Consumers and starts connection.
+ * @param url the connection URL.
+ * @param connectionOptions the options String to pass to ConnectionHelper.
+ * @param filter a list of regex Patterns used to choose the queues we
wish to protect.
+ * @param purge the ratio of messages that we wish to purge if the
threshold gets exceeded.
+ */
+ public QueueFuse(final String url, final String connectionOptions, final
List<Pattern> filter, final float purge)
+ {
+ System.out.println("QueueFuse Connecting to " + url);
+ if (filter.size() > 0)
+ {
+ System.out.println("Filter = " + filter);
+ }
+ _url = url;
+ _filter = filter;
+ _purge = purge;
+ try
+ {
+ Connection connection = ConnectionHelper.createConnection(url,
connectionOptions);
+ _console = new Console(this);
+ _console.addConnection(connection);
+ updateQueueCache();
+ }
+ catch (QmfException qmfe)
+ {
+ System.err.println("QmfException " + qmfe.getMessage() + " caught
in QueueFuse constructor");
+ }
+ }
+
+ /**
+ * Looks up queue objects and stores them in _queueCache keyed by the
queue name
+ */
+ private void updateQueueCache()
+ {
+ _queueCache.clear();
+ List<QmfConsoleData> queues =
_console.getObjects("org.apache.qpid.broker", "queue");
+ for (QmfConsoleData queue : queues)
+ {
+ String queueName = queue.getStringValue("name");
+ _queueCache.put(queueName, queue);
+ }
+ }
+
+ /**
+ * Look up a queue object with the given name and if it's not a ring queue
invoke the queue's purge method.
+ * @param queueName the name of the queue to purge
+ * @param msgDepth the number of messages on the queue, used to determine
how many messages to purge.
+ */
+ private void purgeQueue(final String queueName, long msgDepth)
+ {
+ QmfConsoleData queue = _queueCache.get(queueName);
+
+ if (queue == null)
+ {
+ System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s
reference couldn't be found\n",
+ new Date().toString(), queueName);
+ }
+ else
+ { // If we've found a queue called queueName we then find the bindings
that reference it.
+
+ Map args = (Map)queue.getValue("arguments");
+ String policyType = (String)args.get("qpid.policy_type");
+ if (policyType != null && policyType.equals("ring"))
+ { // If qpid.policy_type=ring we return.
+ return;
+ }
+
+ try
+ {
+ QmfData arguments = new QmfData();
+ arguments.setValue("request", (long)(_purge*msgDepth));
+ queue.invokeMethod("purge", arguments);
+ }
+ catch (QmfException e)
+ {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Main Event handler.
+ * @param wi a QMF2 WorkItem object
+ */
+ public void onEvent(final WorkItem wi)
+ {
+ if (wi instanceof EventReceivedWorkItem)
+ {
+ EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
+ Agent agent = item.getAgent();
+ QmfEvent event = item.getEvent();
+ String className = event.getSchemaClassId().getClassName();
+
+ if (className.equals("queueDeclare"))
+ {
+ updateQueueCache();
+ }
+ else if (className.equals("queueThresholdExceeded"))
+ {
+ String queueName = event.getStringValue("qName");
+ boolean matches = false;
+ for (Pattern x : _filter)
+ { // Check the queue name against the regexes in the filter
List (if any)
+ Matcher m = x.matcher(queueName);
+ if (m.find())
+ {
+ matches = true;
+ break;
+ }
+ }
+
+ if (_filter.isEmpty() || matches)
+ { // If there's no filter enabled or the filter matches the
queue name we call purgeQueue().
+ long msgDepth = event.getLongValue("msgDepth");
+ purgeQueue(queueName, msgDepth);
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs QueueFuse.
+ * @param args the command line arguments.
+ */
+ public static void main(final String[] args)
+ {
+ String logLevel = System.getProperty("amqj.logging.level");
+ logLevel = (logLevel == null) ? "FATAL" : logLevel; // Set default log
level to FATAL rather than DEBUG.
+ System.setProperty("amqj.logging.level", logLevel);
+
+ String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="};
+ try
+ {
+ boolean includeRingQueues = false;
+ String connectionOptions = "{reconnect: true}";
+ List<Pattern> filter = new ArrayList<Pattern>();
+ float purge = 0.2f;
+ GetOpt getopt = new GetOpt(args, "hf:p:", longOpts);
+ List<String[]> optList = getopt.getOptList();
+ String[] cargs = {};
+ cargs = getopt.getEncArgs().toArray(cargs);
+ for (String[] opt : optList)
+ {
+ if (opt[0].equals("-h") || opt[0].equals("--help"))
+ {
+ System.out.println(_usage);
+ System.out.println(_description);
+ System.out.println(_options);
+ System.exit(1);
+ }
+ else if (opt[0].equals("-f") || opt[0].equals("--filter"))
+ {
+ String[] split = opt[1].split(",");
+ for (String s : split)
+ {
+ Pattern p = Pattern.compile(s);
+ filter.add(p);
+ }
+ }
+ else if (opt[0].equals("-p") || opt[0].equals("--purge"))
+ {
+ int percent = Integer.parseInt(opt[1]);
+ if (percent < 0 || percent > 100)
+ {
+ System.out.println(_usage);
+ System.exit(1);
+ }
+ purge = percent/100.0f;
+ }
+ else if (opt[0].equals("--sasl-mechanism"))
+ {
+ connectionOptions = "{reconnect: true, sasl_mechs: " +
opt[1] + "}";
+ }
+ }
+
+ int nargs = cargs.length;
+ if (nargs == 0)
+ {
+ cargs = new String[] {"localhost"};
+ }
+
+ for (String url : cargs)
+ {
+ QueueFuse queueFuse = new QueueFuse(url, connectionOptions,
filter, purge);
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(_usage);
+ System.out.println(e.getMessage());
+ System.exit(1);
+ }
+
+ try
+ { // Block here
+ Thread.currentThread().join();
+ }
+ catch (InterruptedException ie)
+ {
+ }
+ }
+}
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
qpid/trunk/qpid/tools/src/java/src/main/java/org/apache/qpid/qmf2/tools/QueueFuse.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]