Hi,

Attached is a first stab at a Python version of gmetad.  Brad asked me
to try rewriting gmetad in Python, in the hope of supporting future
extensibility via Python modules.

I've included a CHANGELOG file that outlines basically what this python
gmetad should do and what it doesn't claim to do.  It is important to
emphasize here:  This is NOT yet a functional replacement for the
current gmetad!  Basically, what it WILL do is honor the current gmetad
configuration file (at least the parts it can understand, the others are
ignored), read gmond data, and offer the aggregated data over the XML
(8651) and interactive (8652) ports.
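
For anyone who wants to poke at the two ports by hand, here is a minimal client sketch. The XML port sends its document as soon as you connect, while the interactive port waits for a one-line filter such as "/Grid-Node/localhost". The names build_query and fetch_xml are made up for this illustration and are not part of the attached files, and the sketch is written for Python 3 even though the attached code targets Python 2.

```python
import socket


def build_query(path=None):
    """Return the bytes to send to the interactive port for a filter path.

    An empty path asks for the whole tree ("/").
    """
    if not path:
        return b"/\n"
    return ("/" + path.strip().strip("/") + "\n").encode("ascii")


def fetch_xml(host, port, query=None, timeout=5.0):
    """Connect, optionally send a filter line, and read until the peer closes."""
    chunks = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        if query is not None:
            sock.sendall(query)
        while True:
            buf = sock.recv(8192)
            if not buf:
                break
            chunks.append(buf)
    return b"".join(chunks)


if __name__ == "__main__":
    # Assumes a gmetad is listening locally on the default ports.
    print(fetch_xml("localhost", 8651)[:200])
    print(fetch_xml("localhost", 8652, build_query("Grid-Node/localhost"))[:200])
```

Both servers close the connection once the reply has been sent, which is why reading until recv() returns nothing is enough here.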

Some testing has been done to compare outputs from the current gmetad
and this python gmetad.  They appear to produce functionally equivalent
XML documents, although this hasn't been extensively tested.
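
To give an idea of what "functionally equivalent" means here, the sketch below compares two XML dumps while ignoring attributes that are expected to differ between runs (LOCALTIME, TN, REPORTED, and the VAL of each METRIC). It is a simplified stand-in for the attached test script, not a copy of it: it uses xml.etree.ElementTree, compares children in document order rather than matching them by NAME, and the function names are illustrative. It is written for Python 3.

```python
import xml.etree.ElementTree as ET

# Attributes that depend on when the snapshot was taken.
VOLATILE_ATTRS = {"LOCALTIME", "TN", "REPORTED"}


def stable_attrs(elem):
    """Return the attributes of elem that should match between two dumps."""
    attrs = {k: v for k, v in elem.attrib.items() if k not in VOLATILE_ATTRS}
    if elem.tag == "METRIC":
        # Metric values are measured in real time and can change.
        attrs.pop("VAL", None)
    return attrs


def first_difference(old, new, path=""):
    """Return a description of the first mismatch, or None if there is none."""
    if old.tag != new.tag:
        return "%s: tag differs (%s vs %s)" % (path or "/", old.tag, new.tag)
    here = "%s/%s" % (path, old.tag)
    if stable_attrs(old) != stable_attrs(new):
        return "%s: attributes differ" % here
    if len(old) != len(new):
        return "%s: child count differs (%d vs %d)" % (here, len(old), len(new))
    for old_child, new_child in zip(old, new):
        diff = first_difference(old_child, new_child, here)
        if diff:
            return diff
    return None


def equivalent(old_xml, new_xml):
    """True if the two XML strings match apart from the volatile attributes."""
    return first_difference(ET.fromstring(old_xml), ET.fromstring(new_xml)) is None
```

Because children are compared in order, two dumps that list the same hosts in a different order would be reported as different by this sketch.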


What I'm Planning To Do:
The next thing I'm planning to work on is the summary capability on the
interactive port.  When that is done we should be able to test this new
gmetad by running it side-by-side with the current gmetad, letting the
current gmetad generate the RRD files, and the new one respond on 8652
to web front end queries.  In other words, see if the web front end will
still work with the python gmetad in there.
After that I'll probably start working on an RRD module.  The goal is to
drive first toward something that has the same functionality as the
existing gmetad, then start looking at ways to enhance.

I've also got a test engineer on my team who thinks he's going to have
some free cycles, so hopefully he will be able to create and submit some
more test scripts.

Of course, the other thing I have to do is fix the bugs that all of you
are going to file on it. :)


What I'm Asking You To Do:
- If this is of interest to the Ganglia project at all, I'd sure like to
have it checked in if possible.  Probably a discussion needs to take
place as to where it should go.  I don't think it should replace the
current gmetad, so putting it in that directory seems wrong IMO.  Maybe
a new directory under monitor-core called gmetad-python or something?
Anyway, if we can resolve where to put it, and if the project wants it,
then it would be great to get it checked in, so I don't lose the work.
- Take a look at it and try it out in a safe place.  Poke around at it
and find bugs.
- Anything else that comes to mind.


Installing this is easy.  Just put all the files in the same directory,
then run "python ./gmetad.py".  You can add a --help at the end to show
usage information.



Please let me know what you think.

-- 
Matt Ryan
Sr. Software Engineer
Novell, Inc.

#!/usr/bin/env python

import sys
import socket
import os
import asyncore
import logging
import logging.handlers

from gmetad_dbgprint import dbgPrint
from gmetad_gmondReader import GmondReader
from gmetad_xmlWriter import XmlWriter
from gmetad_config import getConfig, GmetadConfig
from gmetad_socketListener import GmetadListenSocket
from gmetad_daemon import daemonize


class GmetadListenSocket(asyncore.dispatcher):
    def __init__(self):
        asyncore.dispatcher.__init__(self)
        
    def open(self, port, interface=''):
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        host = 'localhost'
        if 0 < len(interface.strip()):
            host = interface
        logging.info('Opening query interface on %s:%d' % (host, port))
        self.bind((interface,port))
        self.listen(5)

class XmlSocket(GmetadListenSocket):
    def handle_accept(self):
        newsock, addr = self.accept()
        logging.debug('Replying to XML dump query from %s' % addr[0])
        writer = XmlWriter()
        newsock.sendall(writer.getXml())
        newsock.close()


class InteractiveSocket(GmetadListenSocket):        
    def handle_accept(self):
        newsock, addr = self.accept()
        logging.debug('Replying to interactive query from %s' % addr[0])
        InteractiveConnectionHandler(newsock)
        
class InteractiveConnectionHandler(asyncore.dispatcher_with_send):
    def __init__(self, sock):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.buffer = ''
        self.amt_to_write = 0
        
    def writable(self):
        return self.amt_to_write
        
    def handle_read(self):
        rbuf = self.recv(1024)
        if rbuf:
            rbuf = rbuf.strip().strip('/')
            if 0 == len(rbuf):
                rbuf = None
            writer = XmlWriter()
            self.buffer = writer.getXml(rbuf)
            self.amt_to_write = len(self.buffer)
        
    def handle_write(self):
        sent = self.socket.send(self.buffer)
        self.buffer = self.buffer[sent:]
        self.amt_to_write -= sent
        if not self.amt_to_write:
            self.close()
            
def getLoggingLevel(lspec):
    levelMap = {0:logging.FATAL,
            1:logging.CRITICAL,
            2:logging.ERROR,
            3:logging.WARNING,
            4:logging.INFO,
            5:logging.DEBUG}
    try:
        return levelMap[lspec]
    except KeyError:
        if lspec < 0: return logging.FATAL
        return logging.DEBUG

if __name__ == '__main__':
    gmetadConfig = getConfig()
    
    # Remember the log file descriptors we create, so they aren't closed
    # when we daemonize.
    ignore_fds = []
    
    logging.basicConfig(level=getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]),
            format='%(levelname)-8s %(message)s')
    syslogHandler = logging.handlers.SysLogHandler('/dev/log')
    
    syslogHandler.setLevel(getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]))
    syslogHandler.setFormatter(logging.Formatter(
            fmt='%(asctime)s %(levelname)-8s - GMETAD - %(message)s',
            datefmt='%a, %d %b %Y %H:%M:%S'))
    ignore_fds.append(syslogHandler.socket.fileno())
    logging.getLogger().addHandler(syslogHandler)
    if gmetadConfig[GmetadConfig.LOGFILE] is not None:
        fileHandler = logging.FileHandler(gmetadConfig[GmetadConfig.LOGFILE],'a')
        fileHandler.setLevel(getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]))
        fileHandler.setFormatter(logging.Formatter(
                fmt='%(asctime)s %(levelname)-8s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S'))
        ignore_fds.append(fileHandler.stream.fileno())
        logging.getLogger().addHandler(fileHandler)

    if 5 > int(gmetadConfig[GmetadConfig.DEBUG_LEVEL]):
        daemonize(ignore_fds)
        
    logging.info('Gmetad application started.')
    
    pffd = None
    if gmetadConfig[GmetadConfig.PIDFILE] is not None:
        try:
            pffd = open(gmetadConfig[GmetadConfig.PIDFILE], 'w')
            pffd.write('%d\n' % os.getpid())
            logging.debug('Wrote pid %d to pidfile %s' % (os.getpid(),
                    gmetadConfig[GmetadConfig.PIDFILE]))
            pffd.close()
            pffd = open(gmetadConfig[GmetadConfig.PIDFILE], 'r')
        except Exception, e:
            logging.error('Unable to write PID %d to %s (%s)' % (os.getpid(),
                    gmetadConfig[GmetadConfig.PIDFILE], e))
            sys.exit()
         
         
    # load modules here, when we support modules.
    
    readers = []
    xmlSocket = XmlSocket()
    interactiveSocket = InteractiveSocket()
    try:
        try:
            for ds in gmetadConfig[GmetadConfig.DATA_SOURCE]:
                readers.append(GmondReader(ds))
                gr = readers[len(readers)-1]
                gr.start()
            xmlSocket.open(port=int(gmetadConfig[GmetadConfig.XML_PORT]))
            interactiveSocket.open(port=int(gmetadConfig[GmetadConfig.INTERACTIVE_PORT]))
            asyncore.loop()
        except KeyboardInterrupt:
            logging.info('Shutting down...')
        except Exception, e:
            logging.error('Caught exception: %s' % e)
            raise
    finally:
        logging.debug('Shutting down all data source readers...')
        for gr in readers:
            gr.shutdown()
        logging.debug('Closing all query ports...')
        xmlSocket.close()
        interactiveSocket.close()
        if pffd is not None:
            pffd.close()
            os.unlink(gmetadConfig[GmetadConfig.PIDFILE])
        

    
from socket import getfqdn
import os
import sys
import optparse

class GmetadRRA:
    def __init__(self, args):
        self.args = args
        
class GmetadDataSource:
    def  __init__(self, name, hosts=['localhost'], interval=15):
        self.name = name
        self.hosts = hosts
        self.interval = interval
        self.time_to_next_read = 0

class GmetadConfig:
    _shared_state = {}
    
    _isInitialized = False
    
    DEBUG_LEVEL = 'debug_level'
    LOGFILE = 'logfile'
    PIDFILE = 'pidfile'
    DATA_SOURCE = 'data_source'
    RRAS = 'RRAs'
    SCALABLE = 'scalable'
    GRIDNAME = 'gridname'
    AUTHORITY = 'authority'
    TRUSTED_HOSTS = 'trusted_hosts'
    ALL_TRUSTED = 'all_trusted'
    SETUID = 'setuid'
    SETUID_USERNAME = 'setuid_username'
    XML_PORT = 'xml_port'
    INTERACTIVE_PORT = 'interactive_port'
    SERVER_THREADS = 'server_threads'
    RRD_ROOTDIR = 'rrd_rootdir'
    
    _cfgDefaults = {
            DEBUG_LEVEL : 2,
            LOGFILE : None,
            PIDFILE : None,
            DATA_SOURCE : [],
            RRAS : [
                    GmetadRRA('AVERAGE:0.5:1:244'),
                    GmetadRRA('AVERAGE:0.5:24:244'),
                    GmetadRRA('AVERAGE:0.5:168:244'),
                    GmetadRRA('AVERAGE:0.5:672:244'),
                    GmetadRRA('AVERAGE:0.5:5760:374')
            ],
            SCALABLE : True,
            GRIDNAME : None,
            AUTHORITY : 'http://%s/ganglia/' % getfqdn(),
            TRUSTED_HOSTS : [],
            ALL_TRUSTED : False,
            SETUID : True,
            SETUID_USERNAME : 'nobody',
            XML_PORT : 8651,
            INTERACTIVE_PORT : 8652,
            SERVER_THREADS : 4,
            RRD_ROOTDIR : '/var/lib/ganglia/rrds'
    }
    
    
    def __init__(self, cfgpath=None):
        self.__dict__ = GmetadConfig._shared_state
        
        if cfgpath is not None:
            self.path = cfgpath
            self.resetToDefaults()
            
            self.kwHandlers = {
                GmetadConfig.DEBUG_LEVEL : self.parseDbgLevel,
                GmetadConfig.LOGFILE : self.parseLogfile,
                GmetadConfig.DATA_SOURCE : self.parseDataSource,
                GmetadConfig.RRAS : self.parseRRAs,
                GmetadConfig.SCALABLE : self.parseScalable,
                GmetadConfig.GRIDNAME : self.parseGridname,
                GmetadConfig.AUTHORITY : self.parseAuthority,
                GmetadConfig.TRUSTED_HOSTS : self.parseTrustedHosts,
                GmetadConfig.ALL_TRUSTED : self.parseAllTrusted,
                GmetadConfig.SETUID : self.parseSetuid,
                GmetadConfig.SETUID_USERNAME : self.parseSetuidUsername,
                GmetadConfig.XML_PORT : self.parseXmlPort,
                GmetadConfig.INTERACTIVE_PORT : self.parseInteractivePort,
                GmetadConfig.SERVER_THREADS : self.parseServerThreads,
                GmetadConfig.RRD_ROOTDIR : self.parseRrdRootdir
            }
            self.updateConfig()
            GmetadConfig._isInitialized = True
        
    def updateConfig(self):
        f = open(self.path, 'r')
        prev_line = None
        for line in f.readlines():
            if line.startswith('#'): continue
            if 0 >= len(line.strip()): continue
            if line.strip().endswith('\\'):
                if prev_line is None:
                    prev_line = line.strip().strip('\\')
                else:
                    prev_line += line.strip().strip('\\')
                continue
            elif prev_line is not None:
                prev_line += line.strip()
                line = prev_line
                prev_line = None
            kw, args = line.strip().split(None,1)
            if self.kwHandlers.has_key(kw):
                self.kwHandlers[kw](args)
                
    def __setitem__(self, k, v):
        self.cfg[k] = v
        
    def __getitem__(self, k):
        return self.cfg[k]
        
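    def resetToDefaults(self):
        # Copy the defaults so later updates do not modify the class-level table.
        self.cfg = dict(GmetadConfig._cfgDefaults)
        self.cfg[GmetadConfig.DATA_SOURCE] = []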
        
    def parseDbgLevel(self, level):
        v = level.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.DEBUG_LEVEL] = int(v)
            
    def parseLogfile(self, logfile):
        self.cfg[GmetadConfig.LOGFILE] = logfile.strip().strip('"')
            
    def parseDataSource(self, args):
        a = args.split()
        name = a[0].strip().strip('"')
        interval = 15
        if a[1].strip().isdigit():
            interval = int(a[1].strip())
            hosts = a[2:]
        else:
            hosts = a[1:]
        self.cfg[GmetadConfig.DATA_SOURCE].append(GmetadDataSource(name, hosts,
                interval))
        
    def parseRRAs(self, args):
        self.cfg[GmetadConfig.RRAS] = []
        # args is a whitespace-separated list of specs, each of the form "RRA:...".
        for rraspec in args.split():
            self.cfg[GmetadConfig.RRAS].append(
                    GmetadRRA(rraspec.strip().strip('"').split(':',1)[1]))
            
    def parseScalable(self, arg):
        v = arg.strip().lower()
        if v == 'off' or v == 'false' or v == 'no':
            self.cfg[GmetadConfig.SCALABLE] = False
        else:
            self.cfg[GmetadConfig.SCALABLE] = True
            
    def parseGridname(self, arg):
        self.cfg[GmetadConfig.GRIDNAME] = arg.strip().strip('"')
        
    def parseAuthority(self, arg):
        self.cfg[GmetadConfig.AUTHORITY] = arg.strip().strip('"')
        
    def parseTrustedHosts(self, args):
        # Store the hosts as a list, matching the type of the default value.
        hosts = args.split()
        if hosts:
            self.cfg[GmetadConfig.TRUSTED_HOSTS] = hosts
            
    def parseAllTrusted(self, arg):
        v = arg.strip().lower()
        if v == 'on' or v == 'true' or v == 'yes':
            self.cfg[GmetadConfig.ALL_TRUSTED] = True
        else:
            self.cfg[GmetadConfig.ALL_TRUSTED] = False
            
    def parseSetuid(self, arg):
        v = arg.strip().lower()
        if v == 'off' or v == 'false' or v == 'no':
            self.cfg[GmetadConfig.SETUID] = False
        else:
            self.cfg[GmetadConfig.SETUID] = True
            
    def parseSetuidUsername(self, arg):
        self.cfg[GmetadConfig.SETUID_USERNAME] = arg.strip().strip('"')
        
    def parseXmlPort(self, arg):
        v = arg.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.XML_PORT] = int(v)
            
    def parseInteractivePort(self, arg):
        v = arg.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.INTERACTIVE_PORT] = int(v)
        
    def parseServerThreads(self, arg):
        v = arg.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.SERVER_THREADS] = int(v)
        
    def parseRrdRootdir(self, arg):
        v = arg.strip().strip('"')
        if os.path.isdir(v):
            self.cfg[GmetadConfig.RRD_ROOTDIR] = v
        

def getConfig(args=sys.argv):
    if GmetadConfig._isInitialized:
        return GmetadConfig()
    dbgLevelDefault = GmetadConfig._cfgDefaults[GmetadConfig.DEBUG_LEVEL]
    iPortDefault = GmetadConfig._cfgDefaults[GmetadConfig.INTERACTIVE_PORT]
    xPortDefault = GmetadConfig._cfgDefaults[GmetadConfig.XML_PORT]
    parser = optparse.OptionParser()
    parser.add_option('-V', '--version', action='version',
            help='Print version and exit')
    parser.add_option('-d', '--debug', action='store',
            help='Debug level. If five (5) or greater, daemon will stay in foreground.  Values are:\n\
            0 - FATAL\n\
            1 - CRITICAL\n\
            2 - ERROR (default)\n\
            3 - WARNING\n\
            4 - INFO\n\
            5 - DEBUG',
            default='%d' % dbgLevelDefault)
    parser.add_option('-p', '--pid_file', action='store',
            help='Write process-id to file',
            default=None)
    parser.add_option('-c', '--conf', action='store',
            help='Location of gmetad configuration file (default=\'/etc/ganglia/gmetad.conf\')',
            default='/etc/ganglia/gmetad.conf')
    parser.add_option('-l', '--logfile', action='store',
            help='Log messages to this path in addition to syslog; overrides configuration',
            default=None)
    parser.add_option('-i', '--interactive_port', action='store',
            help='Interactive port to listen on (default=%d)' % iPortDefault,
            default='%d' % iPortDefault)
    parser.add_option('-x', '--xml_port', action='store',
            help='XML port to listen on (default=%d)' % xPortDefault,
            default='%d' % xPortDefault)
            
    # Parse the argument list that was passed in, skipping the program name.
    options, arguments = parser.parse_args(args[1:])
    
    if not options.debug.isdigit():
        print 'Invalid numeric value for --debug: %s' % options.debug
        parser.print_help()
        sys.exit()
    elif not options.interactive_port.isdigit():
        print 'Invalid numeric value for --interactive_port: %s' % options.interactive_port
        sys.exit()
    elif not options.xml_port.isdigit():
        print 'Invalid numeric value for --xml_port: %s' % options.xml_port
        sys.exit()
    elif not os.path.exists(options.conf):
        print 'No such configuration file: %s' % options.conf
        parser.print_help()
        sys.exit()
        
    cfg = GmetadConfig(options.conf)
    
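    # Update configuration if non-default values were provided.
    # optparse returns strings, so convert before comparing with the integer
    # defaults; otherwise the values from the configuration file are always
    # overwritten.
    if dbgLevelDefault != int(options.debug):
        cfg[GmetadConfig.DEBUG_LEVEL] = int(options.debug)
    if iPortDefault != int(options.interactive_port):
        cfg[GmetadConfig.INTERACTIVE_PORT] = int(options.interactive_port)
    if xPortDefault != int(options.xml_port):
        cfg[GmetadConfig.XML_PORT] = int(options.xml_port)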
    if options.logfile is not None:
        cfg[GmetadConfig.LOGFILE] = options.logfile
    if options.pid_file is not None:
        cfg[GmetadConfig.PIDFILE] = options.pid_file
        
    return cfg
    
#!/usr/bin/env python

import optparse
import os
import time
import signal
import socket
import xml.sax
import sys
from urlparse import urlsplit

class GmetadElement:
    def __init__(self, id):
        self.id = id
        self._data = {}
        self.child_elements = []
        
    def __getitem__(self, key):
        return self._data[key]
        
    def __setitem__(self, key, value):
        self._data[key] = value
        
    def __str__(self):
        buf = 'ID: %s\nAttrs:' % self.id
        for k, v in self._data.items():
            buf += ' %s=>%s' % (k,v)
        buf += '\n'
        for ce in self.child_elements:
            buf += str(ce)
        return buf
        

class GmetadXmlHandler(xml.sax.ContentHandler):
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.elemList = []
        self._elemStack = []
        self._elemStackSize = 0
        
    def startElement(self, tag, attrs):
        newElem = GmetadElement(tag)
        for ak, av in attrs.items():
            newElem[ak] = av
        if self._elemStackSize:
            self._elemStack[self._elemStackSize-1].child_elements.append(newElem)
        else:
            self.elemList.append(newElem)
        self._elemStack.append(newElem)
        self._elemStackSize += 1
        
    def endElement(self, tag):
        self._elemStack.pop()
        self._elemStackSize -= 1    

def urlCompare(u1, u2):
    url1 = urlsplit(u1)
    url2 = urlsplit(u2)
    if url1[0] != url2[0] or url1[2] != url2[2] or url1[3] != url2[3] or \
            url1[4] != url2[4]:
        return False
    url1host = ''
    url1port = None
    url2host = ''
    url2port = None
    try:
        url1host, url1port = url1[1].split(':')
    except ValueError:
        # No explicit port in the URL; the whole netloc is the host.
        url1host = url1[1]
    try:
        url2host, url2port = url2[1].split(':')
    except ValueError:
        url2host = url2[1]
    if url1port != url2port:
        return False
    url1host_hostname = url1host
    url2host_hostname = url2host
    try:
        url1host_hostname, remnants = url1host.split('.',1)
    except ValueError:
        pass
    try:
        url2host_hostname, remnants = url2host.split('.',1)
    except ValueError:
        pass
    if url1host_hostname != url2host_hostname:
        return False
    return True
    
ignore_attr_values = ['LOCALTIME', 'TN', 'REPORTED']
        
def checkEquivalentXml(oldelem, newelem):
    global ignore_attr_values
    if oldelem.id != newelem.id:
        raise Exception, 'Element ids do not match (old=%s, new=%s)' % \
                (oldelem.id, newelem.id)
    if len(oldelem._data) != len(newelem._data):
        raise Exception, 'Element attribute numbers do not match at node %s (old=%d, new=%d)' % \
                (oldelem.id, len(oldelem._data), len(newelem._data))
    if len(oldelem.child_elements) != len(newelem.child_elements):
        raise Exception, 'Element children numbers do not match at node %s (old=%d, new=%d)' % \
                (oldelem.id, len(oldelem.child_elements), len(newelem.child_elements))
    for k in oldelem._data.keys():
        if not newelem._data.has_key(k):
            raise Exception, 'Attribute "%s" not found in new XML' % k
        if oldelem[k] != newelem[k]:
            if k in ignore_attr_values:
                # Skip context-sensitive values
                continue
            if oldelem.id == 'METRIC' and k == 'VAL':
                # Skip metric values, since they are measured real-time and can change.
                continue
            if oldelem.id == 'GRID' and k == 'AUTHORITY':
                if urlCompare(oldelem[k], newelem[k]):
                    continue
            raise Exception, 'Value for attribute "%s" of tag %s does not match (old=%s, new=%s)' % \
                    (k, oldelem.id, oldelem[k], newelem[k])
    for k in newelem._data.keys():
        if not oldelem._data.has_key(k):
            raise Exception, 'Attribute "%s" not found in old XML' % k
        if oldelem[k] != newelem[k]:
            if k in ignore_attr_values:
                # Skip context-sensitive values
                continue
            if oldelem.id == 'METRIC' and k == 'VAL':
                # Skip metric values, since they are measured real-time and can change.
                continue
            if oldelem.id == 'GRID' and k == 'AUTHORITY':
                if urlCompare(oldelem[k], newelem[k]):
                    continue
            raise Exception, 'Value for attribute "%s" of tag %s does not match (old=%s, new=%s)' % \
                    (k, oldelem.id, oldelem[k], newelem[k])
    for oce in oldelem.child_elements:
        for nce in newelem.child_elements:
            if oce._data.has_key('NAME') and nce._data.has_key('NAME'):
                if oce['NAME'] == nce['NAME']:
                    checkEquivalentXml(oce, nce)
                    break
            else:
                checkEquivalentXml(oce, nce)
        
def get_socket(hspec):
    host, port = hspec.split(':')
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, int(port)))
    return sock
    
def get_xml_from_socket(sock):
    xmlbuf = ''
    while True:
        buf = sock.recv(8192)
        if not buf:
            break
        xmlbuf += buf
    return xmlbuf
    
def compare_xml(oldXmlHandler, newXmlHandler):
    if len(oldXmlHandler.elemList) != len(newXmlHandler.elemList):
        raise Exception, 'Different number of base elements.'
    for oe in oldXmlHandler.elemList:
        for ne in newXmlHandler.elemList:
            if oe._data.has_key('NAME') and ne._data.has_key('NAME'):
                if oe['NAME'] == ne['NAME']:
                    checkEquivalentXml(oe, ne)
                    break
            else:
                checkEquivalentXml(oe, ne)
    
def run_xml_consistency_test(old, new):
    print 'Running XML Consistency Test.'
    print 'Old host is %s' % old
    print 'New host is %s' % new
    
    sock = get_socket(old)
    xmlbuf = get_xml_from_socket(sock)
    oldXmlHandler = GmetadXmlHandler()
    xml.sax.parseString(xmlbuf, oldXmlHandler)
    
    sock = get_socket(new)
    xmlbuf = get_xml_from_socket(sock)    
    newXmlHandler = GmetadXmlHandler()
    xml.sax.parseString(xmlbuf, newXmlHandler)
    
    compare_xml(oldXmlHandler, newXmlHandler)

def run_interactive_consistency_test(old, new):
    filters = ['\n', '/\n', '/Grid-Node\n', '/Grid-Node/localhost\n',
            '/Grid-Node/localhost/mem_free\n']
    for filter in filters:
        print 'Running interactive consistency test with filter "%s"' % \
                filter.strip()
        print 'Old host is %s' % old
        print 'New host is %s' % new
        
        sock = get_socket(old)
        sock.send(filter)
        xmlbuf = get_xml_from_socket(sock)
        oldXmlHandler = GmetadXmlHandler()
        xml.sax.parseString(xmlbuf, oldXmlHandler)
        
        sock = get_socket(new)
        sock.send(filter)
        xmlbuf = get_xml_from_socket(sock)
        newXmlHandler = GmetadXmlHandler()
        xml.sax.parseString(xmlbuf, newXmlHandler)
        
        compare_xml(oldXmlHandler, newXmlHandler)
    

if __name__ == '__main__':
    p = optparse.OptionParser()
    p.add_option('-I', '--old_gmetad_interactive',  action='store',
            help='Location of old gmetad interactive port (default="localhost:8652")',
            default='localhost:8652')
    p.add_option('-i', '--new_gmetad_interactive', action='store',
            help='Location of new gmetad interactive port (default="localhost:8652")',
            default='localhost:8652')
    p.add_option('-X', '--old_gmetad_xml', action='store',
            help='Location of old gmetad xml port (default="localhost:8651")',
            default='localhost:8651')
    p.add_option('-x', '--new_gmetad_xml', action='store',
            help='Location of new gmetad xml port (default="localhost:8651")',
            default='localhost:8651')
    p.add_option('-s', '--server_path', action='store',
            help='Path to new gmetad script.  If not provided, it will not be started.',
            default=None)
    p.add_option('-c', '--conf', action='store',
            help='Path to gmetad configuration file (default="/etc/ganglia/gmetad.conf")',
            default='/etc/ganglia/gmetad.conf')
    p.add_option('-l', '--logfile', action='store',
            help='Path to gmetad log file, overrides configuration',
            default=None)
            
    options, args = p.parse_args()
    
    do_interactive_test=False
    do_xml_test=False
    if options.old_gmetad_interactive == options.new_gmetad_interactive:
        print 'Locations for old and new gmetad interactive ports are the same.'
        print 'Skipping the interactive port consistency test.'
    else:
        do_interactive_test=True
    if options.old_gmetad_xml == options.new_gmetad_xml:
        print 'Locations for old and new gmetad xml ports are the same.'
        print 'Skipping the xml port consistency test.'
    else:
        do_xml_test=True
        
    if not do_interactive_test and not do_xml_test:
        sys.exit()
        
    gmetad_pidfile = None
    if options.server_path is not None:
        gmetad_pidfile = '/tmp/gmetad.pid'
        cmd = 'python %s -c %s -i %s -x %s -p %s' % (options.server_path,
                options.conf,
                options.new_gmetad_interactive.split(':')[1],
                options.new_gmetad_xml.split(':')[1],
                gmetad_pidfile)
        if options.logfile is not None:
            cmd += ' -l %s' % options.logfile
        os.system(cmd)
        time.sleep(1) # wait for it to come up
    
    try:
        if do_xml_test:
            run_xml_consistency_test(options.old_gmetad_xml,
                    options.new_gmetad_xml)
        if do_interactive_test:
            run_interactive_consistency_test(options.old_gmetad_interactive,
                    options.new_gmetad_interactive)
    finally:
        if gmetad_pidfile is not None:
            f = open(gmetad_pidfile, 'r')
            line = f.readline()
            pid = line.strip()
            os.kill(int(pid), signal.SIGTERM)
    print 'All tests passed.'

import os
import pwd
import sys
import resource

from gmetad_config import getConfig, GmetadConfig

def daemonize(ignore_fds=[]):
    UMASK=0
    WORKDIR = '/'
    MAXFD = 1024
    REDIRECT_TO = '/dev/null'
    if hasattr(os, 'devnull'):
        REDIRECT_TO = os.devnull
        
    cfg = getConfig()
    setuid_user = None
    if cfg[GmetadConfig.SETUID]:
        setuid_user = cfg[GmetadConfig.SETUID_USERNAME]
    if setuid_user is not None:
        try:
            os.setuid(pwd.getpwnam(setuid_user)[2])
        except Exception:
            print 'Unable to setuid to user "%s", exiting' % setuid_user
            sys.exit()
        
    try:
        pid = os.fork()
    except OSError, e:
        raise Exception, 'Daemonize error: %d (%s)' % (e.errno, e.strerror)
    if pid == 0:
        # first child
        os.setsid()
        try:
            pid = os.fork()
        except OSError, e:
            raise Exception, 'Daemonize error: %d (%s)' % (e.errno, e.strerror)
        if pid == 0:
            # second child
            os.chdir(WORKDIR)
            os.umask(UMASK)
        else:
            os._exit(0)
    else:
        os._exit(0)
        
        
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if resource.RLIM_INFINITY == maxfd:
        maxfd = MAXFD
    for fd in range(0,maxfd):
        if fd in ignore_fds: continue
        try:
            os.close(fd)
        except OSError:
            pass
            
    os.open(REDIRECT_TO, os.O_RDWR)
    os.dup2(0,1)
    os.dup2(0,2)
    
import thread
import logging

class DataStore:
    _shared_state = {}
    _initialized = False
    lock = thread.allocate_lock()
    
    def __init__(self):
        self.__dict__ = DataStore._shared_state
        if not DataStore._initialized:
            self.lock = thread.allocate_lock()
            self.rootElement = None
            DataStore._initialized = True
            
    def setNode(self, node, parent=None):
        if parent is None:
            if self.rootElement is None:
                self.rootElement = node
                self.rootElement.source = 'gmetad'
            return self.rootElement
        parent[str(node)] = node
        return parent[str(node)]
        
    def getNode(self, ancestry):
        node = None
        while ancestry:
            nodeId = ancestry.pop(0)
            if node is None:
                if nodeId == str(self.rootElement):
                    node = self.rootElement
                else: return None
            else:
                try:
                    # Descend one level; keep walking the rest of the ancestry.
                    node = node[nodeId]
                except KeyError:
                    return None
        return node
        
class Element:
    def generateKey(vals):
        if isinstance(vals,list):
            return ':'.join(vals)
        return vals
    generateKey = staticmethod(generateKey)
    
    def __init__(self, id, attrs):
        self.id = id
        for k,v in attrs.items():
            self.__dict__[k.lower()] = v
        self.children = {}
        
    def __setitem__(self, k, v):
        try:
            self.children[k].update(v)
        except KeyError:
            self.children[k] = v
            
    def __getitem__(self, k):
        return self.children[k]
        
    def update(self, elem):
        for k in self.__dict__.keys():
            if k == 'children' or k == 'id' or k == 'name':
                continue
            try:
                self.__dict__[k] = elem.__dict__[k]
            except KeyError:
                pass
        
    def __str__(self):
        if self.__dict__.has_key('name'):
            return Element.generateKey([self.id,self.name])
        return Element.generateKey(self.id)
import threading
import xml.sax
import socket
import time
import logging

from gmetad_config import GmetadConfig
from gmetad_random import getRandomInterval
from gmetad_data import DataStore, Element

class GmondContentHandler(xml.sax.ContentHandler):
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self._elemStack = []
        self._elemStackLen = 0
        
    def startElement(self, tag, attrs):
        ds = DataStore()
        e = Element(tag, attrs)
        if 'GANGLIA_XML' == tag:
            ds.lock.acquire()
            self._elemStack.append(ds.setNode(e))
            self._elemStackLen += 1
            cfg = GmetadConfig()
            e = Element('GRID', {'NAME':cfg[GmetadConfig.GRIDNAME],
                                 'AUTHORITY':cfg[GmetadConfig.AUTHORITY],
                                 'LOCALTIME':'%d' % time.time()})
        self._elemStack.append(ds.setNode(e,
                                          self._elemStack[self._elemStackLen-1]))
        self._elemStackLen += 1
        
    def endElement(self, tag):
        if tag == 'GANGLIA_XML':
            DataStore().lock.release()
        self._elemStack.pop()
        self._elemStackLen -= 1

class GmondReader(threading.Thread):
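    def __init__(self,dataSource,name=None,target=None,args=(),kwargs=None):
        # Thread.__init__ takes (group, target, name, args, kwargs) in that
        # order, so pass by keyword to keep each value in the right slot.
        threading.Thread.__init__(self,name=name,target=target,args=args,kwargs=kwargs)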
        self._cond = threading.Condition()
        self._shuttingDown = False
        self.dataSource = dataSource
        self.lastKnownGoodHost = 0
        logging.debug('Reader created for cluster %s' % self.dataSource.name)
        
    def _getEndpoint(self, hostspec, port=8649):
        hostinfo = hostspec.split(':')
        if len(hostinfo) > 1:
            port = int(hostinfo[1])
        return (hostinfo[0], port)
        
    def run(self):
        while not self._shuttingDown:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(
                    self._getEndpoint(self.dataSource.hosts[self.lastKnownGoodHost]))
            except socket.error:
                curidx = self.lastKnownGoodHost
                connected=False
                while True:
                    curidx += 1
                    if curidx >= len(self.dataSource.hosts):
                        curidx = 0
                    if curidx == self.lastKnownGoodHost: break
                    try:
                        sock.connect(
                            self._getEndpoint(self.dataSource.hosts[curidx]))
                        self.lastKnownGoodHost = curidx
                        connected=True
                        break
                    except socket.error:
                        pass
                if not connected:
                    logging.error('Could not connect to any host for data '
                                  'source %s' % self.dataSource.name)
                    return
            logging.info('Querying data source %s via host %s' %
                         (self.dataSource.name, self.dataSource.hosts[self.lastKnownGoodHost]))
            xmlbuf = ''
            while True:
                buf = sock.recv(8192)
                if not buf:
                    break
                xmlbuf += buf
            sock.close()
            if self._shuttingDown:
                break
            xml.sax.parseString(xmlbuf, GmondContentHandler())
            self._cond.acquire()
            self._cond.wait(getRandomInterval(self.dataSource.interval))
            self._cond.release()        
            
    def shutdown(self):
        self._shuttingDown = True
        self._cond.acquire()
        self._cond.notifyAll()
        self._cond.release()
        self.join()
from random import randrange

def getRandomInterval(midpoint, range=5):
    return randrange(max(midpoint-range,0), midpoint+range)
import thread
import time

from gmetad_config import GmetadConfig
from gmetad_data import DataStore, Element

class XmlWriter:
    _xml_starttag = ('<?xml version="1.0" encoding="ISO-8859-1" '
                     'standalone="yes"?>')
    _xml_dtd = '''<!DOCTYPE GANGLIA_XML [
   <!ELEMENT GANGLIA_XML (GRID|CLUSTER|HOST)*>
      <!ATTLIST GANGLIA_XML VERSION CDATA #REQUIRED>
      <!ATTLIST GANGLIA_XML SOURCE CDATA #REQUIRED>
   <!ELEMENT GRID (CLUSTER | GRID | HOSTS | METRICS)*>
      <!ATTLIST GRID NAME CDATA #REQUIRED>
      <!ATTLIST GRID AUTHORITY CDATA #REQUIRED>
      <!ATTLIST GRID LOCALTIME CDATA #IMPLIED>
   <!ELEMENT CLUSTER (HOST | HOSTS | METRICS)*>
      <!ATTLIST CLUSTER NAME CDATA #REQUIRED>
      <!ATTLIST CLUSTER OWNER CDATA #IMPLIED>
      <!ATTLIST CLUSTER LATLONG CDATA #IMPLIED>
      <!ATTLIST CLUSTER URL CDATA #IMPLIED>
      <!ATTLIST CLUSTER LOCALTIME CDATA #REQUIRED>
   <!ELEMENT HOST (METRIC)*>
      <!ATTLIST HOST NAME CDATA #REQUIRED>
      <!ATTLIST HOST IP CDATA #REQUIRED>
      <!ATTLIST HOST LOCATION CDATA #IMPLIED>
      <!ATTLIST HOST REPORTED CDATA #REQUIRED>
      <!ATTLIST HOST TN CDATA #IMPLIED>
      <!ATTLIST HOST TMAX CDATA #IMPLIED>
      <!ATTLIST HOST DMAX CDATA #IMPLIED>
      <!ATTLIST HOST GMOND_STARTED CDATA #IMPLIED>
   <!ELEMENT METRIC (EXTRA_DATA*)>
      <!ATTLIST METRIC NAME CDATA #REQUIRED>
      <!ATTLIST METRIC VAL CDATA #REQUIRED>
      <!ATTLIST METRIC TYPE (string | int8 | uint8 | int16 | uint16 | int32 | uint32 | float | double | timestamp) #REQUIRED>
      <!ATTLIST METRIC UNITS CDATA #IMPLIED>
      <!ATTLIST METRIC TN CDATA #IMPLIED>
      <!ATTLIST METRIC TMAX CDATA #IMPLIED>
      <!ATTLIST METRIC DMAX CDATA #IMPLIED>
      <!ATTLIST METRIC SLOPE (zero | positive | negative | both | unspecified) #IMPLIED>
      <!ATTLIST METRIC SOURCE (gmond | gmetric) #REQUIRED>
   <!ELEMENT EXTRA_DATA (EXTRA_ELEMENT*)>
   <!ELEMENT EXTRA_ELEMENT EMPTY>
      <!ATTLIST EXTRA_ELEMENT NAME CDATA #REQUIRED>
      <!ATTLIST EXTRA_ELEMENT VAL CDATA #REQUIRED>
   <!ELEMENT HOSTS EMPTY>
      <!ATTLIST HOSTS UP CDATA #REQUIRED>
      <!ATTLIST HOSTS DOWN CDATA #REQUIRED>
      <!ATTLIST HOSTS SOURCE (gmond | gmetric | gmetad) #REQUIRED>
   <!ELEMENT METRICS EMPTY>
      <!ATTLIST METRICS NAME CDATA #REQUIRED>
      <!ATTLIST METRICS SUM CDATA #REQUIRED>
      <!ATTLIST METRICS NUM CDATA #REQUIRED>
      <!ATTLIST METRICS TYPE (string | int8 | uint8 | int16 | uint16 | int32 | uint32 | float | double | timestamp) #REQUIRED>
      <!ATTLIST METRICS UNITS CDATA #IMPLIED>
      <!ATTLIST METRICS SLOPE (zero | positive | negative | both | unspecified) #IMPLIED>
      <!ATTLIST METRICS SOURCE (gmond | gmetric) #REQUIRED>
]>'''
    _pcid_map = {'GANGLIA_XML' : 'GRID',
            'GRID' : 'CLUSTER',
            'CLUSTER' : 'HOST',
            'HOST' : 'METRIC'
    }
    
    def __init__(self):
        cfg = GmetadConfig()
        self.gridname = cfg[GmetadConfig.GRIDNAME]
        self.authority = cfg[GmetadConfig.AUTHORITY]
        self.localtime = time.time()
        
    def _getXmlImpl(self, element, filterList=None):
        rbuf = '<%s' % element.id
        foundName = False
        try:
            rbuf += ' NAME="%s"' % element.name
            foundName = True
        except AttributeError:
            pass
        for k,v in element.__dict__.items():
            if k == 'id' or k == 'children' or (foundName and k == 'name'):
                continue
            rbuf += ' %s="%s"' % (k.upper(), v)
        if 0 < len(element.children):
            rbuf += '>\n'
            showAllChildren = True
            if filterList is not None and len(filterList):
                try:
                    key = Element.generateKey([self._pcid_map[element.id],
                                               filterList[0]])
                    rbuf += self._getXmlImpl(element.children[key],
                                             filterList[1:])
                    showAllChildren = False
                except KeyError:
                    pass
            if showAllChildren:
                for c in element.children.values():
                    rbuf += self._getXmlImpl(c, filterList)
            rbuf += '</%s>\n' % element.id
        else:
            rbuf += ' />\n'
        return rbuf
            
    def getXml(self, filter=None):
        if filter is None:
            filterList = None
        elif not len(filter.strip()):
            filterList = None
        else:
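            # Drop empty components so a leading '/' does not yield a ''
            # entry that can never match a child key.
            filterList = [part for part in filter.strip().split('/') if part]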
        return '%s\n%s\n%s' % (self._xml_starttag, self._xml_dtd,
                               self._getXmlImpl(DataStore().rootElement, filterList))
Thu, Apr 17 2008 17:04:00 - Matt Ryan (mryan at novell dot com) - Initial Version
This represents the initial version of the rewritten gmetad in python.
Files are:
- gmetad_config.py
- gmetad_daemon.py
- gmetad_data.py
- gmetad_gmondReader.py
- gmetad.py
- gmetad_random.py
- gmetad_xmlWriter.py
- gmetad_consistency_test.py (test script)

This version should function generally as a gmetad replacement for reading 
gmond data and providing aggregate data on XML ports only.
Advertised capabilities of this version include:
- Support of current gmetad configuration files - should "just work" by 
pointing it directly to an existing gmetad.conf file.
- Most current gmetad command-line options, plus a few others.
- Six log levels as per the python "logging" module.  Two of these - FATAL and 
CRITICAL - are functionally equivalent but indicated separately in the 
command-line parameters.  The default logging or debug level is 2, meaning that 
all messages at ERROR level and higher will be logged.  A specified debug level of 
5 or higher means all debug messages are shown and that the application stays 
in the foreground.
- Logging is done to syslog and, optionally, appended to a log file as well.  
If the debug level is 5 or higher, logging is also done to the console.
- At a logging level of 4 or below, the application will go through the 
daemonizing process (setuid to another user, fork twice, set session id, change 
working dir to "/" and set umask to 0, close all open file descriptors except 
the logging file descriptors, and redirect all other I/O to /dev/null).
- Writes a PID file that can be used for graceful shutdowns later (kill -TERM).
- Creates a reader thread for every data source specified in the configuration.
All data is stored internally in a single data structure which is updated on 
every read.
- Readers honor the polling interval specified in the data source 
configuration, but vary the interval by +- 5 seconds.
- Listens on the XML and Interactive ports.  XML port is a dump of all stored 
data in a single XML file.  Interactive waits for input from the user then 
dumps the XML.
- Interactive port supports basic pathing filters, such as 
/Gridname/Clustername/Hostname/Metricname.
- Interactive port ignores filters that don't match the corresponding level in 
the XML - in other words, all data at that level will be sent, and no filtering 
applied, if the specified filter doesn't match any of the tags at that level.
- Shuts down gracefully at SIGTERM (generally this is true, unless an exception 
occurs - and even then, it usually works...).
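
The pathing-filter behaviour described above (descend into a matching child, otherwise send everything at that level) can be sketched in isolation.  This is only an illustration, not the code in gmetad_xmlWriter.py: the Node class, parse_filter() and select() are hypothetical names, and dropping empty path components is an assumption about how a leading "/" should be handled.

```python
# Child tag expected beneath each parent tag (same idea as XmlWriter._pcid_map).
CHILD_TAG = {'GANGLIA_XML': 'GRID', 'GRID': 'CLUSTER',
             'CLUSTER': 'HOST', 'HOST': 'METRIC'}


class Node(object):
    """Hypothetical stand-in for Element: a tag, an optional name, children."""

    def __init__(self, tag, name=None):
        self.tag = tag
        self.name = name
        self.children = {}

    def key(self):
        if self.name is None:
            return self.tag
        return '%s:%s' % (self.tag, self.name)

    def add(self, child):
        self.children[child.key()] = child
        return child


def parse_filter(path):
    # Assumption: empty components (leading or doubled slashes) are dropped.
    return [part for part in path.strip().split('/') if part]


def select(node, filter_list):
    """Return the keys of every node that would be emitted, depth first."""
    emitted = [node.key()]
    matched = None
    if filter_list and node.tag in CHILD_TAG:
        wanted = '%s:%s' % (CHILD_TAG[node.tag], filter_list[0])
        matched = node.children.get(wanted)
    if matched is not None:
        # The filter matched at this level: descend into that child only.
        emitted.extend(select(matched, filter_list[1:]))
    else:
        # No filter, or no match: emit every child and keep the filter as is.
        for child_key in sorted(node.children):
            emitted.extend(select(node.children[child_key], filter_list))
    return emitted


root = Node('GANGLIA_XML')
grid = root.add(Node('GRID', 'Gridname'))
cluster_a = grid.add(Node('CLUSTER', 'a'))
cluster_a.add(Node('HOST', 'h1'))
cluster_a.add(Node('HOST', 'h2'))
grid.add(Node('CLUSTER', 'b')).add(Node('HOST', 'h3'))

only_a = select(root, parse_filter('/Gridname/a'))
everything = select(root, parse_filter('/Gridname/nosuch'))
```

Here only_a covers cluster "a" and its hosts, while the unmatched "nosuch" component is ignored and everything lists the whole tree.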

Certainly there are hundreds of features not in this version, but these are 
the more obvious ones:
- The filter querystring on the interactive port is not currently supported, 
i.e. "/Gridname?filter=summary".
- No current support for summary data.
- No current support for any type of data handling or output modules or plugins 
of any kind.
- No writing of RRD files or any other type of permanent data storage.
- The "scalable" configuration attribute is not currently supported.
- The "trusted_hosts" configuration attribute is not currently supported.
- The "all_trusted" configuration attribute is not currently supported.
- The "server_threads" configuration attribute is not currently supported.
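
Since the filter querystring is not handled yet, here is one way a request such as "/Gridname?filter=summary" might be split into its path and its options before any filtering is applied.  This is a rough sketch only: split_request() is a hypothetical helper, not something in the attached files, and the "key=value" pairs joined by "&" are an assumption about the querystring format.

```python
def split_request(request):
    """Split '/Grid/Cluster?filter=summary' into (path parts, options)."""
    request = request.strip()
    # Everything before the first '?' is the path; the rest is the query.
    path, _, query = request.partition('?')
    parts = [part for part in path.split('/') if part]
    options = {}
    for pair in query.split('&'):
        if not pair:
            continue
        key, _, value = pair.partition('=')
        options[key] = value
    return parts, options


parts, options = split_request('/Gridname?filter=summary\n')
```

The path parts could then feed the existing pathing filter, with the options looked at separately once summary support exists.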

In addition, we don't currently have any data on the following:
- Disk and runtime footprint compared to the current gmetad.
- Throughput and performance data as compared to current gmetad.
- Stability data - memory usage over time, ability to continue running for long 
periods of time, etc.
- Correctness testing - honoring of all configuration parameters and 
command-line parameters, generation of correct data, etc.
- Cross-platform capabilities.
 