Modified: subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py URL: http://svn.apache.org/viewvc/subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py?rev=1301216&r1=1301215&r2=1301216&view=diff ============================================================================== --- subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py (original) +++ subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py Thu Mar 15 21:40:15 2012 @@ -39,23 +39,19 @@ import time import logging.handlers import Queue import optparse +import functools +import urlparse -from twisted.internet import reactor, task, threads -from twisted.internet.utils import getProcessOutput -from twisted.application import internet -from twisted.web.client import HTTPClientFactory, HTTPPageDownloader -from urlparse import urlparse -from xml.sax import handler, make_parser -from twisted.internet import protocol - +import daemonize +import svnpubsub.client # check_output() is only available in Python 2.7. Allow us to run with # earlier versions try: check_output = subprocess.check_output except AttributeError: - def check_output(args): # note: we don't use anything beyond args - pipe = subprocess.Popen(args, stdout=subprocess.PIPE) + def check_output(args, env): # note: we only use these two args + pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env) output, _ = pipe.communicate() if pipe.returncode: raise subprocess.CalledProcessError(pipe.returncode, args) @@ -65,10 +61,10 @@ except AttributeError: ### note: this runs synchronously. within the current Twisted environment, ### it is called from ._get_match() which is run on a thread so it won't ### block the Twisted main loop. -def svn_info(svnbin, path): +def svn_info(svnbin, env, path): "Run 'svn info' on the target path, returning a dict of info data." args = [svnbin, "info", "--non-interactive", "--", path] - output = check_output(args).strip() + output = check_output(args, env=env).strip() info = { } for line in output.split('\n'): idx = line.index(':') @@ -78,20 +74,14 @@ def svn_info(svnbin, path): class WorkingCopy(object): def __init__(self, bdec, path, url): - self.bdec = bdec self.path = path self.url = url - self.repos = None - self.match = None - d = threads.deferToThread(self._get_match) - d.addCallback(self._set_match) - - def _set_match(self, value): - self.match = str(value[0]) - self.url = value[1] - self.repos = value[2] - self.uuid = value[3] - self.bdec.wc_ready(self) + + try: + self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env) + bdec.wc_ready(self) + except: + logging.exception('problem with working copy: %s', path) def update_applies(self, uuid, path): if self.uuid != uuid: @@ -114,181 +104,44 @@ class WorkingCopy(object): return True return False - def _get_match(self): + def _get_match(self, svnbin, env): ### quick little hack to auto-checkout missing working copies if not os.path.isdir(self.path): logging.info("autopopulate %s from %s" % (self.path, self.url)) - subprocess.check_call([self.bdec.svnbin, 'co', '-q', + subprocess.check_call([svnbin, 'co', '-q', '--non-interactive', - '--config-dir', - '/home/svnwc/.subversion', - '--', self.url, self.path]) + '--', self.url, self.path], + env=env) # Fetch the info for matching dirs_changed against this WC - info = svn_info(self.bdec.svnbin, self.path) + info = svn_info(svnbin, env, self.path) + root = info['Repository Root'] url = info['URL'] - repos = info['Repository Root'] + relpath = url[len(root):] # also has leading '/' uuid = info['Repository UUID'] - relpath = url[len(repos):] # also has leading '/' - return [relpath, url, repos, uuid] - - -class HTTPStream(HTTPClientFactory): - protocol = HTTPPageDownloader - - def __init__(self, url): - self.url = url - HTTPClientFactory.__init__(self, url, method="GET", agent="SvnWcSub/0.1.0") - - def pageStart(self, partial): - pass - - def pagePart(self, data): - pass - - def pageEnd(self): - pass - -class Revision: - def __init__(self, repos, rev): - self.repos = repos - self.rev = rev - self.dirs_changed = [] - -class StreamHandler(handler.ContentHandler): - def __init__(self, stream, bdec): - handler.ContentHandler.__init__(self) - self.stream = stream - self.bdec = bdec - self.rev = None - self.text_value = None - - def startElement(self, name, attrs): - #print "start element: %s" % (name) - """ - <commit revision="7"> - <dirs_changed><path>/</path></dirs_changed> - </commit> - """ - if name == "commit": - self.rev = Revision(attrs['repository'], int(attrs['revision'])) - elif name == "stillalive": - self.bdec.stillalive(self.stream) - def characters(self, data): - if self.text_value is not None: - self.text_value = self.text_value + data - else: - self.text_value = data + return str(relpath), uuid - def endElement(self, name): - #print "end element: %s" % (name) - if name == "commit": - self.bdec.commit(self.stream, self.rev) - self.rev = None - if name == "path" and self.text_value is not None and self.rev is not None: - self.rev.dirs_changed.append(self.text_value.strip()) - self.text_value = None - - -class XMLHTTPStream(HTTPStream): - def __init__(self, url, bdec): - HTTPStream.__init__(self, url) - self.alive = 0 - self.bdec = bdec - self.parser = make_parser(['xml.sax.expatreader']) - self.handler = StreamHandler(self, bdec) - self.parser.setContentHandler(self.handler) - - def pageStart(self, parital): - self.bdec.pageStart(self) - - def pagePart(self, data): - self.parser.feed(data) - - def pageEnd(self): - self.bdec.pageEnd(self) - -def connectTo(url, bdec): - u = urlparse(url) - port = u.port - if not port: - port = 80 - s = XMLHTTPStream(url, bdec) - if bdec.service: - conn = internet.TCPClient(u.hostname, u.port, s) - conn.setServiceParent(bdec.service) - else: - conn = reactor.connectTCP(u.hostname, u.port, s) - return [s, conn] - -CHECKBEAT_TIME = 60 PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/") class BigDoEverythingClasss(object): - def __init__(self, config, service = None): - self.urls = [s.strip() for s in config.get_value('streams').split()] + def __init__(self, config): self.svnbin = config.get_value('svnbin') self.env = config.get_env() + self.tracking = config.get_track() self.worker = BackgroundWorker(self.svnbin, self.env) - self.service = service - self.failures = 0 - self.alive = time.time() - self.checker = task.LoopingCall(self._checkalive) - self.transports = {} - self.streams = {} - for u in self.urls: - self._restartStream(u) - self.watch = [] - for path, url in config.get_track().items(): + self.watch = [ ] + + self.hostports = [ ] + ### switch from URLs in the config to just host:port pairs + for url in config.get_value('streams').split(): + parsed = urlparse.urlparse(url.strip()) + self.hostports.append((parsed.hostname, parsed.port)) + + def start(self): + for path, url in self.tracking.items(): # working copies auto-register with the BDEC when they are ready. WorkingCopy(self, path, url) - self.checker.start(CHECKBEAT_TIME) - - def pageStart(self, stream): - logging.info("Stream %s Connection Established" % (stream.url)) - self.failures = 0 - - def pageEnd(self, stream): - logging.info("Stream %s Connection Dead" % (stream.url)) - self.streamDead(stream.url) - - def _restartStream(self, url): - (self.streams[url], self.transports[url]) = connectTo(url, self) - self.streams[url].deferred.addBoth(self.streamDead, url) - self.streams[url].alive = time.time() - - def _checkalive(self): - n = time.time() - for k in self.streams.keys(): - s = self.streams[k] - if n - s.alive > CHECKBEAT_TIME: - logging.info("Stream %s is dead, reconnecting" % (s.url)) - #self.transports[s.url].disconnect() - self.streamDead(self, s.url) - -# d=filter(lambda x:x not in self.streams.keys(), self.urls) -# for u in d: -# self._restartStream(u) - - def stillalive(self, stream): - stream.alive = time.time() - - def streamDead(self, url, result=None): - s = self.streams.get(url) - if not s: - logging.info("Stream %s is messed up" % (url)) - return - BACKOFF_SECS = 5 - BACKOFF_MAX = 60 - #self.checker.stop() - - self.streams[url] = None - self.transports[url] = None - self.failures += 1 - backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX) - logging.info("Stream disconnected, trying again in %d seconds.... %s" % (backoff, s.url)) - reactor.callLater(backoff, self._restartStream, url) def wc_ready(self, wc): # called when a working copy object has its basic info/url, @@ -302,8 +155,10 @@ class BigDoEverythingClasss(object): return "/" + path return os.path.abspath(path) - def commit(self, stream, rev): - logging.info("COMMIT r%d (%d paths) via %s" % (rev.rev, len(rev.dirs_changed), stream.url)) + def commit(self, host, port, rev): + logging.info("COMMIT r%d (%d paths) from %s:%d" + % (rev.rev, len(rev.dirs_changed), host, port)) + paths = map(self._normalize_path, rev.dirs_changed) if len(paths): pre = os.path.commonprefix(paths) @@ -317,7 +172,7 @@ class BigDoEverythingClasss(object): break #print "Common Prefix: %s" % (pre) - wcs = [wc for wc in self.watch if wc.update_applies(rev.repos, pre)] + wcs = [wc for wc in self.watch if wc.update_applies(rev.uuid, pre)] logging.info("Updating %d WC for r%d" % (len(wcs), rev.rev)) for wc in wcs: self.worker.add_work(OP_UPDATE, wc) @@ -384,7 +239,6 @@ class BackgroundWorker(threading.Thread) ### still specific to the ASF setup. args = [self.svnbin, 'update', '--quiet', - '--config-dir', '/home/svnwc/.subversion', '--non-interactive', '--trust-server-cert', '--ignore-externals', @@ -392,7 +246,7 @@ class BackgroundWorker(threading.Thread) subprocess.check_call(args, env=self.env) ### check the loglevel before running 'svn info'? - info = svn_info(self.svnbin, wc.path) + info = svn_info(self.svnbin, self.env, wc.path) logging.info("updated: %s now at r%s", wc.path, info['Revision']) def _cleanup(self, wc): @@ -401,7 +255,6 @@ class BackgroundWorker(threading.Thread) ### we need to move some of these args into the config. these are ### still specific to the ASF setup. args = [self.svnbin, 'cleanup', - '--config-dir', '/home/svnwc/.subversion', '--non-interactive', '--trust-server-cert', wc.path] @@ -452,6 +305,45 @@ class ReloadableConfig(ConfigParser.Safe return str(option) +class Daemon(daemonize.Daemon): + def __init__(self, logfile, pidfile, umask, bdec): + daemonize.Daemon.__init__(self, logfile, pidfile) + + self.umask = umask + self.bdec = bdec + + def setup(self): + # There is no setup which the parent needs to wait for. + pass + + def run(self): + logging.info('svnwcsub started, pid=%d', os.getpid()) + + # Set the umask in the daemon process. Defaults to 000 for + # daemonized processes. Foreground processes simply inherit + # the value from the parent process. + if self.umask is not None: + umask = int(self.umask, 8) + os.umask(umask) + logging.info('umask set to %03o', umask) + + # Start the BDEC (on the main thread), then start the client + self.bdec.start() + + mc = svnpubsub.client.MultiClient(self.bdec.hostports, + self.bdec.commit, + self._event) + mc.run_forever() + + def _event(self, host, port, event_name): + if event_name == 'error': + logging.exception('from %s:%s', host, port) + elif event_name == 'ping': + logging.debug('ping from %s:%s', host, port) + else: + logging.info('"%s" from %s:%s', event_name, host, port) + + def prepare_logging(logfile): "Log to the specified file, or to stdout if None." @@ -480,20 +372,13 @@ def handle_options(options): # Set up the logging, then process the rest of the options. prepare_logging(options.logfile) - if options.pidfile: + # In daemon mode, we let the daemonize module handle the pidfile. + # Otherwise, we should write this (foreground) PID into the file. + if options.pidfile and not options.daemon: pid = os.getpid() open(options.pidfile, 'w').write('%s\n' % pid) logging.info('pid %d written to %s', pid, options.pidfile) - if options.uid: - try: - uid = int(options.uid) - except ValueError: - import pwd - uid = pwd.getpwnam(options.uid)[2] - logging.info('setting uid %d', uid) - os.setuid(uid) - if options.gid: try: gid = int(options.gid) @@ -503,10 +388,14 @@ def handle_options(options): logging.info('setting gid %d', gid) os.setgid(gid) - if options.umask: - umask = int(options.umask, 8) - os.umask(umask) - logging.info('umask set to %03o', umask) + if options.uid: + try: + uid = int(options.uid) + except ValueError: + import pwd + uid = pwd.getpwnam(options.uid)[2] + logging.info('setting uid %d', uid) + os.setuid(uid) def main(args): @@ -525,6 +414,8 @@ def main(args): help='switch to this GID before running') parser.add_option('--umask', help='set this (octal) umask before running') + parser.add_option('--daemon', action='store_true', + help='run as a background daemon') options, extra = parser.parse_args(args) @@ -532,12 +423,26 @@ def main(args): parser.error('CONFIG_FILE is required') config_file = extra[0] + if options.daemon and not options.logfile: + parser.error('LOGFILE is required when running as a daemon') + if options.daemon and not options.pidfile: + parser.error('PIDFILE is required when running as a daemon') + # Process any provided options. handle_options(options) c = ReloadableConfig(config_file) - big = BigDoEverythingClasss(c) - reactor.run() + bdec = BigDoEverythingClasss(c) + + # We manage the logfile ourselves (along with possible rotation). The + # daemon process can just drop stdout/stderr into /dev/null. + d = Daemon('/dev/null', options.pidfile, options.umask, bdec) + if options.daemon: + # Daemonize the process and call sys.exit() with appropriate code + d.daemonize_exit() + else: + # Just run in the foreground (the default) + d.foreground() if __name__ == "__main__":
