Hello everybody.

I have developed a python script to query the statistics for qpid
servers. See http://qpid.apache.org/ for the project homepage.

This is just a basic plugin and I am submitting it for review or in
case someone else is interested.
It is a python plugin and thus requires a configuration like this in
collectd.conf:

#<Plugin python>
#        # Example location of the required qpid python client
#        ModulePath "/usr/local/qpid-0.6/python"
#
#        # Example location of the qpidcollectd.py script
#        ModulePath "/usr/local/collectd-scripts/"
#
#        # Actual module configuration
#        <Module qpidcollectd>
#                <hostname "firstserver">
#                    queue "veryinterestingqueue1"
#                    queue "veryinterestingqueue2"
#                </hostname>
#
#                <hostname "anotherserver">
#                    queue "anotherveryinterestingqueue1"
"anotherveryinterestingqueue2"
#                </hostname>
#        </Module>


The qpid client library (at least the version 0.6 that I used) needs
to spawn processes, so this plugin will need to handle CHLD signals.
This means that if you want to use this plugin you won't be able to
use the Exec plugin.

Looking for constructive critics / patches / whatever.

Thanks,
Alessandro
#
# A plugin to read stats from the Apache Qpid message framework
# Qpid homepage at http://qpid.apache.org/
#
# Written by Alessandro Iurlano <[email protected]>
#
# This is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation; only version 2 of the License is applicable.
# 

import sys
import os
import time

import signal

import collectd

from qmf.console import Session, Console
    
sessions=[]

# Read the configuration from collectd configuration
# It will be something like this:
#<Plugin python>
#        # Example location of the required qpid python client
#        ModulePath "/usr/local/qpid-0.6/python"
#
#        # Example location of the qpidcollectd.py script
#        ModulePath "/usr/local/collectd-scripts/"
#
#
#        # Actual module configuration
#        <Module qpidcollectd>
#                <hostname "firstserver">
#                    queue "veryinterestingqueue1"
#                    queue "veryinterestingqueue2"
#                </hostname>
#
#                <hostname "anotherserver">
#                    queue "anotherveryinterestingqueue1" "anotherveryinterestingqueue2"
#                </hostname>
#
#        </Module>

def config_qpid_plugin(config):
    global sessions
    for child in config.children:
	if child.key=="hostname":
	    if len(child.values)>=1:
		host=child.values[0]
	    else:
		host="localhost"
	    queues=list()
            for hostchild in child.children:
		if hostchild.key=="queue":
			for q in hostchild.values:
				queues.append(q)

            sessions.append({"hostname": host, "session": None, "last_connection_attempt": None, 
                             "connection_interval": 0, "queues": queues})

# Initializes the plugin
# Gets the handling of CHLD signals because the qpid-python lib uses subprocesses
# Conflicts with the exec plugin
#
def init_qpid_plugin():
    signal.signal(signal.SIGCHLD, signal.SIG_DFL)

# Read the qpid statistics from the servers configured
# Each instance of session is a dictionary with:
# hostname : the host to be queried
# session  : the connection to the host that was created at config time 
# queues   : a list of the queues that are to be displayed in detail
#
def read_qpid_plugin():
    global sessions

    # for every host configured in the configuration file
    for instance in sessions:

        now=time.time()
        # Check if there is already an existing connection
        if instance['session'] is None:
	    if (instance['last_connection_attempt'] is None) or (now >= instance['last_connection_attempt']+instance['connection_interval']):
                # There isn't a connection. Try to make one
	        sess=Session()
	        try:
           	    broker=sess.addBroker(instance['hostname'])
                    # Connection successful
                    instance['last_connection_attempt']=None
	            instance['connection_interval']=0
		    instance['session']=sess
	        except Exception, e:
                    # Some error in the connection attempt
		    instance['last_connection_attempt']=now
		    if instance['connection_interval']==0:
			instance['connection_interval']=10
		    else:
                        if instance['connection_interval']>=640:
			    instance['connection_interval']=640
                        else:
                            instance['connection_interval']=instance['connection_interval']*2
                    instance['session']=None
		    collectd.error("qpid: Error connecting to host %s: %s. Trying to reconnect in %d seconds" % (instance['hostname'],str(e),instance['connection_interval']))

        # There is a connection already
        if instance['session'] is not None:
            # Try to use it
            try:
                # Asks for queue and exchange objects from the server
                queues     = instance['session'].getObjects(_class="queue", _package="org.apache.qpid.broker")
                exchanges  = instance['session'].getObjects(_class="exchange", _package="org.apache.qpid.broker")
        	hostname   = instance['hostname']

        	# Parses the queues list
        	for q in queues:
            	    # Is the queue one of those we want detailed?
	    	    if q.name in instance['queues']:
			# Generates a new value for the queue length
	        	vl=collectd.Values(type='queue_length', plugin='qpid', plugin_instance=q.name, host=hostname)
	        	vl.dispatch(values=[q.msgDepth])

			# Generates a new value for the enqueued messages
	        	vl=collectd.Values(type='counter', plugin='qpid', plugin_instance=q.name,
						type_instance='Enqueues', host=hostname)
    	                vl.dispatch(values=[q.msgTotalEnqueues])

			# Generates a new value for the dequeued messages
    	        	vl=collectd.Values(type='counter', plugin='qpid', plugin_instance=q.name,
						type_instance='Dequeues', host=hostname)
                	vl.dispatch(values=[q.msgTotalDequeues])

        	# All queues have been parsed
        	# Generates the values for the totals by summing up the statistics in all the exchanges
        	total_msg_sent      = 0
        	total_msg_received  = 0
        	total_msg_drop      = 0

        	for xchg in exchanges:
	    	    total_msg_sent     = total_msg_sent + xchg.msgRoutes
	    	    total_msg_received = total_msg_received + xchg.msgReceives
	    	    total_msg_drop     = total_msg_drop + xchg.msgDrops

		# Send these values to collectd
        	vl=collectd.Values(host=hostname, type='derive', plugin='qpid', type_instance='sent-messages')
        	vl.dispatch(values=[total_msg_sent])

        	vl=collectd.Values(host=hostname, type='derive', plugin='qpid', type_instance='received-messages')
        	vl.dispatch(values=[total_msg_received])

        	vl=collectd.Values(host=hostname, type='derive', plugin='qpid', type_instance='dropped-messages')
        	vl.dispatch(values=[total_msg_drop])
	    except:
                # Something went wrong while querying the server.
                # Set session to None so that next time it will try to connect again
		instance['session']=None
                instance['last_connection_attempt']=None
                instance['connection_interval']=0


# Initializes the plugin by making collectd aware of the callback functions to use 
collectd.register_init(init_qpid_plugin)
collectd.register_config(config_qpid_plugin)
collectd.register_read(read_qpid_plugin)


_______________________________________________
collectd mailing list
[email protected]
http://mailman.verplant.org/listinfo/collectd

Reply via email to