Modified: subversion/branches/ev2-export/tools/server-side/svnpubsub/svntweet.py URL: http://svn.apache.org/viewvc/subversion/branches/ev2-export/tools/server-side/svnpubsub/svntweet.py?rev=1436688&r1=1436687&r2=1436688&view=diff ============================================================================== --- subversion/branches/ev2-export/tools/server-side/svnpubsub/svntweet.py (original) +++ subversion/branches/ev2-export/tools/server-side/svnpubsub/svntweet.py Mon Jan 21 23:37:01 2013 @@ -23,7 +23,7 @@ # svntweet.py my-config.json # # With my-config.json containing stream paths and the twitter auth info: -# {"stream": "http://svn.apache.org:2069/commits/xml", +# {"stream": "http://svn.apache.org:2069/commits", # "username": "asfcommits", # "password": "MyLuggageComboIs1234"} # @@ -43,8 +43,8 @@ from twisted.python import failure, log from twisted.web.client import HTTPClientFactory, HTTPPageDownloader from urlparse import urlparse -from xml.sax import handler, make_parser import time +import posixpath sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "twitty-twister", "lib")) try: @@ -81,81 +81,60 @@ class HTTPStream(HTTPClientFactory): def pageEnd(self): pass -class Revision: - def __init__(self, repos, rev): - self.repos = repos - self.rev = rev - self.dirs_changed = [] - self.author = None - self.log = None - self.date = None - -class StreamHandler(handler.ContentHandler): - def __init__(self, bdec): - handler.ContentHandler.__init__(self) - self.bdec = bdec - self.rev = None - self.text_value = None - - def startElement(self, name, attrs): - #print "start element: %s" % (name) - """ - <commit repository="13f79535-47bb-0310-9956-ffa450edef68" - revision="815618"> - <author>joehni</author> - <date>2009-09-16 06:00:21 +0000 (Wed, 16 Sep 2009)</date> - <log>pom.xml is not executable.</log> - <dirs_changed><path>commons/proper/commons-parent/trunk/</path></dirs_changed> - </commit> - """ - if name == "commit": - self.rev = Revision(repos=attrs['repository'], - rev=int(attrs['revision'])) - elif name == "stillalive": - self.bdec.stillalive() - def characters(self, data): - if self.text_value is not None: - self.text_value = self.text_value + data - else: - self.text_value = data - - def endElement(self, name): - #print "end element: %s" % (name) - if name == "commit": - self.bdec.commit(self.rev) - self.rev = None - if self.text_value is not None and self.rev is not None: - if name == "path": - self.rev.dirs_changed.append(self.text_value.strip()) - if name == "author": - self.rev.author = self.text_value.strip() - if name == "date": - self.rev.date = self.text_value.strip() - if name == "log": - self.rev.log = self.text_value.strip() - self.text_value = None +class Commit(object): + def __init__(self, commit): + self.__dict__.update(commit) + +class JSONRecordHandler: + def __init__(self, bdec): + self.bdec = bdec + + def feed(self, record): + obj = json.loads(record) + if 'svnpubsub' in obj: + actual_version = obj['svnpubsub'].get('version') + EXPECTED_VERSION = 1 + if actual_version != EXPECTED_VERSION: + raise ValueException("Unknown svnpubsub format: %r != %d" + % (actual_format, expected_format)) + elif 'commit' in obj: + commit = Commit(obj['commit']) + if not hasattr(commit, 'type'): + raise ValueException("Commit object is missing type field.") + if not hasattr(commit, 'format'): + raise ValueException("Commit object is missing format field.") + if commit.type != 'svn' and commit.format != 1: + raise ValueException("Unexpected type and/or format: %s:%s" + % (commit.type, commit.format)) + self.bdec.commit(commit) + elif 'stillalive' in obj: + self.bdec.stillalive() - -class XMLHTTPStream(HTTPStream): +class JSONHTTPStream(HTTPStream): def __init__(self, url, bdec): HTTPStream.__init__(self, url) self.bdec = bdec - self.parser = make_parser(['xml.sax.expatreader']) - self.handler = StreamHandler(bdec) - self.parser.setContentHandler(self.handler) + self.ibuffer = [] + self.parser = JSONRecordHandler(bdec) - def pageStart(self, parital): + def pageStart(self, partial): self.bdec.pageStart() def pagePart(self, data): - self.parser.feed(data) + eor = data.find("\0") + if eor >= 0: + self.ibuffer.append(data[0:eor]) + self.parser.feed(''.join(self.ibuffer)) + self.ibuffer = [data[eor+1:]] + else: + self.ibuffer.append(data) def connectTo(url, bdec): u = urlparse(url) port = u.port if not port: port = 80 - s = XMLHTTPStream(url, bdec) + s = JSONHTTPStream(url, bdec) conn = reactor.connectTCP(u.hostname, u.port, s) return [s, conn] @@ -209,7 +188,7 @@ class BigDoEverythingClasss(object): def _normalize_path(self, path): if path[0] != '/': return "/" + path - return os.path.abspath(path) + return posixpath.abspath(path) def tweet(self, msg): log.msg("SEND TWEET: %s" % (msg)) @@ -218,29 +197,29 @@ class BigDoEverythingClasss(object): def tweet_done(self, x): log.msg("TWEET: Success!") - def build_tweet(self, rev): + def build_tweet(self, commit): maxlen = 144 left = maxlen - paths = map(self._normalize_path, rev.dirs_changed) + paths = map(self._normalize_path, commit.changed) if not len(paths): return None - path = os.path.commonprefix(paths) + path = posixpath.commonprefix(paths) if path[0:1] == '/' and len(path) > 1: path = path[1:] - #TODO: shorter link - link = " - http://svn.apache.org/viewvc?view=rev&revision=%d" % (rev.rev) + #TODO: allow URL to be configurable. + link = " - http://svn.apache.org/r%d" % (commit.id) left -= len(link) - msg = "r%d in %s by %s: " % (rev.rev, path, rev.author) + msg = "r%d in %s by %s: " % (commit.id, path, commit.committer) left -= len(msg) if left > 3: - msg += rev.log[0:left] + msg += commit.log[0:left] msg += link return msg - def commit(self, rev): - log.msg("COMMIT r%d (%d paths)" % (rev.rev, len(rev.dirs_changed))) - msg = self.build_tweet(rev) + def commit(self, commit): + log.msg("COMMIT r%d (%d paths)" % (commit.id, len(commit.changed))) + msg = self.build_tweet(commit) if msg: self.tweet(msg) #print "Common Prefix: %s" % (pre)
Modified: subversion/branches/ev2-export/tools/server-side/svnpubsub/svnwcsub.py URL: http://svn.apache.org/viewvc/subversion/branches/ev2-export/tools/server-side/svnpubsub/svnwcsub.py?rev=1436688&r1=1436687&r2=1436688&view=diff ============================================================================== --- subversion/branches/ev2-export/tools/server-side/svnpubsub/svnwcsub.py (original) +++ subversion/branches/ev2-export/tools/server-side/svnpubsub/svnwcsub.py Mon Jan 21 23:37:01 2013 @@ -30,19 +30,42 @@ # See svnwcsub.conf for more information on its contents. # +# TODO: +# - bulk update at startup time to avoid backlog warnings +# - fold BDEC into Daemon +# - fold WorkingCopy._get_match() into __init__ +# - remove wc_ready(). assume all WorkingCopy instances are usable. +# place the instances into .watch at creation. the .update_applies() +# just returns if the wc is disabled (eg. could not find wc dir) +# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER +# (a base path exclusion list should work for the ASF) +# - add support for SIGHUP to reread the config and reinitialize working copies +# - joes will write documentation for svnpubsub as these items become fulfilled +# - make LOGLEVEL configurable + import errno import subprocess import threading import sys import os import re -import ConfigParser +import posixpath +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser import time import logging.handlers -import Queue +try: + import Queue +except ImportError: + import queue as Queue import optparse import functools -import urlparse +try: + import urlparse +except ImportError: + import urllib.parse as urlparse import daemonize import svnpubsub.client @@ -59,6 +82,20 @@ except AttributeError: raise subprocess.CalledProcessError(pipe.returncode, args) return output +assert hasattr(subprocess, 'check_call') +def check_call(*args, **kwds): + """Wrapper around subprocess.check_call() that logs stderr upon failure.""" + assert 'stderr' not in kwds + kwds.update(stderr=subprocess.PIPE) + pipe = subprocess.Popen(*args, **kwds) + output, errput = pipe.communicate() + if pipe.returncode: + cmd = args[0] if len(args) else kwds.get('args', '(no command)') + # TODO: log stdout too? + logging.error('Command failed: returncode=%d command=%r stderr=%r', + pipe.returncode, cmd, errput) + raise subprocess.CalledProcessError(pipe.returncode, args) + return pipe.returncode # is EXIT_OK ### note: this runs synchronously. within the current Twisted environment, ### it is called from ._get_match() which is run on a thread so it won't @@ -124,15 +161,16 @@ class WorkingCopy(object): def _get_match(self, svnbin, env): ### quick little hack to auto-checkout missing working copies - if not os.path.isdir(self.path + "/.svn") or is_emptydir(self.path): + dotsvn = os.path.join(self.path, ".svn") + if not os.path.isdir(dotsvn) or is_emptydir(dotsvn): logging.info("autopopulate %s from %s" % (self.path, self.url)) - subprocess.check_call([svnbin, 'co', '-q', - '--force', - '--non-interactive', - '--config-option', - 'config:miscellany:use-commit-times=on', - '--', self.url, self.path], - env=env) + check_call([svnbin, 'co', '-q', + '--force', + '--non-interactive', + '--config-option', + 'config:miscellany:use-commit-times=on', + '--', self.url, self.path], + env=env) # Fetch the info for matching dirs_changed against this WC info = svn_info(svnbin, env, self.path) @@ -150,16 +188,11 @@ class BigDoEverythingClasss(object): self.svnbin = config.get_value('svnbin') self.env = config.get_env() self.tracking = config.get_track() - self.hook = config.get_value('hook') + self.hook = config.get_optional_value('hook') + self.streams = config.get_value('streams').split() self.worker = BackgroundWorker(self.svnbin, self.env, self.hook) self.watch = [ ] - self.hostports = [ ] - ### switch from URLs in the config to just host:port pairs - for url in config.get_value('streams').split(): - parsed = urlparse.urlparse(url.strip()) - self.hostports.append((parsed.hostname, parsed.port)) - def start(self): for path, url in self.tracking.items(): # working copies auto-register with the BDEC when they are ready. @@ -175,15 +208,19 @@ class BigDoEverythingClasss(object): def _normalize_path(self, path): if path[0] != '/': return "/" + path - return os.path.abspath(path) + return posixpath.abspath(path) - def commit(self, host, port, rev): - logging.info("COMMIT r%d (%d paths) from %s:%d" - % (rev.rev, len(rev.dirs_changed), host, port)) + def commit(self, url, commit): + if commit.type != 'svn' or commit.format != 1: + logging.info("SKIP unknown commit format (%s.%d)", + commit.type, commit.format) + return + logging.info("COMMIT r%d (%d paths) from %s" + % (commit.id, len(commit.changed), url)) - paths = map(self._normalize_path, rev.dirs_changed) + paths = map(self._normalize_path, commit.changed) if len(paths): - pre = os.path.commonprefix(paths) + pre = posixpath.commonprefix(paths) if pre == "/websites/": # special case for svnmucc "dynamic content" buildbot commits # just take the first production path to avoid updating all cms working copies @@ -194,8 +231,8 @@ class BigDoEverythingClasss(object): break #print "Common Prefix: %s" % (pre) - wcs = [wc for wc in self.watch if wc.update_applies(rev.uuid, pre)] - logging.info("Updating %d WC for r%d" % (len(wcs), rev.rev)) + wcs = [wc for wc in self.watch if wc.update_applies(commit.repository, pre)] + logging.info("Updating %d WC for r%d" % (len(wcs), commit.id)) for wc in wcs: self.worker.add_work(OP_UPDATE, wc) @@ -278,7 +315,7 @@ class BackgroundWorker(threading.Thread) '--', wc.url, wc.path] - subprocess.check_call(args, env=self.env) + check_call(args, env=self.env) ### check the loglevel before running 'svn info'? info = svn_info(self.svnbin, self.env, wc.path) @@ -291,7 +328,7 @@ class BackgroundWorker(threading.Thread) wc.path, info['Revision'], hook_mode) args = [self.hook, hook_mode, wc.path, info['Revision'], wc.url] - subprocess.check_call(args, env=self.env) + check_call(args, env=self.env) def _cleanup(self, wc): "Run a cleanup on the specified working copy." @@ -304,7 +341,7 @@ class BackgroundWorker(threading.Thread) '--config-option', 'config:miscellany:use-commit-times=on', wc.path] - subprocess.check_call(args, env=self.env) + check_call(args, env=self.env) class ReloadableConfig(ConfigParser.SafeConfigParser): @@ -331,6 +368,12 @@ class ReloadableConfig(ConfigParser.Safe def get_value(self, which): return self.get(ConfigParser.DEFAULTSECT, which) + def get_optional_value(self, which, default=None): + if self.has_option(ConfigParser.DEFAULTSECT, which): + return self.get(ConfigParser.DEFAULTSECT, which) + else: + return default + def get_env(self): env = os.environ.copy() default_options = self.defaults().keys() @@ -376,18 +419,18 @@ class Daemon(daemonize.Daemon): # Start the BDEC (on the main thread), then start the client self.bdec.start() - mc = svnpubsub.client.MultiClient(self.bdec.hostports, + mc = svnpubsub.client.MultiClient(self.bdec.streams, self.bdec.commit, self._event) mc.run_forever() - def _event(self, host, port, event_name): + def _event(self, url, event_name, event_arg): if event_name == 'error': - logging.exception('from %s:%s', host, port) + logging.exception('from %s', url) elif event_name == 'ping': - logging.debug('ping from %s:%s', host, port) + logging.debug('ping from %s', url) else: - logging.info('"%s" from %s:%s', event_name, host, port) + logging.info('"%s" from %s', event_name, url) def prepare_logging(logfile): Modified: subversion/branches/ev2-export/tools/server-side/svnpubsub/testserver.py URL: http://svn.apache.org/viewvc/subversion/branches/ev2-export/tools/server-side/svnpubsub/testserver.py?rev=1436688&r1=1436687&r2=1436688&view=diff ============================================================================== --- subversion/branches/ev2-export/tools/server-side/svnpubsub/testserver.py (original) +++ subversion/branches/ev2-export/tools/server-side/svnpubsub/testserver.py Mon Jan 21 23:37:01 2013 @@ -30,7 +30,7 @@ import BaseHTTPServer PORT = 2069 -TEST_BODY = '<commit repository="12345678-1234-1234-1234-123456789012" revision="1234"><author>johndoe</author><date>2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)</date><log>Frob the ganoozle with the snookish</log><dirs_changed><path>one/path/</path><path>some/other/directory/</path></dirs_changed></commit>' +TEST_BODY = '{"svnpubsub": {"version": 1}}\n\0{"commit": {"type": "svn", "format": 1, "repository": "12345678-1234-1234-1234-123456789012", "id": "1234", "committer": "johndoe", "date": "2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)", "log": "Frob the ganoozle with the snookish", "changed": {"one/path/alpha": {"flags": "U "}, "some/other/directory/": {"flags": "_U "}}}}\n\0' SEND_KEEPALIVE = True Modified: subversion/branches/ev2-export/tools/server-side/svnpubsub/watcher.py URL: http://svn.apache.org/viewvc/subversion/branches/ev2-export/tools/server-side/svnpubsub/watcher.py?rev=1436688&r1=1436687&r2=1436688&view=diff ============================================================================== --- subversion/branches/ev2-export/tools/server-side/svnpubsub/watcher.py (original) +++ subversion/branches/ev2-export/tools/server-side/svnpubsub/watcher.py Mon Jan 21 23:37:01 2013 @@ -19,39 +19,37 @@ # # Watch for events from SvnPubSub and print them to stdout # -# ### usage... # import sys -import urlparse import pprint +try: + import urlparse +except ImportError: + import urllib.parse as urlparse import svnpubsub.client -import svnwcsub ### for ReloadableConfig -def _commit(host, port, rev): - print 'COMMIT: from %s:%s' % (host, port) - pprint.pprint(vars(rev), indent=2) +def _commit(url, commit): + print('COMMIT: from %s' % url) + pprint.pprint(vars(commit), indent=2) -def _event(host, port, event_name): - print 'EVENT: from %s:%s "%s"' % (host, port, event_name) +def _event(url, event_name, event_arg): + if event_arg: + print('EVENT: from %s "%s" "%s"' % (url, event_name, event_arg)) + else: + print('EVENT: from %s "%s"' % (url, event_name)) -def main(config_file): - config = svnwcsub.ReloadableConfig(config_file) - hostports = [ ] - for url in config.get_value('streams').split(): - parsed = urlparse.urlparse(url) - hostports.append((parsed.hostname, parsed.port)) - - mc = svnpubsub.client.MultiClient(hostports, _commit, _event) +def main(urls): + mc = svnpubsub.client.MultiClient(urls, _commit, _event) mc.run_forever() if __name__ == "__main__": - if len(sys.argv) != 2: - print "invalid args, read source code" + if len(sys.argv) < 2: + print("usage: watcher.py URL [URL...]") sys.exit(0) - main(sys.argv[1]) + main(sys.argv[1:])
