Sidnei da Silva has proposed merging lp:~sidnei/graphite/twistd-plugins into lp:graphite with lp:~sidnei/graphite/hardcoded-conf-dir as a prerequisite.
Requested reviews: graphite-dev (graphite-dev) For more details, see: https://code.launchpad.net/~sidnei/graphite/twistd-plugins/+merge/67391 Refactor carbon startup scripts to use twistd, register each service as a separate twistd plugin. -- https://code.launchpad.net/~sidnei/graphite/twistd-plugins/+merge/67391 Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/twistd-plugins into lp:graphite.
=== modified file 'carbon/bin/carbon-aggregator.py' --- carbon/bin/carbon-aggregator.py 2011-07-08 20:52:33 +0000 +++ carbon/bin/carbon-aggregator.py 2011-07-08 20:52:34 +0000 @@ -1,11 +1,35 @@ #!/usr/bin/env python +"""Copyright 2009 Chris Davis + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + import sys -import os -import atexit -from os.path import basename, dirname, exists, join, isdir - - -program = basename( sys.argv[0] ).split('.')[0] +from os.path import dirname, join, abspath + +# Figure out where we're installed +BIN_DIR = dirname(abspath(__file__)) +ROOT_DIR = dirname(BIN_DIR) + +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from +# source. +LIB_DIR = join(ROOT_DIR, 'lib') +sys.path.insert(0, LIB_DIR) + +if __name__ == "__main__": + # If we were run directly, call ourselves as a tac file instead. + from carbon.util import run_tac + + run_tac(__file__) # Initialize twisted try: @@ -13,151 +37,11 @@ epollreactor.install() except: pass -from twisted.internet import reactor - - -# Figure out where we're installed -BIN_DIR = dirname( os.path.abspath(__file__) ) -ROOT_DIR = dirname(BIN_DIR) -LIB_DIR = join(ROOT_DIR, 'lib') - -sys.path.insert(0, LIB_DIR) -os.environ['GRAPHITE_ROOT'] = ROOT_DIR - -# Capture useful debug info for this commonly reported problem -try: - import carbon -except ImportError: - print 'Failed to import carbon, debug information follows.' - print 'pwd=%s' % os.getcwd() - print 'sys.path=%s' % sys.path - print '__file__=%s' % __file__ - sys.exit(1) - - -# Read config (we want failures to occur before daemonizing) -from carbon.conf import (get_default_parser, parse_options, - read_config, settings as global_settings) - - -parser = get_default_parser() -parser.add_option( - '--rules', - default=None, - help='Use the give aggregation rules file') - -(options, args) = parse_options(parser, sys.argv[1:]) -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) -global_settings.update(settings) - -if options.rules is None: - options.rules = join(settings.CONF_DIR, "aggregation-rules.conf") - -pidfile = settings.pidfile -logdir = settings.LOG_DIR - -__builtins__.program = program -action = args[0] - - -if action == 'stop': - if not exists(pidfile): - print 'Pidfile %s does not exist' % pidfile - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pidfile.read().strip() ) - except: - print 'Could not read pidfile %s' % pidfile - raise SystemExit(1) - - print 'Deleting %s (contained pid %d)' % (pidfile, pid) - os.unlink(pidfile) - - print 'Sending kill signal to pid %d' % pid - os.kill(pid, 15) - raise SystemExit(0) - - -elif action == 'status': - if not exists(pidfile): - print '%s is not running' % program - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pidfile.read().strip() ) - except: - print 'Failed to read pid from %s' % pidfile - raise SystemExit(1) - - if exists('/proc/%d' % pid): - print "%s is running with pid %d" % (program, pid) - raise SystemExit(0) - else: - print "%s is not running" % program - raise SystemExit(0) - -# Import application components -from carbon.log import logToStdout, logToDir -from carbon.instrumentation import startRecording -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener -from carbon.aggregator.rules import RuleManager -from carbon.aggregator import receiver -from carbon.aggregator import client -from carbon.rewrite import RewriteRuleManager -from carbon.events import metricReceived -from carbon.util import daemonize - -RuleManager.read_from(options.rules) - -rewrite_rules_conf = join(settings.CONF_DIR, 'rewrite-rules.conf') -if exists(rewrite_rules_conf): - RewriteRuleManager.read_from(rewrite_rules_conf) - -# --debug -if options.debug: - logToStdout() - -else: - if not isdir(logdir): - os.makedirs(logdir) - - daemonize() - - pf = open(pidfile, 'w') - pf.write( str(os.getpid()) ) - pf.close() - - def shutdown(): - if os.path.exists(pidfile): - os.unlink(pidfile) - - atexit.register(shutdown) - - logToDir(logdir) - - -# Configure application components -metricReceived.installHandler(receiver.process) -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) - -client.connect(settings.DESTINATION_HOST, settings.DESTINATION_PORT) -startRecording() - - -# Run the twisted reactor -print "%s running with pid %d" % (program, os.getpid()) - -if options.profile: - import cProfile - - if exists(options.profile): - os.unlink(options.profile) - - cProfile.run('reactor.run()', options.profile) - -else: - reactor.run() + +from twisted.application.service import Application +from carbon import service + +application = Application("carbon-aggregator") + +aggregator_service = service.createAggregatorService(None) +aggregator_service.setServiceParent(application) === modified file 'carbon/bin/carbon-cache.py' --- carbon/bin/carbon-cache.py 2011-07-08 20:52:33 +0000 +++ carbon/bin/carbon-cache.py 2011-07-08 20:52:34 +0000 @@ -14,15 +14,22 @@ limitations under the License.""" import sys -import os -import socket -import pwd -import atexit -from os.path import basename, dirname, exists, join, isdir - -program = basename( sys.argv[0] ).split('.')[0] -hostname = socket.gethostname().split('.')[0] -os.umask(022) +from os.path import dirname, join, abspath + +# Figure out where we're installed +BIN_DIR = dirname(abspath(__file__)) +ROOT_DIR = dirname(BIN_DIR) + +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from +# source. +LIB_DIR = join(ROOT_DIR, 'lib') +sys.path.insert(0, LIB_DIR) + +if __name__ == "__main__": + # If we were run directly, call ourselves as a tac file instead. + from carbon.util import run_tac + + run_tac(__file__) # Initialize twisted try: @@ -30,176 +37,11 @@ epollreactor.install() except: pass -from twisted.internet import reactor - - -# Figure out where we're installed -BIN_DIR = dirname( os.path.abspath(__file__) ) -ROOT_DIR = dirname(BIN_DIR) -LIB_DIR = join(ROOT_DIR, 'lib') -sys.path.insert(0, LIB_DIR) - - -# Capture useful debug info for this commonly reported problem -try: - import carbon -except ImportError: - print 'Failed to import carbon, debug information follows.' - print 'pwd=%s' % os.getcwd() - print 'sys.path=%s' % sys.path - print '__file__=%s' % __file__ - sys.exit(1) - - -# Read config (we want failures to occur before daemonizing) -from carbon.conf import (get_default_parser, parse_options, - read_config, settings as global_settings) - - -(options, args) = parse_options(get_default_parser(), sys.argv[1:]) -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) -global_settings.update(settings) - -instance = options.instance -pidfile = settings.pidfile -logdir = settings.LOG_DIR - - -__builtins__.instance = instance # This isn't as evil as you might think -__builtins__.program = program -action = args[0] - - -if action == 'stop': - if not exists(pidfile): - print 'Pidfile %s does not exist' % pidfile - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pf.read().strip() ) - except: - print 'Could not read pidfile %s' % pidfile - raise SystemExit(1) - - print 'Deleting %s (contained pid %d)' % (pidfile, pid) - os.unlink(pidfile) - - print 'Sending kill signal to pid %d' % pid - os.kill(pid, 15) - raise SystemExit(0) - - -elif action == 'status': - if not exists(pidfile): - print '%s (instance %s) is not running' % (program, instance) - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pf.read().strip() ) - except: - print 'Failed to read pid from %s' % pidfile - raise SystemExit(1) - - if exists('/proc/%d' % pid): - print "%s (instance %s) is running with pid %d" % (program, instance, pid) - raise SystemExit(0) - else: - print "%s (instance %s) is not running" % (program, instance) - raise SystemExit(0) - -if exists(pidfile): - print "Pidfile %s already exists, is %s already running?" % (pidfile, program) - raise SystemExit(1) - -# Import application components -from carbon.log import logToStdout, logToDir -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener -from carbon.cache import MetricCache -from carbon.instrumentation import startRecording -from carbon.events import metricReceived - -storage_schemas = join(settings.CONF_DIR, 'storage-schemas.conf') -if not exists(storage_schemas): - print "Error: missing required config %s" % storage_schemas - sys.exit(1) - -use_amqp = settings.get("ENABLE_AMQP", False) -if use_amqp: - from carbon import amqp_listener - amqp_host = settings.get("AMQP_HOST", "localhost") - amqp_port = settings.get("AMQP_PORT", 5672) - amqp_user = settings.get("AMQP_USER", "guest") - amqp_password = settings.get("AMQP_PASSWORD", "guest") - amqp_verbose = settings.get("AMQP_VERBOSE", False) - amqp_vhost = settings.get("AMQP_VHOST", "/") - amqp_spec = settings.get("AMQP_SPEC", None) - amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite") - - -# --debug -if options.debug: - logToStdout() - -else: - if not isdir(logdir): - os.makedirs(logdir) - - if settings.USER: - print "Dropping privileges to become the user %s" % settings.USER - - from carbon.util import daemonize, dropprivs - daemonize() - - pf = open(pidfile, 'w') - pf.write( str(os.getpid()) ) - pf.close() - - def shutdown(): - if os.path.exists(pidfile): - os.unlink(pidfile) - - atexit.register(shutdown) - - if settings.USER: - pwent = pwd.getpwnam(settings.USER) - os.chown(pidfile, pwent.pw_uid, pwent.pw_gid) - dropprivs(settings.USER) - - logToDir(logdir) - -# Configure application components -metricReceived.installHandler(MetricCache.store) -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) -startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler) - -if use_amqp: - amqp_listener.startReceiver(amqp_host, amqp_port, amqp_user, amqp_password, - vhost=amqp_vhost, spec=amqp_spec, - exchange_name=amqp_exchange_name, - verbose=amqp_verbose) - -if settings.ENABLE_MANHOLE: - from carbon import manhole - manhole.start() - -from carbon.writer import startWriter # have to import this *after* settings are defined -startWriter() -startRecording() - - -# Run the twisted reactor -print "%s running [instance %s]" % (program, instance) - -if options.profile: - import cProfile - - if exists(options.profile): - os.unlink(options.profile) - - cProfile.run('reactor.run()', options.profile) - -else: - reactor.run() + +from twisted.application.service import Application +from carbon import service + +application = Application("carbon-cache") + +cache_service = service.createCacheService(None) +cache_service.setServiceParent(application) === modified file 'carbon/bin/carbon-relay.py' --- carbon/bin/carbon-relay.py 2011-07-08 20:52:33 +0000 +++ carbon/bin/carbon-relay.py 2011-07-08 20:52:34 +0000 @@ -14,13 +14,22 @@ limitations under the License.""" import sys -import os -import atexit -from os.path import basename, dirname, exists, join, isdir - -program = basename( sys.argv[0] ).split('.')[0] -os.umask(022) - +from os.path import dirname, join, abspath + +# Figure out where we're installed +BIN_DIR = dirname(abspath(__file__)) +ROOT_DIR = dirname(BIN_DIR) + +# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from +# source. +LIB_DIR = join(ROOT_DIR, 'lib') +sys.path.insert(0, LIB_DIR) + +if __name__ == "__main__": + # If we were run directly, call ourselves as a tac file instead. + from carbon.util import run_tac + + run_tac(__file__) # Initialize twisted try: @@ -28,156 +37,11 @@ epollreactor.install() except: pass -from twisted.internet import reactor - - -# Figure out where we're installed -BIN_DIR = dirname(__file__) -ROOT_DIR = dirname(BIN_DIR) -LIB_DIR = join(ROOT_DIR, 'lib') -sys.path.insert(0, LIB_DIR) - - -# Capture useful debug info for this commonly reported problem -try: - import carbon -except ImportError: - print 'Failed to import carbon, debug information follows.' - print 'pwd=%s' % os.getcwd() - print 'sys.path=%s' % sys.path - print '__file__=%s' % __file__ - sys.exit(1) - - -# Read config (we want failures to occur before daemonizing) -from carbon.conf import (get_default_parser, parse_options, - read_config, settings as global_settings) - - -parser = get_default_parser() -parser.add_option( - '--rules', - default=None, - help='Use the given relay rules file') - -(options, args) = parse_options(parser, sys.argv[1:]) -settings = read_config(program, options, ROOT_DIR=ROOT_DIR) -global_settings.update(settings) - -if options.rules is None: - options.rules = join(settings.CONF_DIR, "relay-rules.conf") - -pidfile = settings.pidfile -logdir = settings.LOG_DIR - -__builtins__.program = program -action = args[0] - - -if action == 'stop': - if not exists(pidfile): - print 'Pidfile %s does not exist' % pidfile - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pf.read().strip() ) - except: - print 'Could not read pidfile %s' % pidfile - raise SystemExit(1) - - print 'Deleting %s (contained pid %d)' % (pidfile, pid) - os.unlink(pidfile) - - print 'Sending kill signal to pid %d' % pid - os.kill(pid, 15) - raise SystemExit(0) - - -elif action == 'status': - if not exists(pidfile): - print '%s is not running' % program - raise SystemExit(0) - - pf = open(pidfile, 'r') - try: - pid = int( pf.read().strip() ) - except: - print 'Failed to read pid from %s' % pidfile - raise SystemExit(1) - - if exists('/proc/%d' % pid): - print "%s is running with pid %d" % (program, pid) - raise SystemExit(0) - else: - print "%s is not running" % program - raise SystemExit(0) - -if exists(pidfile): - print "Pidfile %s already exists, is %s already running?" % (pidfile, program) - raise SystemExit(1) - -# Quick validation -if settings.RELAY_METHOD not in ('rules', 'consistent-hashing'): - print "In carbon.conf, RELAY_METHOD must be either 'rules' or 'consistent-hashing'. Invalid value: '%s'" % settings.RELAY_METHOD - sys.exit(1) - -# Import application components -from carbon.log import logToStdout, logToDir, msg -from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener -from carbon.relay import createClientConnections, relay -from carbon.events import metricReceived -from carbon.instrumentation import startRecording -from carbon.rules import loadRules, allDestinationServers, parseHostList -from carbon.hashing import setDestinationHosts - -# --debug -if options.debug: - logToStdout() -else: - if not isdir(logdir): - os.makedirs(logdir) - - from carbon.util import daemonize - daemonize() - logToDir(logdir) - - pidfile = open(pidfile, 'w') - pidfile.write( str(os.getpid()) ) - pidfile.close() - - def shutdown(): - if os.path.exists(pidfile): - os.unlink(pidfile) - - atexit.register(shutdown) - - -# Configure application components -metricReceived.installHandler(relay) -startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver) -startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver) - -if settings.RELAY_METHOD == 'rules': - loadRules(options.rules) - createClientConnections( allDestinationServers() ) -elif settings.RELAY_METHOD == 'consistent-hashing': - hosts = parseHostList(settings.CH_HOST_LIST) - msg('consistent-hashing hosts = %s' % str(hosts)) - setDestinationHosts(hosts) - createClientConnections(hosts) - -startRecording() - - -# Run the twisted reactor -if options.profile: - import cProfile - - if exists(options.profile): - os.unlink(options.profile) - - cProfile.run('reactor.run()', options.profile) - -else: - reactor.run() + +from twisted.application.service import Application +from carbon import service + +application = Application("carbon-relay") + +relay_service = service.createRelayService(None) +relay_service.setServiceParent(application) === modified file 'carbon/lib/carbon/amqp_listener.py' --- carbon/lib/carbon/amqp_listener.py 2011-04-25 15:50:10 +0000 +++ carbon/lib/carbon/amqp_listener.py 2011-07-08 20:52:34 +0000 @@ -147,11 +147,12 @@ p.factory = self return p -def startReceiver(host, port, username, password, vhost, exchange_name, - spec=None, channel=1, verbose=False): - """Starts a twisted process that will read messages on the amqp broker - and post them as metrics.""" +def createAMQPListener(username, password, vhost, exchange_name, + spec=None, channel=1, verbose=False): + """ + Create an C{AMQPReconnectingFactory} configured with the specified options. + """ # use provided spec if not specified if not spec: spec = txamqp.spec.load(os.path.normpath( @@ -161,6 +162,17 @@ factory = AMQPReconnectingFactory(username, password, delegate, vhost, spec, channel, exchange_name, verbose=verbose) + return factory + + +def startReceiver(host, port, username, password, vhost, exchange_name, + spec=None, channel=1, verbose=False): + """ + Starts a twisted process that will read messages on the amqp broker and + post them as metrics. + """ + factory = createAMQPListener(username, password, vhost, exchange_name, + spec=spec, channel=channel, verbose=verbose) reactor.connectTCP(host, port, factory) === modified file 'carbon/lib/carbon/conf.py' --- carbon/lib/carbon/conf.py 2011-07-08 20:52:33 +0000 +++ carbon/lib/carbon/conf.py 2011-07-08 20:52:34 +0000 @@ -13,10 +13,19 @@ limitations under the License.""" import os -from os.path import join, dirname, normpath +import sys +import pwd + +from os.path import join, dirname, normpath, abspath, basename, exists from optparse import OptionParser from ConfigParser import ConfigParser +import whisper +from carbon import log + +from twisted.python import usage +from twisted.scripts import twistd + defaults = dict( LOCAL_DATA_DIR="/opt/graphite/storage/whisper/", @@ -118,6 +127,89 @@ settings.update(defaults) +class CarbonServerOptions(usage.Options): + + optParameters = [ + ["config", "c", None, "Use the given config file."], + ["instance", "", None, "Manage a specific carbon instance."], + ["logdir", "", None, "Write logs to the given directory."], + ["pidfile", "", None, "Name of the pidfile"], + ] + + def postOptions(self): + global settings + ROOT_DIR = dirname(dirname(abspath(self["python"]))) + program = basename(self["python"]).split('.')[0] + program_settings = read_config(program, self, ROOT_DIR=ROOT_DIR) + settings.update(program_settings) + + settings["program"] = program + + if settings.USER: + self["uid"], self["gid"] = pwd.getpwnam(settings.USER)[2:4] + + self["pidfile"] = settings["pidfile"] + + storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf") + if not exists(storage_schemas): + print "Error: missing required config %s" % storage_schemas + sys.exit(1) + + if settings.WHISPER_AUTOFLUSH: + log.msg("enabling whisper autoflush") + whisper.AUTOFLUSH = True + + +class CarbonCacheOptions(CarbonServerOptions, twistd.ServerOptions): + + optParameters = ( + twistd.ServerOptions.optParameters + + CarbonServerOptions.optParameters + ) + + def postOptions(self): + twistd.ServerOptions.postOptions(self) + CarbonServerOptions.postOptions(self) + + +class CarbonAggregatorOptions(CarbonCacheOptions): + + optParameters = [ + ["rules", "", None, "Use the given aggregation rules file."], + ["rewrite-rules", "", None, "Use the given rewrite rules file."], + ] + CarbonCacheOptions.optParameters + + def postOptions(self): + CarbonCacheOptions.postOptions(self) + if self["rules"] is None: + self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf") + settings["aggregation-rules"] = self["rules"] + + if self["rewrite-rules"] is None: + self["rewrite-rules"] = join(settings["CONF_DIR"], + "rewrite-rules.conf") + settings["rewrite-rules"] = self["rewrite-rules"] + + +class CarbonRelayOptions(CarbonCacheOptions): + + optParameters = [ + ["rules", "", None, "Use the given relay rules file."], + ] + CarbonCacheOptions.optParameters + + def postOptions(self): + CarbonCacheOptions.postOptions(self) + if self["rules"] is None: + self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf") + settings["relay-rules"] = self["rules"] + + if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"): + print ("In carbon.conf, RELAY_METHOD must be either 'rules' or " + "'consistent-hashing'. Invalid value: '%s'" % + settings.RELAY_METHOD) + sys.exit(1) + + def get_default_parser(usage="%prog [options] <start|stop|status>"): """Create a parser for command line options.""" parser = OptionParser(usage=usage) @@ -167,7 +259,7 @@ def read_config(program, options, **kwargs): """ Read settings for 'program' from configuration file specified by - 'options.config', with missing values provided by 'defaults'. + 'options["config"]', with missing values provided by 'defaults'. """ settings = Settings() settings.update(defaults) @@ -184,40 +276,44 @@ # 'GRAPHITE_CONF_DIR' environment variable. settings.setdefault("CONF_DIR", os.environ.get("GRAPHITE_CONF_DIR", - join(settings.ROOT_DIR, "conf"))) - if options.config is None: - options.config = join(settings.CONF_DIR, "carbon.conf") + join(settings["ROOT_DIR"], "conf"))) + if options["config"] is None: + options["config"] = join(settings["CONF_DIR"], "carbon.conf") else: # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config # file. - settings["CONF_DIR"] = dirname(normpath(options.config)) + settings["CONF_DIR"] = dirname(normpath(options["config"])) # Storage directory can be overriden by the 'GRAPHITE_STORAGE_DIR' # environment variable. It defaults to a path relative to 'ROOT_DIR' for # backwards compatibility though. settings.setdefault("STORAGE_DIR", os.environ.get("GRAPHITE_STORAGE_DIR", - join(settings.ROOT_DIR, "storage"))) - settings.setdefault("LOG_DIR", join(settings.STORAGE_DIR, "log", program)) + join(settings["ROOT_DIR"], "storage"))) + settings.setdefault( + "LOG_DIR", join(settings["STORAGE_DIR"], "log", program)) # Read configuration options from program-specific section. section = program[len("carbon-"):] - settings.readFrom(options.config, section) + settings.readFrom(options["config"], section) # If a specific instance of the program is specified, augment the settings # with the instance-specific settings and provide sane defaults for # optional settings. - if options.instance: - settings.readFrom(options.config, "%s:%s" % (section, options.instance)) - settings["pidfile"] = (options.pidfile or - join(settings.STORAGE_DIR, - "%s-%s.pid" % (program, options.instance))) - settings["LOG_DIR"] = (options.logdir or - "%s-%s" % (settings.LOG_DIR.rstrip('/'), - options.instance)) + if options["instance"]: + settings.readFrom(options["config"], + "%s:%s" % (section, options["instance"])) + settings["pidfile"] = ( + options["pidfile"] or + join(settings["STORAGE_DIR"], "%s-%s.pid" % + (program, options["instance"]))) + settings["LOG_DIR"] = (options["logdir"] or + "%s-%s" % (settings["LOG_DIR"].rstrip('/'), + options["instance"])) else: - settings["pidfile"] = (options.pidfile or - join(settings.STORAGE_DIR, '%s.pid' % program)) - settings["LOG_DIR"] = (options.logdir or settings.LOG_DIR) + settings["pidfile"] = ( + options["pidfile"] or + join(settings["STORAGE_DIR"], '%s.pid' % program)) + settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"]) return settings === modified file 'carbon/lib/carbon/events.py' --- carbon/lib/carbon/events.py 2009-09-10 19:28:51 +0000 +++ carbon/lib/carbon/events.py 2011-07-08 20:52:34 +0000 @@ -2,6 +2,7 @@ class EventHandler: + def __init__(self, defaultHandler=None): self.handler = defaultHandler === modified file 'carbon/lib/carbon/instrumentation.py' --- carbon/lib/carbon/instrumentation.py 2011-04-09 05:06:50 +0000 +++ carbon/lib/carbon/instrumentation.py 2011-07-08 20:52:34 +0000 @@ -2,13 +2,14 @@ import time import socket from resource import getrusage, RUSAGE_SELF + +from twisted.application.service import Service from twisted.internet.task import LoopingCall stats = {} HOSTNAME = socket.gethostname().replace('.','_') PAGESIZE = os.sysconf('SC_PAGESIZE') -recordTask = None rusage = getrusage(RUSAGE_SELF) lastUsage = rusage.ru_utime + rusage.ru_stime lastUsageTime = time.time() @@ -56,8 +57,6 @@ def startRecording(): global recordTask - recordTask = LoopingCall(recordMetrics) - recordTask.start(60, now=False) def recordMetrics(): @@ -138,6 +137,20 @@ send_metric(fullMetric, datapoint) +class InstrumentationService(Service): + + def __init__(self): + self.record_task = LoopingCall(recordMetrics) + + def startService(self): + self.record_task.start(60, False) + Service.startService(self) + + def stopService(self): + self.record_task.stop() + Service.stopService(self) + + # Avoid import circularity from carbon.aggregator.buffers import BufferManager from carbon.aggregator.client import send_metric === modified file 'carbon/lib/carbon/manhole.py' --- carbon/lib/carbon/manhole.py 2011-04-02 00:44:19 +0000 +++ carbon/lib/carbon/manhole.py 2011-07-08 20:52:34 +0000 @@ -1,4 +1,4 @@ -from twisted.cred import portal, checkers +from twisted.cred import portal from twisted.conch.ssh import keys from twisted.conch.checkers import SSHPublicKeyDatabase from twisted.conch.manhole import Manhole @@ -21,8 +21,7 @@ keyBlob = self.userKeys[credentials.username] return keyBlob == credentials.blob - -def start(): +def createManholeListener(): sshRealm = TerminalRealm() sshRealm.chainedProtocolFactory.protocolFactory = lambda _: Manhole(namespace) @@ -37,4 +36,8 @@ sshPortal = portal.Portal(sshRealm) sshPortal.registerChecker(credChecker) sessionFactory = ConchFactory(sshPortal) - reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE) + return sessionFactory + +def start(): + sessionFactory = createManholeListener() + reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE) === added file 'carbon/lib/carbon/service.py' --- carbon/lib/carbon/service.py 1970-01-01 00:00:00 +0000 +++ carbon/lib/carbon/service.py 2011-07-08 20:52:34 +0000 @@ -0,0 +1,157 @@ +#!/usr/bin/env python +"""Copyright 2009 Chris Davis + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +from os.path import exists + +from twisted.application.service import MultiService +from twisted.application.internet import TCPServer, TCPClient +from twisted.internet.protocol import ServerFactory + + +def createBaseService(config): + from carbon.conf import settings + from carbon.listeners import MetricLineReceiver, MetricPickleReceiver + + root_service = MultiService() + root_service.setName(settings.program) + + use_amqp = settings.get("ENABLE_AMQP", False) + if use_amqp: + from carbon import amqp_listener + + amqp_host = settings.get("AMQP_HOST", "localhost") + amqp_port = settings.get("AMQP_PORT", 5672) + amqp_user = settings.get("AMQP_USER", "guest") + amqp_password = settings.get("AMQP_PASSWORD", "guest") + amqp_verbose = settings.get("AMQP_VERBOSE", False) + amqp_vhost = settings.get("AMQP_VHOST", "/") + amqp_spec = settings.get("AMQP_SPEC", None) + amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite") + + + for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE, + settings.LINE_RECEIVER_PORT, + MetricLineReceiver), + (settings.PICKLE_RECEIVER_INTERFACE, + settings.PICKLE_RECEIVER_PORT, + MetricPickleReceiver)): + factory = ServerFactory() + factory.protocol = protocol + service = TCPServer(int(port), factory, interface=interface) + service.setServiceParent(root_service) + + if use_amqp: + factory = amqp_listener.createAMQPListener( + amqp_user, amqp_password, + vhost=amqp_vhost, spec=amqp_spec, + exchange_name=amqp_exchange_name, + verbose=amqp_verbose) + service = TCPClient(amqp_host, int(amqp_port), factory) + service.setServiceParent(root_service) + + if settings.ENABLE_MANHOLE: + from carbon import manhole + + factory = manhole.createManholeListener() + service = TCPServer(int(settings.MANHOLE_PORT), factory, + interface=settings.MANHOLE_INTERFACE) + service.setServiceParent(root_service) + + # have to import this *after* settings are defined + from carbon.writer import WriterService + + service = WriterService() + service.setServiceParent(root_service) + + # Instantiate an instrumentation service that will record metrics about + # this service. + from carbon.instrumentation import InstrumentationService + + service = InstrumentationService() + service.setServiceParent(root_service) + + return root_service + + +def createCacheService(config): + from carbon.cache import MetricCache + from carbon.conf import settings + from carbon.events import metricReceived + from carbon.listeners import CacheQueryHandler + + # Configure application components + metricReceived.installHandler(MetricCache.store) + + root_service = createBaseService(config) + factory = ServerFactory() + factory.protocol = CacheQueryHandler + service = TCPServer(int(settings.CACHE_QUERY_PORT), factory, + interface=settings.CACHE_QUERY_INTERFACE) + service.setServiceParent(root_service) + + # have to import this *after* settings are defined + from carbon.writer import WriterService + + service = WriterService() + service.setServiceParent(root_service) + + return root_service + + +def createAggregatorService(config): + from carbon.events import metricReceived + from carbon.aggregator import receiver + from carbon.aggregator.rules import RuleManager + from carbon.aggregator import client + from carbon.rewrite import RewriteRuleManager + from carbon.conf import settings + + root_service = createBaseService(config) + + # Configure application components + metricReceived.installHandler(receiver.process) + RuleManager.read_from(settings["aggregation-rules"]) + if exists(settings["rewrite-rules"]): + RewriteRuleManager.read_from(settings["rewrite-rules"]) + + client.connect(settings["DESTINATION_HOST"], + int(settings["DESTINATION_PORT"])) + + return root_service + + +def createRelayService(config): + from carbon.log import msg + from carbon.conf import settings + from carbon.events import metricReceived + from carbon.hashing import setDestinationHosts + from carbon.relay import createClientConnections, relay + from carbon.rules import loadRules, allDestinationServers, parseHostList + + root_service = createBaseService(config) + + # Configure application components + metricReceived.installHandler(relay) + + if settings["RELAY_METHOD"] == "rules": + loadRules(settings["relay-rules"]) + createClientConnections(allDestinationServers()) + elif settings["RELAY_METHOD"] == "consistent-hashing": + hosts = parseHostList(settings["CH_HOST_LIST"]) + msg('consistent-hashing hosts = %s' % str(hosts)) + setDestinationHosts(hosts) + createClientConnections(hosts) + + return root_service === modified file 'carbon/lib/carbon/util.py' --- carbon/lib/carbon/util.py 2009-12-13 01:35:28 +0000 +++ carbon/lib/carbon/util.py 2011-07-08 20:52:34 +0000 @@ -1,22 +1,50 @@ -import sys import os import pwd - -def daemonize(): - if os.fork() > 0: sys.exit(0) - os.setsid() - if os.fork() > 0: sys.exit(0) - si = open('/dev/null', 'r') - so = open('/dev/null', 'a+') - se = open('/dev/null', 'a+', 0) - os.dup2(si.fileno(), sys.stdin.fileno()) - os.dup2(so.fileno(), sys.stdout.fileno()) - os.dup2(se.fileno(), sys.stderr.fileno()) +from os.path import basename, isdir + +import carbon + +from twisted.application import service +from twisted import plugin +from twisted.python.util import initgroups +from twisted.scripts.twistd import runApp +from twisted.scripts._twistd_unix import daemonize + + +daemonize = daemonize # Backwards compatibility def dropprivs(user): - uid,gid = pwd.getpwnam(user)[2:4] - os.setregid(gid,gid) - os.setreuid(uid,uid) - return (uid,gid) + uid, gid = pwd.getpwnam(user)[2:4] + initgroups(uid, gid) + os.setregid(gid, gid) + os.setreuid(uid, uid) + return (uid, gid) + + +def run_tac(tac_file): + from carbon.log import logToDir + from carbon.conf import settings + + plugins = {} + for plug in plugin.getPlugins(service.IServiceMaker): + plugins[plug.tapname] = plug + + program = basename(tac_file).split('.')[0] + config = plugins[program].options() + config["python"] = tac_file + config["umask"] = 022 + config.parseOptions() + + if not config["nodaemon"]: + logdir = settings.LOG_DIR + if not isdir(logdir): + os.makedirs(logdir) + logToDir(logdir) + + # This isn't as evil as you might think + __builtins__["instance"] = config["instance"] + __builtins__["program"] = program + + runApp(config) === modified file 'carbon/lib/carbon/writer.py' --- carbon/lib/carbon/writer.py 2011-04-02 19:54:28 +0000 +++ carbon/lib/carbon/writer.py 2011-07-08 20:52:34 +0000 @@ -16,26 +16,29 @@ import os import time from os.path import join, exists, dirname, basename -from threading import Thread -from twisted.internet import reactor -from twisted.internet.task import LoopingCall + +try: + import cPickle as pickle +except ImportError: + import pickle + import whisper + from carbon.cache import MetricCache from carbon.storage import getFilesystemPath, loadStorageSchemas from carbon.conf import settings from carbon.instrumentation import increment, append from carbon import log -try: - import cPickle as pickle -except ImportError: - import pickle - -if settings.WHISPER_AUTOFLUSH: - log.msg("enabling whisper autoflush") - whisper.AUTOFLUSH = True + +from twisted.internet import reactor +from twisted.internet.task import LoopingCall +from twisted.application.service import Service + lastCreateInterval = 0 createCount = 0 +schemas = loadStorageSchemas() + def optimalWriteOrder(): "Generates metrics with the most cached values first and applies a soft rate limit on new metrics" @@ -177,10 +180,16 @@ log.err() -schemaReloadTask = LoopingCall(reloadStorageSchemas) -schemas = loadStorageSchemas() - - -def startWriter(): - schemaReloadTask.start(60) - reactor.callInThread(writeForever) +class WriterService(Service): + + def __init__(self): + self.reload_task = LoopingCall(reloadStorageSchemas) + + def startService(self): + self.reload_task.start(60, False) + reactor.callInThread(writeForever) + Service.startService(self) + + def stopService(self): + self.reload_task.stop() + Service.stopService(self) === added directory 'carbon/lib/twisted' === added directory 'carbon/lib/twisted/plugins' === added file 'carbon/lib/twisted/plugins/carbon_aggregator_plugin.py' --- carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 1970-01-01 00:00:00 +0000 +++ carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 2011-07-08 20:52:34 +0000 @@ -0,0 +1,25 @@ +from zope.interface import implements + +from twisted.plugin import IPlugin +from twisted.application.service import IServiceMaker + +from carbon import service +from carbon import conf + + +class CarbonAggregatorServiceMaker(object): + + implements(IServiceMaker, IPlugin) + tapname = "carbon-aggregator" + description = "Aggregate stats for graphite." + options = conf.CarbonAggregatorOptions + + def makeService(self, options): + """ + Construct a C{carbon-aggregator} service. + """ + return service.createAggregatorService(options) + + +# Now construct an object which *provides* the relevant interfaces +serviceMaker = CarbonAggregatorServiceMaker() === added file 'carbon/lib/twisted/plugins/carbon_cache_plugin.py' --- carbon/lib/twisted/plugins/carbon_cache_plugin.py 1970-01-01 00:00:00 +0000 +++ carbon/lib/twisted/plugins/carbon_cache_plugin.py 2011-07-08 20:52:34 +0000 @@ -0,0 +1,25 @@ +from zope.interface import implements + +from twisted.plugin import IPlugin +from twisted.application.service import IServiceMaker + +from carbon import service +from carbon import conf + + +class CarbonCacheServiceMaker(object): + + implements(IServiceMaker, IPlugin) + tapname = "carbon-cache" + description = "Collect stats for graphite." + options = conf.CarbonCacheOptions + + def makeService(self, options): + """ + Construct a C{carbon-cache} service. + """ + return service.createCacheService(options) + + +# Now construct an object which *provides* the relevant interfaces +serviceMaker = CarbonCacheServiceMaker() === added file 'carbon/lib/twisted/plugins/carbon_relay_plugin.py' --- carbon/lib/twisted/plugins/carbon_relay_plugin.py 1970-01-01 00:00:00 +0000 +++ carbon/lib/twisted/plugins/carbon_relay_plugin.py 2011-07-08 20:52:34 +0000 @@ -0,0 +1,25 @@ +from zope.interface import implements + +from twisted.plugin import IPlugin +from twisted.application.service import IServiceMaker + +from carbon import service +from carbon import conf + + +class CarbonRelayServiceMaker(object): + + implements(IServiceMaker, IPlugin) + tapname = "carbon-relay" + description = "Relay stats for graphite." + options = conf.CarbonRelayOptions + + def makeService(self, options): + """ + Construct a C{carbon-aggregator} service. + """ + return service.createRelayService(options) + + +# Now construct an object which *provides* the relevant interfaces +serviceMaker = CarbonRelayServiceMaker()
_______________________________________________ Mailing list: https://launchpad.net/~graphite-dev Post to : [email protected] Unsubscribe : https://launchpad.net/~graphite-dev More help : https://help.launchpad.net/ListHelp

