Wow, this is nice! I had sort of given up finding the cause of this :-(
Thank you for looking at this, and just in time for my presentation at
PKC in 10 days :-)
You're welcome. :-)
--- /usr/lib/python2.5/site-packages/twisted/internet/base.py 2008-07-29
22:13:54.000000000 +0200
+++ internet/base.py 2009-02-20 12:27:42.000000000 +0100
@@ -1127,17 +1127,32 @@
self.startRunning(installSignalHandlers=installSignalHandlers)
self.mainLoop()
+
+ def setLoopCall(self, f, *args, **kw):
+ self.loopCall = (f, args, kw)
+
+
+ def myIteration(self, t):
+ # Advance simulation time in delayed event
+ # processors.
+ self.runUntilCurrent()
+
+ if (t is None):
+ t2 = self.timeout()
+ t = self.running and t2
+
+ self.doIteration(t)
+
+ if ("loopCall" in dir(self)):
+ f, args, kw = self.loopCall
+ f(*args, **kw)
+
def mainLoop(self):
while self._started:
try:
while self._started:
- # Advance simulation time in delayed event
- # processors.
- self.runUntilCurrent()
- t2 = self.timeout()
- t = self.running and t2
- self.doIteration(t)
+ self.myIteration(None)
except:
log.msg("Unexpected error in main loop.")
log.err()
The changes above basically insert a call to self.loopCall after each
doIteration invocation, right?
When the loopCall is process_deferred_queue the main loop becomes:
self.runUntilCurrent()
self.doIteration(t)
runtime.process_deferred_queue
self.runUntilCurrent()
self.doIteration(t)
runtime.process_deferred_queue
...
Yes, exactly.
I think we would get the same result if we started a LoopingCall that
executes process_deferred_queue with an interval of, say, 100 ms:
http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html
This should work since the runUntilCurrent method runs through the
waiting calls and will trigger our process_deferred_queue method.
And voilá -- no hacking of the Twisted source needed.
I'm not sure but LoopingCall._reschedule() looks more like it schedules
the calls at certain tick, not as soon as possible after the interval is
elapsed. This might not cost too much time but still doesn't feel very
elegant. Furthermore, setting the interval very low leads to high CPU
usage when waiting. Again, this is not too bad but not elegant either.
The same applies if using reactor.callLater() directly.
Of course, we can avoid hacking the Twisted code by extending it within
VIFF. Still, I'm in favor of the two-threaded solution because it's more
elegant, doesn't have the danger of recursing too deep, and, in my
opinion, it should be feasible.
diff -r e2759515f57f viff/runtime.py
--- a/viff/runtime.py Thu Mar 05 21:02:57 2009 +0100
+++ b/viff/runtime.py Fri Mar 06 13:43:14 2009 +0100
@@ -306,6 +306,8 @@
deferred = deq.popleft()
if not deq:
del self.incoming_data[key]
+ # just queue
+ self.factory.runtime.queue_deferred(deferred)
Why is this done?
At this time, we shouldn't call the callbacks because we might recurse
into selectreactor.doSelect(). However, we want to know which deferreds
are ready so we can call deferred.callback() later.
+ #: Activation depth counter
+ self.depth_counter = 0
This is for keeping track of the recursion depth in the future?
Yes. It was used in some debug output earlier but I removed it to
simplify the patch.
+ def queue_deferred(self, deferred):
+ deferred.pause()
+ self.deferred_queue.append(deferred)
+
+ def process_deferred_queue(self):
+ while(self.deferred_queue):
+ deferred = self.deferred_queue.pop(0)
+ deferred.unpause()
Are you doing it this way to ensure that the Deferreds are unpaused in
the same order as they were added to the list?
Yes. I'm not sure whether this is really necessary but it seems just
cleaner because the callback of the some deferred might do a lot of
computations and recurse, which unnecessarily extends the lifetime of
the remaining deferreds.
If that doesn't matter, then I think this would be faster:
queue, self.deferred_queue = self.deferred_queue, []
map(Deferred.unpause, queue)
My idea is that looping over the list with map is faster than repeatedly
popping items from the beginning (an O(n) operation).
But map() still would need O(n) time because that is the nature of
calling a function n times, isn't it? Maybe the function calls are
optimized but the code in the function still is called n times.
+ def activate_reactor(self):
+ self.activation_counter += 1
+
+ # setting the number to n makes the reactor called
+ # only every n-th time
+ if (self.activation_counter >= 2):
+ self.depth_counter += 1
+ reactor.myIteration(0)
+ self.depth_counter -= 1
+ self.activation_counter = 0
If we remove the myIteration code above, we'll have to move the calls to
reactor.runUntilCurrent and reactor.doIteration here.
A question springs to my mind: calling
reactor.runUntilCurrent()
reactor.doIteration(0)
is the same as calling
reactor.iterate(0)
and the documentation for that method says:
[...] This method is not re-entrant: you must not call it recursively;
in particular, you must not call it while the reactor is running.
http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.interfaces.IReactorCore.html#iterate
How does your code ensure that we only call myIteration when we're not
in a call made by the reactor? And could we simply call reactor.iterate
instead?
We actually call it recursively but it should be reentrant if it's not
called from doIteration(). doIteration() is a the same as
select.doSelect(), which certainly is not reentrant. We however call it
from the loop call (process_deferred_queue()) after doIterate().
Calling reactor.iterate() is not enough because it doesn't call
process_deferred_queue(). The principle is that not only the
communication with doSelect() is done but also the callbacks with
process_deferred_queue() are processed. Only the latter triggers further
communication and ends the lifetime of deferreds, which frees memory.
The following graph illustrates my hack:
myIteration()
/ \
doSelect() process_deferred_queue()
| |
communication callbacks
| |
stringReceived() mul() / open() etc.
| |
queue_deferred() myIteration()
|
...
_______________________________________________
viff-devel mailing list (http://viff.dk/)
viff-devel@viff.dk
http://lists.viff.dk/listinfo.cgi/viff-devel-viff.dk