Hi,
Attached is a first stab at a Python version of gmetad. Brad asked me
to try rewriting gmetad in Python so that it can eventually be extended
through Python modules.
I've included a CHANGELOG file that outlines what this Python gmetad
should do and what it doesn't claim to do. It is important to emphasize
here: this is NOT yet a functional replacement for the current gmetad!
What it WILL do is honor the current gmetad configuration file (at least
the parts it understands; the others are ignored), read gmond data, and
offer the aggregated data over the XML (8651) and interactive (8652)
ports.
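For anyone who wants to poke at the two ports by hand, here is a minimal
client sketch. It is written for Python 3 rather than the Python 2 of the
attached files, and the function name fetch_gmetad_xml and its parameters
are made up for illustration. The only things taken from the description
above are the port numbers and the fact that the interactive port waits
for a request before it replies.

```python
import socket


def fetch_gmetad_xml(host="localhost", port=8651, request=None, timeout=10.0):
    """Read one XML document from a gmetad-style port.

    Leave request as None for the dump port (8651). For the interactive
    port (8652), pass a path such as "/" or "/Gridname/Clustername".
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        if request is not None:
            # The interactive port waits for one request line before replying.
            sock.sendall(request.encode("ascii") + b"\n")
        chunks = []
        while True:
            data = sock.recv(8192)
            if not data:  # peer closed the connection: document is complete
                break
            chunks.append(data)
    # The attached writer declares ISO-8859-1 in its XML prolog.
    return b"".join(chunks).decode("iso-8859-1")
```

With a gmetad listening locally, fetch_gmetad_xml(port=8652, request="/")
would ask the interactive port for everything it has.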
Some testing has been done to compare outputs from the current gmetad
and this python gmetad. They appear to produce functionally equivalent
XML documents, although this hasn't been extensively tested.
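"Functionally equivalent" here means equal once the values that
legitimately change between polls are set aside. A rough sketch of that
idea follows. It is Python 3, not the attached test script, and the
helper names are hypothetical. The ignored attributes (LOCALTIME, TN,
REPORTED, and the VAL of a METRIC) are the ones the attached consistency
test skips. Unlike that script, this sketch compares GRID AUTHORITY
values exactly.

```python
import xml.etree.ElementTree as ET

# Attributes that differ between two daemons or two polls.
VOLATILE_ATTRS = {"LOCALTIME", "TN", "REPORTED"}


def signature(elem):
    """Return a nested, order-independent description of an element."""
    attrs = {
        k: v
        for k, v in elem.attrib.items()
        if k not in VOLATILE_ATTRS and not (elem.tag == "METRIC" and k == "VAL")
    }
    # Sort children so that sibling order does not affect the comparison.
    children = sorted((signature(child) for child in elem), key=repr)
    return (elem.tag, tuple(sorted(attrs.items())), tuple(children))


def functionally_equivalent(old_xml, new_xml):
    """Compare two XML documents while ignoring volatile attribute values."""
    return signature(ET.fromstring(old_xml)) == signature(ET.fromstring(new_xml))
```

Comparing signatures rather than raw text means sibling order and
timestamps do not produce false mismatches.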
What I'm Planning To Do:
The next thing I'm planning to work on is the summary capability on the
interactive port. When that is done we should be able to test this new
gmetad by running it side-by-side with the current gmetad, letting the
current gmetad generate the RRD files, and the new one respond on 8652
to web front end queries. In other words, see if the web front end will
still work with the python gmetad in there.
After that I'll probably start working on an RRD module. The goal is to
drive first toward something that has the same functionality as the
existing gmetad, then start looking at ways to enhance.
I've also got a test engineer on my team who thinks he's going to have
some free cycles, so hopefully he will be able to create and submit some
more test scripts.
Of course, the other thing I have to do is fix the bugs that all of you
are going to file on it. :)
What I'm Asking You To Do:
- If this is of interest to the Ganglia project at all, I'd sure like to
have it checked in if possible. Probably a discussion needs to take
place as to where it should go. I don't think it should replace the
current gmetad, so putting it in that directory seems wrong IMO. Maybe
a new directory under monitor-core called gmetad-python or something?
Anyway, if we can resolve where to put it, and if the project wants it,
then it would be great to get it checked in, so I don't lose the work.
- Take a look at it and try it out in a safe place. Poke around at it
and find bugs.
- Anything else that comes to mind.
Installing this is easy. Just put all the files in the same directory,
then run "python ./gmetad.py". You can add a --help at the end to show
usage information.
Please let me know what you think.
--
Matt Ryan
Sr. Software Engineer
Novell, Inc.
#!/usr/bin/env python
import sys
import socket
import os
import asyncore
import logging
import logging.handlers
from gmetad_gmondReader import GmondReader
from gmetad_xmlWriter import XmlWriter
from gmetad_config import getConfig, GmetadConfig
from gmetad_daemon import daemonize
class GmetadListenSocket(asyncore.dispatcher):
def __init__(self):
asyncore.dispatcher.__init__(self)
def open(self, port, interface=''):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
host = 'localhost'
if 0 < len(interface.strip()):
host = interface
logging.info('Opening query interface on %s:%d' % (host, port))
self.bind((interface,port))
self.listen(5)
class XmlSocket(GmetadListenSocket):
def handle_accept(self):
newsock, addr = self.accept()
logging.debug('Replying to XML dump query from %s' % addr[0])
writer = XmlWriter()
newsock.sendall(writer.getXml())
newsock.close()
class InteractiveSocket(GmetadListenSocket):
def handle_accept(self):
newsock, addr = self.accept()
logging.debug('Replying to interactive query from %s' % addr[0])
InteractiveConnectionHandler(newsock)
class InteractiveConnectionHandler(asyncore.dispatcher_with_send):
def __init__(self, sock):
asyncore.dispatcher_with_send.__init__(self, sock)
self.buffer = ''
self.amt_to_write = 0
def writable(self):
return self.amt_to_write
def handle_read(self):
rbuf = self.recv(1024)
if rbuf:
rbuf = rbuf.strip().strip('/')
if 0 == len(rbuf):
rbuf = None
writer = XmlWriter()
self.buffer = writer.getXml(rbuf)
self.amt_to_write = len(self.buffer)
def handle_write(self):
sent = self.socket.send(self.buffer)
self.buffer = self.buffer[sent:]
self.amt_to_write -= sent
if not self.amt_to_write:
self.close()
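def getLoggingLevel(lspec):
    # The level may arrive as a string (config file or command line), so
    # normalize it to an int before the lookup.
    lspec = int(lspec)
    levelMap = {0:logging.FATAL,
                1:logging.CRITICAL,
                2:logging.ERROR,
                3:logging.WARNING,
                4:logging.INFO,
                5:logging.DEBUG}
    try:
        return levelMap[lspec]
    except KeyError:
        if lspec < 0: return logging.FATAL
        return logging.DEBUG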
if __name__ == '__main__':
    gmetadConfig = getConfig()

    # Remembers log file descriptors we create, so they aren't closed when we daemonize.
    ignore_fds = []

    logging.basicConfig(level=getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]),
            format='%(levelname)-8s %(message)s')
    syslogHandler = logging.handlers.SysLogHandler('/dev/log')
    syslogHandler.setLevel(getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]))
    syslogHandler.setFormatter(logging.Formatter(
            fmt='%(asctime)s %(levelname)-8s - GMETAD - %(message)s',
            datefmt='%a, %d %b %Y %H:%M:%S'))
    ignore_fds.append(syslogHandler.socket.fileno())
    logging.getLogger().addHandler(syslogHandler)
    if gmetadConfig[GmetadConfig.LOGFILE] is not None:
        fileHandler = logging.FileHandler(gmetadConfig[GmetadConfig.LOGFILE],'a')
        fileHandler.setLevel(getLoggingLevel(gmetadConfig[GmetadConfig.DEBUG_LEVEL]))
        fileHandler.setFormatter(logging.Formatter(
                fmt='%(asctime)s %(levelname)-8s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S'))
        ignore_fds.append(fileHandler.stream.fileno())
        logging.getLogger().addHandler(fileHandler)

    if 5 > int(gmetadConfig[GmetadConfig.DEBUG_LEVEL]):
        daemonize(ignore_fds)

    logging.info('Gmetad application started.')

    pffd = None
    if gmetadConfig[GmetadConfig.PIDFILE] is not None:
        try:
            pffd = open(gmetadConfig[GmetadConfig.PIDFILE], 'w')
            pffd.write('%d\n' % os.getpid())
            logging.debug('Wrote pid %d to pidfile %s' % (os.getpid(),
                    gmetadConfig[GmetadConfig.PIDFILE]))
            pffd.close()
            pffd = open(gmetadConfig[GmetadConfig.PIDFILE], 'r')
        except Exception, e:
            # Use the logging module directly; no 'logger' object is defined here.
            logging.error('Unable to write PID %d to %s (%s)' % (os.getpid(),
                    gmetadConfig[GmetadConfig.PIDFILE], e))
            sys.exit()

    # load modules here, when we support modules.

    readers = []
    xmlSocket = XmlSocket()
    interactiveSocket = InteractiveSocket()
    try:
        try:
            for ds in gmetadConfig[GmetadConfig.DATA_SOURCE]:
                readers.append(GmondReader(ds))
                gr = readers[len(readers)-1]
                gr.start()
            xmlSocket.open(port=int(gmetadConfig[GmetadConfig.XML_PORT]))
            interactiveSocket.open(port=int(gmetadConfig[GmetadConfig.INTERACTIVE_PORT]))
            asyncore.loop()
        except KeyboardInterrupt:
            logging.info('Shutting down...')
        except Exception, e:
            logging.error('Caught exception: %s' % e)
            raise
    finally:
        logging.debug('Shutting down all data source readers...')
        for gr in readers:
            gr.shutdown()
        logging.debug('Closing all query ports...')
        xmlSocket.close()
        interactiveSocket.close()
        if pffd is not None:
            pffd.close()
            os.unlink(gmetadConfig[GmetadConfig.PIDFILE])
from socket import getfqdn
import os
import sys
import optparse
class GmetadRRA:
def __init__(self, args):
self.args = args
class GmetadDataSource:
def __init__(self, name, hosts=['localhost'], interval=15):
self.name = name
self.hosts = hosts
self.interval = interval
self.time_to_next_read = 0
class GmetadConfig:
_shared_state = {}
_isInitialized = False
DEBUG_LEVEL = 'debug_level'
LOGFILE = 'logfile'
PIDFILE = 'pidfile'
DATA_SOURCE = 'data_source'
RRAS = 'RRAs'
SCALABLE = 'scalable'
GRIDNAME = 'gridname'
AUTHORITY = 'authority'
TRUSTED_HOSTS = 'trusted_hosts'
ALL_TRUSTED = 'all_trusted'
SETUID = 'setuid'
SETUID_USERNAME = 'setuid_username'
XML_PORT = 'xml_port'
INTERACTIVE_PORT = 'interactive_port'
SERVER_THREADS = 'server_threads'
RRD_ROOTDIR = 'rrd_rootdir'
_cfgDefaults = {
DEBUG_LEVEL : 2,
LOGFILE : None,
PIDFILE : None,
DATA_SOURCE : [],
RRAS : [
GmetadRRA('AVERAGE:0.5:1:244'),
GmetadRRA('AVERAGE:0.5:24:244'),
GmetadRRA('AVERAGE:0.5:168:244'),
GmetadRRA('AVERAGE:0.5:672:244'),
GmetadRRA('AVERAGE:0.5:5760:374')
],
SCALABLE : True,
GRIDNAME : None,
AUTHORITY : 'http://%s/ganglia/' % getfqdn(),
TRUSTED_HOSTS : [],
ALL_TRUSTED : False,
SETUID : True,
SETUID_USERNAME : 'nobody',
XML_PORT : 8651,
INTERACTIVE_PORT : 8652,
SERVER_THREADS : 4,
RRD_ROOTDIR : '/var/lib/ganglia/rrds'
}
def __init__(self, cfgpath=None):
self.__dict__ = GmetadConfig._shared_state
if cfgpath is not None:
self.path = cfgpath
self.resetToDefaults()
self.kwHandlers = {
GmetadConfig.DEBUG_LEVEL : self.parseDbgLevel,
GmetadConfig.LOGFILE : self.parseLogfile,
GmetadConfig.DATA_SOURCE : self.parseDataSource,
GmetadConfig.RRAS : self.parseRRAs,
GmetadConfig.SCALABLE : self.parseScalable,
GmetadConfig.GRIDNAME : self.parseGridname,
GmetadConfig.AUTHORITY : self.parseAuthority,
GmetadConfig.TRUSTED_HOSTS : self.parseTrustedHosts,
GmetadConfig.ALL_TRUSTED : self.parseAllTrusted,
GmetadConfig.SETUID : self.parseSetuid,
GmetadConfig.SETUID_USERNAME : self.parseSetuidUsername,
GmetadConfig.XML_PORT : self.parseXmlPort,
GmetadConfig.INTERACTIVE_PORT : self.parseInteractivePort,
GmetadConfig.SERVER_THREADS : self.parseServerThreads,
GmetadConfig.RRD_ROOTDIR : self.parseRrdRootdir
}
self.updateConfig()
GmetadConfig._isInitialized = True
def updateConfig(self):
f = open(self.path, 'r')
prev_line = None
for line in f.readlines():
if line.startswith('#'): continue
if 0 >= len(line.strip()): continue
if line.strip().endswith('\\'):
if prev_line is None:
prev_line = line.strip().strip('\\')
else:
prev_line += line.strip().strip('\\')
continue
elif prev_line is not None:
prev_line += line.strip()
line = prev_line
prev_line = None
kw, args = line.strip().split(None,1)
if self.kwHandlers.has_key(kw):
self.kwHandlers[kw](args)
def __setitem__(self, k, v):
self.cfg[k] = v
def __getitem__(self, k):
return self.cfg[k]
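    def resetToDefaults(self):
        # Copy the defaults so parsing never mutates the class-level table.
        self.cfg = {}
        for k, v in GmetadConfig._cfgDefaults.items():
            if isinstance(v, list):
                v = list(v)
            self.cfg[k] = v

    def parseDbgLevel(self, level):
        v = level.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.DEBUG_LEVEL] = int(v)

    def parseLogfile(self, logfile):
        self.cfg[GmetadConfig.LOGFILE] = logfile.strip().strip('"')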
def parseDataSource(self, args):
a = args.split()
name = a[0].strip().strip('"')
interval = 15
if a[1].strip().isdigit():
interval = int(a[1].strip())
hosts = a[2:]
else:
hosts = a[1:]
self.cfg[GmetadConfig.DATA_SOURCE].append(GmetadDataSource(name, hosts,
interval))
    def parseRRAs(self, args):
        self.cfg[GmetadConfig.RRAS] = []
        # args is the rest of the config line; split it into the quoted RRA specs.
        for rraspec in args.split():
            self.cfg[GmetadConfig.RRAS].append(GmetadRRA(rraspec.strip().strip('"').split(':',1)[1]))
def parseScalable(self, arg):
v = arg.strip().lower()
if v == 'off' or v == 'false' or v == 'no':
self.cfg[GmetadConfig.SCALABLE] = False
else:
self.cfg[GmetadConfig.SCALABLE] = True
def parseGridname(self, arg):
self.cfg[GmetadConfig.GRIDNAME] = arg.strip().strip('"')
def parseAuthority(self, arg):
self.cfg[GmetadConfig.AUTHORITY] = arg.strip().strip('"')
    def parseTrustedHosts(self, args):
        # args is a whitespace-separated string; store it as a list of hosts.
        hosts = args.split()
        if len(hosts):
            self.cfg[GmetadConfig.TRUSTED_HOSTS] = hosts
def parseAllTrusted(self, arg):
v = arg.strip().lower()
if v == 'on' or v == 'true' or v == 'yes':
self.cfg[GmetadConfig.ALL_TRUSTED] = True
else:
self.cfg[GmetadConfig.ALL_TRUSTED] = False
def parseSetuid(self, arg):
v = arg.strip().lower()
if v == 'off' or v == 'false' or v == 'no':
self.cfg[GmetadConfig.SETUID] = False
else:
self.cfg[GmetadConfig.SETUID] = True
def parseSetuidUsername(self, arg):
self.cfg[GmetadConfig.SETUID_USERNAME] = arg.strip().strip('"')
def parseXmlPort(self, arg):
v = arg.strip()
if v.isdigit():
self.cfg[GmetadConfig.XML_PORT] = int(v)
def parseInteractivePort(self, arg):
v = arg.strip()
if v.isdigit():
self.cfg[GmetadConfig.INTERACTIVE_PORT] = int(v)
    def parseServerThreads(self, arg):
        v = arg.strip()
        if v.isdigit():
            self.cfg[GmetadConfig.SERVER_THREADS] = int(v)

    def parseRrdRootdir(self, arg):
        v = arg.strip().strip('"')
        if os.path.isdir(v):
            self.cfg[GmetadConfig.RRD_ROOTDIR] = v
def getConfig(args=sys.argv):
    if GmetadConfig._isInitialized:
        return GmetadConfig()
    dbgLevelDefault = GmetadConfig._cfgDefaults[GmetadConfig.DEBUG_LEVEL]
    iPortDefault = GmetadConfig._cfgDefaults[GmetadConfig.INTERACTIVE_PORT]
    xPortDefault = GmetadConfig._cfgDefaults[GmetadConfig.XML_PORT]
    parser = optparse.OptionParser()
    parser.add_option('-V', '--version', action='version',
            help='Print version and exit')
    parser.add_option('-d', '--debug', action='store',
            help='Debug level. If five (5) or greater, daemon will stay in foreground. Values are:\n'
                 '0 - FATAL\n'
                 '1 - CRITICAL\n'
                 '2 - ERROR (default)\n'
                 '3 - WARNING\n'
                 '4 - INFO\n'
                 '5 - DEBUG',
            default='%d' % dbgLevelDefault)
    parser.add_option('-p', '--pid_file', action='store',
            help='Write process-id to file',
            default=None)
    parser.add_option('-c', '--conf', action='store',
            help='Location of gmetad configuration file (default=\'/etc/ganglia/gmetad.conf\')',
            default='/etc/ganglia/gmetad.conf')
    parser.add_option('-l', '--logfile', action='store',
            help='Log messages to this path in addition to syslog; overrides configuration',
            default=None)
    parser.add_option('-i', '--interactive_port', action='store',
            help='Interactive port to listen on (default=%d)' % iPortDefault,
            default='%d' % iPortDefault)
    parser.add_option('-x', '--xml_port', action='store',
            help='XML port to listen on (default=%d)' % xPortDefault,
            default='%d' % xPortDefault)
    options, arguments = parser.parse_args()

    if not options.debug.isdigit():
        print 'Invalid numeric value for --debug: %s' % options.debug
        parser.print_help()
        sys.exit()
    elif not options.interactive_port.isdigit():
        print 'Invalid numeric value for --interactive_port: %s' % options.interactive_port
        sys.exit()
    elif not options.xml_port.isdigit():
        print 'Invalid numeric value for --xml_port: %s' % options.xml_port
        sys.exit()
    elif not os.path.exists(options.conf):
        print 'No such configuration file: %s' % options.conf
        parser.print_help()
        sys.exit()

    cfg = GmetadConfig(options.conf)

    # Update configuration if non-default values were provided. The options
    # are strings, so convert them before comparing with the integer defaults;
    # otherwise the comparison is always unequal and the file values are lost.
    if dbgLevelDefault != int(options.debug):
        cfg[GmetadConfig.DEBUG_LEVEL] = int(options.debug)
    if iPortDefault != int(options.interactive_port):
        cfg[GmetadConfig.INTERACTIVE_PORT] = int(options.interactive_port)
    if xPortDefault != int(options.xml_port):
        cfg[GmetadConfig.XML_PORT] = int(options.xml_port)
    if options.logfile is not None:
        cfg[GmetadConfig.LOGFILE] = options.logfile
    if options.pid_file is not None:
        cfg[GmetadConfig.PIDFILE] = options.pid_file

    return cfg
#!/usr/bin/env python
import optparse
import os
import time
import signal
import socket
import xml.sax
import sys
from urlparse import urlsplit
class GmetadElement:
def __init__(self, id):
self.id = id
self._data = {}
self.child_elements = []
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
self._data[key] = value
def __str__(self):
buf = 'ID: %s\nAttrs:' % self.id
for k, v in self._data.items():
buf += ' %s=>%s' % (k,v)
buf += '\n'
for ce in self.child_elements:
buf += str(ce)
return buf
class GmetadXmlHandler(xml.sax.ContentHandler):
def __init__(self):
xml.sax.ContentHandler.__init__(self)
self.elemList = []
self._elemStack = []
self._elemStackSize = 0
def startElement(self, tag, attrs):
newElem = GmetadElement(tag)
for ak, av in attrs.items():
newElem[ak] = av
if self._elemStackSize:
self._elemStack[self._elemStackSize-1].child_elements.append(newElem)
else:
self.elemList.append(newElem)
self._elemStack.append(newElem)
self._elemStackSize += 1
def endElement(self, tag):
self._elemStack.pop()
self._elemStackSize -= 1
def urlCompare(u1, u2):
    url1 = urlsplit(u1)
    url2 = urlsplit(u2)
    if url1[0] != url2[0] or url1[2] != url2[2] or url1[3] != url2[3] or \
            url1[4] != url2[4]:
        return False
    # Default to the whole netloc so that hosts without an explicit port are
    # still compared (the split below leaves these untouched when it fails).
    url1host = url1[1]
    url1port = None
    url2host = url2[1]
    url2port = None
    try:
        url1host, url1port = url1[1].split(':')
    except ValueError:
        pass
    try:
        url2host, url2port = url2[1].split(':')
    except ValueError:
        pass
    if url1port != url2port:
        return False
    # Compare only the short hostname, so "host" matches "host.example.com".
    url1host_hostname = url1host
    url2host_hostname = url2host
    try:
        url1host_hostname, remnants = url1host.split('.',1)
    except ValueError:
        pass
    try:
        url2host_hostname, remnants = url2host.split('.',1)
    except ValueError:
        pass
    if url1host_hostname != url2host_hostname:
        return False
    return True
ignore_attr_values = ['LOCALTIME', 'TN', 'REPORTED']
def checkEquivalentXml(oldelem, newelem):
    global ignore_attr_values
    if oldelem.id != newelem.id:
        raise Exception, 'Element ids do not match (old=%s, new=%s)' % (oldelem.id, newelem.id)
    if len(oldelem._data) != len(newelem._data):
        raise Exception, 'Element attribute numbers do not match at node %s (old=%d, new=%d)' % (
                oldelem.id, len(oldelem._data), len(newelem._data))
    if len(oldelem.child_elements) != len(newelem.child_elements):
        raise Exception, 'Element children numbers do not match at node %s (old=%d, new=%d)' % (
                oldelem.id, len(oldelem.child_elements), len(newelem.child_elements))
    for k in oldelem._data.keys():
        if not newelem._data.has_key(k):
            raise Exception, 'Attribute "%s" not found in new XML' % k
        if oldelem[k] != newelem[k]:
            if k in ignore_attr_values:
                # Skip context-sensitive values
                continue
            if oldelem.id == 'METRIC' and k == 'VAL':
                # Skip metric values, since they are measured real-time and can change.
                continue
            if oldelem.id == 'GRID' and k == 'AUTHORITY':
                if urlCompare(oldelem[k], newelem[k]):
                    continue
            raise Exception, 'Value for attribute "%s" of tag %s does not match (old=%s, new=%s)' % (
                    k, oldelem.id, oldelem[k], newelem[k])
    for k in newelem._data.keys():
        if not oldelem._data.has_key(k):
            raise Exception, 'Attribute "%s" not found in old XML' % k
        if oldelem[k] != newelem[k]:
            if k in ignore_attr_values:
                # Skip context-sensitive values
                continue
            if oldelem.id == 'METRIC' and k == 'VAL':
                # Skip metric values, since they are measured real-time and can change.
                continue
            if oldelem.id == 'GRID' and k == 'AUTHORITY':
                if urlCompare(oldelem[k], newelem[k]):
                    continue
            raise Exception, 'Value for attribute "%s" of tag %s does not match (old=%s, new=%s)' % (
                    k, oldelem.id, oldelem[k], newelem[k])
for oce in oldelem.child_elements:
for nce in newelem.child_elements:
if oce._data.has_key('NAME') and nce._data.has_key('NAME'):
if oce['NAME'] == nce['NAME']:
checkEquivalentXml(oce, nce)
break
else:
checkEquivalentXml(oce, nce)
def get_socket(hspec):
host, port = hspec.split(':')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, int(port)))
return sock
def get_xml_from_socket(sock):
xmlbuf = ''
while True:
buf = sock.recv(8192)
if not buf:
break
xmlbuf += buf
return xmlbuf
def compare_xml(oldXmlHandler, newXmlHandler):
if len(oldXmlHandler.elemList) != len(newXmlHandler.elemList):
raise Exception, 'Different number of base elements.'
for oe in oldXmlHandler.elemList:
for ne in newXmlHandler.elemList:
if oe._data.has_key('NAME') and ne._data.has_key('NAME'):
if oe['NAME'] == ne['NAME']:
checkEquivalentXml(oe, ne)
break
else:
checkEquivalentXml(oe, ne)
def run_xml_consistency_test(old, new):
print 'Running XML Consistency Test.'
print 'Old host is %s' % old
print 'New host is %s' % new
sock = get_socket(old)
xmlbuf = get_xml_from_socket(sock)
oldXmlHandler = GmetadXmlHandler()
xml.sax.parseString(xmlbuf, oldXmlHandler)
sock = get_socket(new)
xmlbuf = get_xml_from_socket(sock)
newXmlHandler = GmetadXmlHandler()
xml.sax.parseString(xmlbuf, newXmlHandler)
compare_xml(oldXmlHandler, newXmlHandler)
def run_interactive_consistency_test(old, new):
    filters = ['\n', '/\n', '/Grid-Node\n', '/Grid-Node/localhost\n',
            '/Grid-Node/localhost/mem_free\n']
    for filter in filters:
        print 'Running interactive consistency test with filter "%s"' % filter.strip()
        print 'Old host is %s' % old
        print 'New host is %s' % new
        sock = get_socket(old)
        sock.send(filter)
        xmlbuf = get_xml_from_socket(sock)
        oldXmlHandler = GmetadXmlHandler()
        xml.sax.parseString(xmlbuf, oldXmlHandler)
        sock = get_socket(new)
        sock.send(filter)
        xmlbuf = get_xml_from_socket(sock)
        newXmlHandler = GmetadXmlHandler()
        xml.sax.parseString(xmlbuf, newXmlHandler)
        compare_xml(oldXmlHandler, newXmlHandler)
if __name__ == '__main__':
    p = optparse.OptionParser()
    p.add_option('-I', '--old_gmetad_interactive', action='store',
            help='Location of old gmetad interactive port (default="localhost:8652")',
            default='localhost:8652')
    p.add_option('-i', '--new_gmetad_interactive', action='store',
            help='Location of new gmetad interactive port (default="localhost:8652")',
            default='localhost:8652')
    p.add_option('-X', '--old_gmetad_xml', action='store',
            help='Location of old gmetad xml port (default="localhost:8651")',
            default='localhost:8651')
    p.add_option('-x', '--new_gmetad_xml', action='store',
            help='Location of new gmetad xml port (default="localhost:8651")',
            default='localhost:8651')
    p.add_option('-s', '--server_path', action='store',
            help='Path to new gmetad script. If not provided, it will not be started.',
            default=None)
    p.add_option('-c', '--conf', action='store',
            help='Path to gmetad configuration file (default="/etc/ganglia/gmetad.conf")',
            default='/etc/ganglia/gmetad.conf')
    p.add_option('-l', '--logfile', action='store',
            help='Path to gmetad log file, overrides configuration',
            default=None)
    options, args = p.parse_args()

    do_interactive_test=False
    do_xml_test=False
    if options.old_gmetad_interactive == options.new_gmetad_interactive:
        print 'Locations for old and new gmetad interactive ports are the same.'
        print 'Skipping the interactive port consistency test.'
    else:
        do_interactive_test=True
    if options.old_gmetad_xml == options.new_gmetad_xml:
        print 'Locations for old and new gmetad xml ports are the same.'
        print 'Skipping the xml port consistency test.'
    else:
        do_xml_test=True
    if not do_interactive_test and not do_xml_test:
        sys.exit()

    gmetad_pidfile = None
    if options.server_path is not None:
        gmetad_pidfile = '/tmp/gmetad.pid'
        cmd = 'python %s -c %s -i %s -x %s -p %s' % (options.server_path,
                options.conf,
                options.new_gmetad_interactive.split(':')[1],
                options.new_gmetad_xml.split(':')[1],
                gmetad_pidfile)
        if options.logfile is not None:
            cmd += ' -l %s' % options.logfile
        os.system(cmd)
        time.sleep(1) # wait for it to come up

    try:
        if do_xml_test:
            run_xml_consistency_test(options.old_gmetad_xml,
                    options.new_gmetad_xml)
        if do_interactive_test:
            run_interactive_consistency_test(options.old_gmetad_interactive,
                    options.new_gmetad_interactive)
    finally:
        if gmetad_pidfile is not None:
            # Stop the gmetad we started, using the pid it wrote at startup.
            f = open(gmetad_pidfile, 'r')
            line = f.readline()
            pid = line.strip()
            os.kill(int(pid), signal.SIGTERM)
    print 'All tests passed.'
import os
import pwd
import sys
import resource
from gmetad_config import getConfig, GmetadConfig
def daemonize(ignore_fds=[]):
UMASK=0
WORKDIR = '/'
MAXFD = 1024
REDIRECT_TO = '/dev/null'
if hasattr(os, 'devnull'):
REDIRECT_TO = os.devnull
cfg = getConfig()
setuid_user = None
if cfg[GmetadConfig.SETUID]:
setuid_user = cfg[GmetadConfig.SETUID_USERNAME]
if setuid_user is not None:
try:
os.setuid(pwd.getpwnam(setuid_user)[2])
except Exception:
print 'Unable to setuid to user "%s", exiting' % setuid_user
sys.exit()
try:
pid = os.fork()
except OSError, e:
raise Exception, 'Daemonize error: %d (%s)' % (e.errno, e.strerror)
if pid == 0:
# first child
os.setsid()
try:
pid = os.fork()
except OSError, e:
raise Exception, 'Daemonize error: %d (%s)' % (e.errno, e.strerror)
if pid == 0:
# second child
os.chdir(WORKDIR)
os.umask(UMASK)
else:
os._exit(0)
else:
os._exit(0)
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if resource.RLIM_INFINITY == maxfd:
maxfd = MAXFD
for fd in range(0,maxfd):
if fd in ignore_fds: continue
try:
os.close(fd)
except OSError:
pass
os.open(REDIRECT_TO, os.O_RDWR)
os.dup2(0,1)
os.dup2(0,2)
import thread
import logging
class DataStore:
_shared_state = {}
_initialized = False
lock = thread.allocate_lock()
def __init__(self):
self.__dict__ = DataStore._shared_state
if not DataStore._initialized:
self.lock = thread.allocate_lock()
self.rootElement = None
DataStore._initialized = True
def setNode(self, node, parent=None):
if parent is None:
if self.rootElement is None:
self.rootElement = node
self.rootElement.source = 'gmetad'
return self.rootElement
parent[str(node)] = node
return parent[str(node)]
def getNode(self, ancestry):
node = None
while ancestry:
nodeId = ancestry.pop(0)
if node is None:
if nodeId == str(self.rootElement):
node = self.rootElement
else: return None
else:
try:
return node[nodeId]
except KeyError:
pass
return node
class Element:
def generateKey(vals):
if isinstance(vals,list):
return ':'.join(vals)
return vals
generateKey = staticmethod(generateKey)
def __init__(self, id, attrs):
self.id = id
for k,v in attrs.items():
self.__dict__[k.lower()] = v
self.children = {}
def __setitem__(self, k, v):
try:
self.children[k].update(v)
except KeyError:
self.children[k] = v
def __getitem__(self, k):
return self.children[k]
    def update(self, elem):
        for k in self.__dict__.keys():
            if k == 'children' or k == 'id' or k == 'name':
                continue
            try:
                self.__dict__[k] = elem.__dict__[k]
            except KeyError:
                # The incoming element lacks this attribute; keep the old value.
                pass
def __str__(self):
if self.__dict__.has_key('name'):
return Element.generateKey([self.id,self.name])
return Element.generateKey(self.id)
import threading
import xml.sax
import socket
import time
import logging
from gmetad_config import GmetadConfig
from gmetad_random import getRandomInterval
from gmetad_data import DataStore, Element
class GmondContentHandler(xml.sax.ContentHandler):
def __init__(self):
xml.sax.ContentHandler.__init__(self)
self._elemStack = []
self._elemStackLen = 0
def startElement(self, tag, attrs):
ds = DataStore()
e = Element(tag, attrs)
if 'GANGLIA_XML' == tag:
ds.lock.acquire()
self._elemStack.append(ds.setNode(e))
self._elemStackLen += 1
cfg = GmetadConfig()
e = Element('GRID', {'NAME':cfg[GmetadConfig.GRIDNAME],
'AUTHORITY':cfg[GmetadConfig.AUTHORITY], 'LOCALTIME':'%d' % time.time()})
self._elemStack.append(ds.setNode(e,
self._elemStack[self._elemStackLen-1]))
self._elemStackLen += 1
def endElement(self, tag):
if tag == 'GANGLIA_XML':
DataStore().lock.release()
self._elemStack.pop()
self._elemStackLen -= 1
class GmondReader(threading.Thread):
def __init__(self,dataSource,name=None,target=None,args=(),kwargs={}):
threading.Thread.__init__(self,name,target,args,kwargs)
self._cond = threading.Condition()
self._shuttingDown = False
self.dataSource = dataSource
self.lastKnownGoodHost = 0
logging.debug('Reader created for cluster %s' % self.dataSource.name)
def _getEndpoint(self, hostspec, port=8649):
hostinfo = hostspec.split(':')
if len(hostinfo) > 1:
port = int(hostinfo[1])
return (hostinfo[0], port)
    def run(self):
        while not self._shuttingDown:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect( self._getEndpoint(self.dataSource.hosts[self.lastKnownGoodHost]) )
            except socket.error:
                # Try the remaining hosts in order, wrapping around once.
                curidx = self.lastKnownGoodHost
                connected=False
                while True:
                    curidx += 1
                    if curidx >= len(self.dataSource.hosts):
                        curidx = 0
                    if curidx == self.lastKnownGoodHost: break
                    try:
                        sock.connect( self._getEndpoint(self.dataSource.hosts[curidx]) )
                        self.lastKnownGoodHost = curidx
                        connected=True
                        break
                    except socket.error:
                        pass
                if not connected:
                    logging.error('Could not connect to any host for data source %s' % self.dataSource.name)
                    return
            logging.info('Querying data source %s via host %s' %
                    (self.dataSource.name, self.dataSource.hosts[self.lastKnownGoodHost]))
            xmlbuf = ''
            while True:
                buf = sock.recv(8192)
                if not buf:
                    break
                xmlbuf += buf
            sock.close()
            if self._shuttingDown:
                break
            xml.sax.parseString(xmlbuf, GmondContentHandler())
            # Sleep until the next poll, or until shutdown() wakes us up.
            self._cond.acquire()
            self._cond.wait(getRandomInterval(self.dataSource.interval))
            self._cond.release()
def shutdown(self):
self._shuttingDown = True
self._cond.acquire()
self._cond.notifyAll()
self._cond.release()
self.join()
from random import randrange
def getRandomInterval(midpoint, range=5):
return randrange(max(midpoint-range,0), midpoint+range)
import thread
import time
from gmetad_config import GmetadConfig
from gmetad_data import DataStore, Element
class XmlWriter:
    _xml_starttag = '<?xml version="1.0" encoding="ISO-8859-1" standalone="yes"?>'
_xml_dtd = '''<!DOCTYPE GANGLIA_XML [
<!ELEMENT GANGLIA_XML (GRID|CLUSTER|HOST)*>
<!ATTLIST GANGLIA_XML VERSION CDATA #REQUIRED>
<!ATTLIST GANGLIA_XML SOURCE CDATA #REQUIRED>
<!ELEMENT GRID (CLUSTER | GRID | HOSTS | METRICS)*>
<!ATTLIST GRID NAME CDATA #REQUIRED>
<!ATTLIST GRID AUTHORITY CDATA #REQUIRED>
<!ATTLIST GRID LOCALTIME CDATA #IMPLIED>
<!ELEMENT CLUSTER (HOST | HOSTS | METRICS)*>
<!ATTLIST CLUSTER NAME CDATA #REQUIRED>
<!ATTLIST CLUSTER OWNER CDATA #IMPLIED>
<!ATTLIST CLUSTER LATLONG CDATA #IMPLIED>
<!ATTLIST CLUSTER URL CDATA #IMPLIED>
<!ATTLIST CLUSTER LOCALTIME CDATA #REQUIRED>
<!ELEMENT HOST (METRIC)*>
<!ATTLIST HOST NAME CDATA #REQUIRED>
<!ATTLIST HOST IP CDATA #REQUIRED>
<!ATTLIST HOST LOCATION CDATA #IMPLIED>
<!ATTLIST HOST REPORTED CDATA #REQUIRED>
<!ATTLIST HOST TN CDATA #IMPLIED>
<!ATTLIST HOST TMAX CDATA #IMPLIED>
<!ATTLIST HOST DMAX CDATA #IMPLIED>
<!ATTLIST HOST GMOND_STARTED CDATA #IMPLIED>
<!ELEMENT METRIC (EXTRA_DATA*)>
<!ATTLIST METRIC NAME CDATA #REQUIRED>
<!ATTLIST METRIC VAL CDATA #REQUIRED>
<!ATTLIST METRIC TYPE (string | int8 | uint8 | int16 | uint16 | int32 |
uint32 | float | double | timestamp) #REQUIRED>
<!ATTLIST METRIC UNITS CDATA #IMPLIED>
<!ATTLIST METRIC TN CDATA #IMPLIED>
<!ATTLIST METRIC TMAX CDATA #IMPLIED>
<!ATTLIST METRIC DMAX CDATA #IMPLIED>
<!ATTLIST METRIC SLOPE (zero | positive | negative | both | unspecified)
#IMPLIED>
<!ATTLIST METRIC SOURCE (gmond | gmetric) #REQUIRED>
<!ELEMENT EXTRA_DATA (EXTRA_ELEMENT*)>
<!ELEMENT EXTRA_ELEMENT EMPTY>
<!ATTLIST EXTRA_ELEMENT NAME CDATA #REQUIRED>
<!ATTLIST EXTRA_ELEMENT VAL CDATA #REQUIRED>
<!ELEMENT HOSTS EMPTY>
<!ATTLIST HOSTS UP CDATA #REQUIRED>
<!ATTLIST HOSTS DOWN CDATA #REQUIRED>
<!ATTLIST HOSTS SOURCE (gmond | gmetric | gmetad) #REQUIRED>
<!ELEMENT METRICS EMPTY>
<!ATTLIST METRICS NAME CDATA #REQUIRED>
<!ATTLIST METRICS SUM CDATA #REQUIRED>
<!ATTLIST METRICS NUM CDATA #REQUIRED>
<!ATTLIST METRICS TYPE (string | int8 | uint8 | int16 | uint16 | int32 |
uint32 | float | double | timestamp) #REQUIRED>
<!ATTLIST METRICS UNITS CDATA #IMPLIED>
<!ATTLIST METRICS SLOPE (zero | positive | negative | both | unspecified)
#IMPLIED>
<!ATTLIST METRICS SOURCE (gmond | gmetric) #REQUIRED>
]>'''
_pcid_map = {'GANGLIA_XML' : 'GRID',
'GRID' : 'CLUSTER',
'CLUSTER' : 'HOST',
'HOST' : 'METRIC'
}
def __init__(self):
cfg = GmetadConfig()
self.gridname = cfg[GmetadConfig.GRIDNAME]
self.authority = cfg[GmetadConfig.AUTHORITY]
self.localtime = time.time()
def _getXmlImpl(self, element, filterList=None):
rbuf = '<%s' % element.id
foundName = False
try:
rbuf += ' NAME="%s"' % element.name
foundName = True
except AttributeError:
pass
for k,v in element.__dict__.items():
if k == 'id' or k == 'children' or (foundName and k == 'name'):
continue
rbuf += ' %s="%s"' % (k.upper(), v)
if 0 < len(element.children):
rbuf += '>\n'
showAllChildren = True
if filterList is not None and len(filterList):
try:
key = Element.generateKey([self._pcid_map[element.id],
filterList[0]])
rbuf += self._getXmlImpl(element.children[key],
filterList[1:])
showAllChildren = False
except KeyError:
pass
if showAllChildren:
for c in element.children.values():
rbuf += self._getXmlImpl(c, filterList)
rbuf += '</%s>\n' % element.id
else:
rbuf += ' />\n'
return rbuf
def getXml(self, filter=None):
if filter is None:
filterList = None
elif not len(filter.strip()):
filterList = None
else:
filterList = filter.split('/')
return '%s\n%s\n%s' % (self._xml_starttag, self._xml_dtd,
self._getXmlImpl(DataStore().rootElement, filterList))
Thu, Apr 17 2008 17:04:00 - Matt Ryan (mryan at novell dot com) - Initial Version
This represents the initial version of the rewritten gmetad in python.
Files are:
- gmetad_config.py
- gmetad_daemon.py
- gmetad_data.py
- gmetad_gmondReader.py
- gmetad.py
- gmetad_random.py
- gmetad_xmlWriter.py
- gmetad_consistency_test.py (test script)
This version should function generally as a gmetad replacement for reading
gmond data and providing aggregate data on XML ports only.
Advertised capabilities of this version include:
- Support of current gmetad configuration files - should "just work" by
pointing it directly to an existing gmetad.conf file.
- Most current gmetad command-line options, plus a few others.
- Six log levels as per the python "logging" module. Two of these - FATAL and
CRITICAL - are functionally equivalent but indicated separately in the
command-line parameters. The default logging or debug level is 2, meaning that
all messages ERROR level and higher will be logged. A specified debug level of
5 or higher means all debug messages are shown and that the application stays
in the foreground.
- Logging is done to syslog, optionally by appending to a log file in addition
to syslog. If the debug level is 5 or higher, logging is also done to the
console.
- At a logging level of 4 or below, the application will go through the
daemonizing process (setuid to another user, fork twice, set session id, change
working dir to "/" and set umask to 0, close all open file descriptors except
the logging file descriptors, and redirect all other I/O to /dev/null).
- Writes a PID file that can be used for graceful shutdowns later (kill -TERM).
- Creates a reader thread for every data source specified in the configuration.
All data is stored internally in a single data structure which is updated on
every read.
- Readers honor the polling interval specified in the data source
configuration, but vary the interval by +- 5 seconds.
- Listens on the XML and Interactive ports. XML port is a dump of all stored
data in a single XML file. Interactive waits for input from the user then
dumps the XML.
- Interactive port supports basic pathing filters, such as
/Gridname/Clustername/Hostname/Metricname.
- Interactive port ignores filters that don't match the corresponding level in
the XML - in other words, all data at that level will be sent, and no filtering
applied, if the specified filter doesn't match any of the tags at that level.
- Shuts down gracefully at SIGTERM (generally this is true, unless an exception
occurs - and even then, it usually works...).
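
The pathing-filter rule in the two interactive-port items above can be
sketched as follows. This is a Python 3 illustration on plain nested dicts, not
the XmlWriter code itself; select and parse_request are hypothetical names. It
assumes each node's children are keyed by name and that an unmatched filter is
passed down unchanged, which is how the attached writer appears to behave.

```python
def parse_request(line):
    """Turn a request such as '/Grid/Cluster/Host\\n' into path components."""
    stripped = line.strip().strip("/")
    return stripped.split("/") if stripped else []


def select(node, path):
    """Return a pruned copy of node according to the filter path."""
    children = node.get("children", {})
    if path and path[0] in children:
        # The filter matches at this level: keep only that child and
        # consume one path component.
        kept = {path[0]: select(children[path[0]], path[1:])}
    else:
        # No filter, or no match: keep every child at this level.
        kept = {name: select(child, path) for name, child in children.items()}
    return {"name": node["name"], "children": kept}
```

A request of "/MyGrid/web" would therefore return only the "web" cluster,
while "/MyGrid/nosuch" would return every cluster under MyGrid.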
Certainly there are hundreds of features not in this version, but of the more
obvious:
- The filter querystring on the interactive port is not currently supported,
i.e. "/Gridname?filter=summary".
- No current support for summary data.
- No current support for any type of data handling or output modules or plugins
of any kind.
- No writing of RRD files or any other type of permanent data storage.
- The "scalable" configuration attribute is not currently supported.
- The "trusted_hosts" configuration attribute is not currently supported.
- The "all_trusted" configuration attribute is not currently supported.
- The "server_threads" configuration attribute is not currently supported.
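
Since summary data is the next planned piece, here is one possible shape for
it, as a hedged Python 3 sketch: per-metric SUM and NUM totals across hosts,
which are the attributes the METRICS element in the writer's DTD carries.
Everything here (the function name, the input layout, and skipping non-numeric
types) is an assumption for illustration, not existing gmetad behaviour.

```python
from collections import defaultdict

# Metric TYPE values that can be summed; string and timestamp are skipped.
NUMERIC_TYPES = {"int8", "uint8", "int16", "uint16",
                 "int32", "uint32", "float", "double"}


def summarize(hosts):
    """Aggregate metrics across hosts.

    hosts is an iterable with one entry per host; each entry is a list of
    (name, val, type) tuples as they would appear on METRIC elements.
    """
    totals = defaultdict(lambda: {"SUM": 0.0, "NUM": 0})
    for metrics in hosts:
        for name, val, mtype in metrics:
            if mtype not in NUMERIC_TYPES:
                continue
            totals[name]["SUM"] += float(val)
            totals[name]["NUM"] += 1  # number of hosts reporting this metric
    return dict(totals)
```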
In addition, we don't currently have any data on the following:
- Data comparing the disk and runtime footprint with the current gmetad.
- Throughput and performance data as compared to current gmetad.
- Stability data - memory usage over time, ability to continue running for long
periods of time, etc.
- Correctness testing - honoring of all configuration parameters and
command-line parameters, generation of correct data, etc.
- Cross-platform capabilities.
_______________________________________________
Ganglia-developers mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ganglia-developers