Author: gstein
Date: Wed Mar 7 00:10:13 2012
New Revision: 1297805
URL: http://svn.apache.org/viewvc?rev=1297805&view=rev
Log:
Strip all of the reconnect logic. Joe reports that it doesn't work
very well, and we're eventually going to rely on the logic in
client.py anyways.
* tools/server-side/svnpubsub/svnwcsub.py:
(...): remove some unused imports
(StreamHandler.startElement): ignore the keepalive elements
(XMLHTTPStream.__init__): no need to store ALIVE or BDEC
(XMLHTTPStream.pageStart, .pageEnd): remove these methods overrides;
we don't need to inform BDEC
(BigDoEverythingClasss.__init__): no need for the CHECKER, or to
record FAILURES or ALIVE
(BigDoEverythingClasss.pageStart, .pageEnd, ._checkalive,
.stillalive, .streamDead): removed. unneeded.
(BigDoEverythingClasss._restartStream): no need to record ALIVE or
to def a streamDead() call.
Modified:
subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py
Modified: subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py
URL:
http://svn.apache.org/viewvc/subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py?rev=1297805&r1=1297804&r2=1297805&view=diff
==============================================================================
--- subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py (original)
+++ subversion/trunk/tools/server-side/svnpubsub/svnwcsub.py Wed Mar 7
00:10:13 2012
@@ -40,13 +40,11 @@ import logging.handlers
import Queue
import optparse
-from twisted.internet import reactor, task, threads
-from twisted.internet.utils import getProcessOutput
+from twisted.internet import reactor
from twisted.application import internet
from twisted.web.client import HTTPClientFactory, HTTPPageDownloader
from urlparse import urlparse
from xml.sax import handler, make_parser
-from twisted.internet import protocol
# check_output() is only available in Python 2.7. Allow us to run with
@@ -159,7 +157,6 @@ class StreamHandler(handler.ContentHandl
self.text_value = None
def startElement(self, name, attrs):
- #print "start element: %s" % (name)
"""
<commit revision="7">
<dirs_changed><path>/</path></dirs_changed>
@@ -167,8 +164,7 @@ class StreamHandler(handler.ContentHandl
"""
if name == "commit":
self.rev = Revision(attrs['repository'], int(attrs['revision']))
- elif name == "stillalive":
- self.bdec.stillalive(self.stream)
+
def characters(self, data):
if self.text_value is not None:
self.text_value = self.text_value + data
@@ -176,7 +172,6 @@ class StreamHandler(handler.ContentHandl
self.text_value = data
def endElement(self, name):
- #print "end element: %s" % (name)
if name == "commit":
self.bdec.commit(self.stream, self.rev)
self.rev = None
@@ -188,20 +183,13 @@ class StreamHandler(handler.ContentHandl
class XMLHTTPStream(HTTPStream):
def __init__(self, url, bdec):
HTTPStream.__init__(self, url)
- self.alive = 0
- self.bdec = bdec
self.parser = make_parser(['xml.sax.expatreader'])
self.handler = StreamHandler(self, bdec)
self.parser.setContentHandler(self.handler)
- def pageStart(self, parital):
- self.bdec.pageStart(self)
-
def pagePart(self, data):
self.parser.feed(data)
- def pageEnd(self):
- self.bdec.pageEnd(self)
def connectTo(url, bdec):
u = urlparse(url)
@@ -228,65 +216,19 @@ class BigDoEverythingClasss(object):
self.tracking = config.get_track()
self.worker = BackgroundWorker(self.svnbin, self.env)
self.service = service
- self.failures = 0
- self.alive = time.time()
- self.checker = task.LoopingCall(self._checkalive)
self.transports = {}
self.streams = {}
for u in self.urls:
self._restartStream(u)
self.watch = []
- self.checker.start(CHECKBEAT_TIME)
def start(self):
for path, url in self.tracking.items():
# working copies auto-register with the BDEC when they are ready.
WorkingCopy(self, path, url)
- def pageStart(self, stream):
- logging.info("Stream %s Connection Established" % (stream.url))
- self.failures = 0
-
- def pageEnd(self, stream):
- logging.info("Stream %s Connection Dead" % (stream.url))
- self.streamDead(stream.url)
-
def _restartStream(self, url):
(self.streams[url], self.transports[url]) = connectTo(url, self)
- self.streams[url].deferred.addBoth(self.streamDead, url)
- self.streams[url].alive = time.time()
-
- def _checkalive(self):
- n = time.time()
- for k in self.streams.keys():
- s = self.streams[k]
- if n - s.alive > CHECKBEAT_TIME:
- logging.info("Stream %s is dead, reconnecting" % (s.url))
- #self.transports[s.url].disconnect()
- self.streamDead(self, s.url)
-
-# d=filter(lambda x:x not in self.streams.keys(), self.urls)
-# for u in d:
-# self._restartStream(u)
-
- def stillalive(self, stream):
- stream.alive = time.time()
-
- def streamDead(self, url, result=None):
- s = self.streams.get(url)
- if not s:
- logging.info("Stream %s is messed up" % (url))
- return
- BACKOFF_SECS = 5
- BACKOFF_MAX = 60
- #self.checker.stop()
-
- self.streams[url] = None
- self.transports[url] = None
- self.failures += 1
- backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX)
- logging.info("Stream disconnected, trying again in %d seconds.... %s"
% (backoff, s.url))
- reactor.callLater(backoff, self._restartStream, url)
def wc_ready(self, wc):
# called when a working copy object has its basic info/url,