Hello everybody. I have developed a python script to query the statistics for qpid servers. See http://qpid.apache.org/ for the project homepage.
This is just a basic plugin, and I am submitting it for review and in case someone else is interested. It is a Python plugin and thus requires a configuration like this in collectd.conf:

#<Plugin python>
#    # Example location of the required qpid python client
#    ModulePath "/usr/local/qpid-0.6/python"
#
#    # Example location of the qpidcollectd.py script
#    ModulePath "/usr/local/collectd-scripts/"
#
#    # Actual module configuration
#    <Module qpidcollectd>
#        <hostname "firstserver">
#            queue "veryinterestingqueue1"
#            queue "veryinterestingqueue2"
#        </hostname>
#
#        <hostname "anotherserver">
#            queue "anotherveryinterestingqueue1" "anotherveryinterestingqueue2"
#        </hostname>
#    </Module>
#</Plugin>

The qpid client library (at least version 0.6, which is the one I used) needs to spawn processes, so this plugin will need to handle CHLD signals. This means that if you want to use this plugin you won't be able to use the Exec plugin.

Looking for constructive criticism / patches / whatever.

Thanks,
Alessandro
#
# A plugin to read stats from the Apache Qpid message framework
# Qpid homepage at http://qpid.apache.org/
#
# Written by Alessandro Iurlano <[email protected]>
#
# This is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; only version 2 of the License is applicable.
#

import sys
import os
import time
import signal

import collectd
from qmf.console import Session, Console

# One dictionary per configured <hostname> block. Entries are appended by
# config_qpid_plugin() and walked by the read callback.
sessions = []


# Read the configuration from collectd configuration
# It will be something like this:
#
# <Plugin python>
#     # Example location of the required qpid python client
#     ModulePath "/usr/local/qpid-0.6/python"
#
#     # Example location of the qpidcollectd.py script
#     ModulePath "/usr/local/collectd-scripts/"
#
#     # Actual module configuration
#     <Module qpidcollectd>
#         <hostname "firstserver">
#             queue "veryinterestingqueue1"
#             queue "veryinterestingqueue2"
#         </hostname>
#
#         <hostname "anotherserver">
#             queue "anotherveryinterestingqueue1" "anotherveryinterestingqueue2"
#         </hostname>
#     </Module>
# </Plugin>
def config_qpid_plugin(config):
    """Config callback: record one entry in ``sessions`` per <hostname> block.

    ``config`` is the configuration node handed over by collectd. Only its
    direct children whose key is exactly "hostname" are considered; any
    other child is ignored. The first value of a hostname block is the
    host to query ("localhost" when the block carries no value), and every
    value of every nested "queue" option is a queue to report in detail.

    No connection is opened here: each entry starts with ``session`` set
    to None and a zeroed reconnection timer.
    """
    # "sessions" is only mutated in place, never rebound, so no "global"
    # statement is required.
    for host_block in config.children:
        if host_block.key != "hostname":
            continue

        if len(host_block.values) < 1:
            host = "localhost"
        else:
            host = host_block.values[0]

        # A single "queue" option may list several names, and the option
        # may be repeated; gather all of them in order of appearance.
        wanted_queues = []
        for option in host_block.children:
            if option.key == "queue":
                wanted_queues.extend(option.values)

        sessions.append({
            "hostname": host,
            "session": None,
            "last_connection_attempt": None,
            "connection_interval": 0,
            "queues": wanted_queues,
        })


# Initializes the plugin
# Gets the handling of CHLD signals because the qpid-python lib uses
# subprocesses
# Conflicts with the exec plugin
def init_qpid_plugin():
    """Init callback: put SIGCHLD back to its default disposition."""
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)


# Read the qpid statistics from the servers configured
# Each instance of session is a dictionary with:
#   hostname                : the host to be queried
#   session                 : the connection to the host, or None while
#                             there is no usable connection
#   last_connection_attempt : time of the last failed connection attempt,
#                             or None
#   connection_interval     : seconds to wait before the next attempt
#   queues                  : a list of the queues that are to be
#                             displayed in detail
#
# Reconnection back-off, in seconds: first retry delay and upper bound.
_QPID_FIRST_RETRY_DELAY = 10
_QPID_MAX_RETRY_DELAY = 640


def _qpid_connect(instance, now):
    """Try to open a session to instance['hostname'], honouring the back-off.

    Does nothing when a previous attempt failed less than
    instance['connection_interval'] seconds before ``now``. On success the
    new session is stored in instance['session'] and the back-off state is
    cleared. On failure the error is logged through collectd.error and the
    delay before the next attempt is set to 10 seconds, then doubled on
    each further failure up to 640 seconds.
    """
    last_attempt = instance['last_connection_attempt']
    if last_attempt is not None and now < last_attempt + instance['connection_interval']:
        # Still inside the waiting period of the previous failed attempt.
        return

    sess = Session()
    try:
        sess.addBroker(instance['hostname'])
    # The "as" spelling is accepted by Python 2.6+ and by Python 3.
    except Exception as e:
        instance['last_connection_attempt'] = now
        if instance['connection_interval'] == 0:
            instance['connection_interval'] = _QPID_FIRST_RETRY_DELAY
        else:
            instance['connection_interval'] = min(
                instance['connection_interval'] * 2, _QPID_MAX_RETRY_DELAY)
        instance['session'] = None
        collectd.error("qpid: Error connecting to host %s: %s. Trying to reconnect in %d seconds"
                       % (instance['hostname'], str(e), instance['connection_interval']))
    else:
        # Connection successful
        instance['last_connection_attempt'] = None
        instance['connection_interval'] = 0
        instance['session'] = sess


def _qpid_report(instance):
    """Query the broker behind instance['session'] and dispatch its values.

    For every queue whose name is listed in instance['queues'] this
    dispatches its depth ('queue_length') and its total enqueue / dequeue
    counts ('counter'). It then dispatches three 'derive' values holding
    the msgRoutes, msgReceives and msgDrops figures summed over all
    exchanges. Any exception raised while querying or dispatching is left
    to the caller.
    """
    session = instance['session']
    hostname = instance['hostname']

    # Asks for queue and exchange objects from the server
    queues = session.getObjects(_class="queue", _package="org.apache.qpid.broker")
    exchanges = session.getObjects(_class="exchange", _package="org.apache.qpid.broker")

    for q in queues:
        # Only the queues named in the configuration are reported in detail
        if q.name not in instance['queues']:
            continue
        vl = collectd.Values(type='queue_length', plugin='qpid',
                             plugin_instance=q.name, host=hostname)
        vl.dispatch(values=[q.msgDepth])
        vl = collectd.Values(type='counter', plugin='qpid', plugin_instance=q.name,
                             type_instance='Enqueues', host=hostname)
        vl.dispatch(values=[q.msgTotalEnqueues])
        vl = collectd.Values(type='counter', plugin='qpid', plugin_instance=q.name,
                             type_instance='Dequeues', host=hostname)
        vl.dispatch(values=[q.msgTotalDequeues])

    # Totals are obtained by summing the statistics of all the exchanges
    total_msg_sent = 0
    total_msg_received = 0
    total_msg_drop = 0
    for xchg in exchanges:
        total_msg_sent += xchg.msgRoutes
        total_msg_received += xchg.msgReceives
        total_msg_drop += xchg.msgDrops

    # Send these values to collectd
    vl = collectd.Values(host=hostname, type='derive', plugin='qpid',
                         type_instance='sent-messages')
    vl.dispatch(values=[total_msg_sent])
    vl = collectd.Values(host=hostname, type='derive', plugin='qpid',
                         type_instance='received-messages')
    vl.dispatch(values=[total_msg_received])
    vl = collectd.Values(host=hostname, type='derive', plugin='qpid',
                         type_instance='dropped-messages')
    vl.dispatch(values=[total_msg_drop])


def read_qpid_plugin():
    """Read callback: collect statistics from every configured host.

    For each entry of ``sessions`` a connection is (re)established when
    there is none and the back-off delay allows it, then the queue and
    exchange statistics are dispatched. If querying fails, the error is
    logged and the session is discarded so that the next read starts with
    a fresh connection attempt.
    """
    # for every host configured in the configuration file
    for instance in sessions:
        now = time.time()

        if instance['session'] is None:
            _qpid_connect(instance, now)
        if instance['session'] is None:
            # Not connected (attempt failed or still backing off)
            continue

        try:
            _qpid_report(instance)
        # "except Exception" rather than a bare "except" so that
        # SystemExit / KeyboardInterrupt are not swallowed here.
        except Exception as e:
            # Something went wrong while querying the server.
            # Set session to None so that next time it will try to connect again
            # NOTE(review): the discarded session is not explicitly closed;
            # confirm whether the qmf library needs that to release resources.
            instance['session'] = None
            instance['last_connection_attempt'] = None
            instance['connection_interval'] = 0
            collectd.error("qpid: Error querying host %s: %s. Reconnecting at next read"
                           % (instance['hostname'], str(e)))


# Initializes the plugin by making collectd aware of the callback functions to use
collectd.register_init(init_qpid_plugin)
collectd.register_config(config_qpid_plugin)
collectd.register_read(read_qpid_plugin)
_______________________________________________ collectd mailing list [email protected] http://mailman.verplant.org/listinfo/collectd
