Author: tack
Date: Sun Feb 10 18:58:09 2008
New Revision: 3041
Log:
MainThreadCallback.set_async is deprecated (and logs a warning). Calling a
MainThreadCallback will now return an InProgress object, and the caller
can wait() on this InProgress if synchronous behaviour is needed.
InProgress.wait() is now thread-safe. If wait() is called from the main
thread, the notifier is kept alive via a step() loop. If it's called from
another thread, that thread is blocked by waiting on a threading.Event object.
wait() now accepts an optional timeout argument, in seconds. If the timeout
expires before the InProgress finishes, a TimeoutException is raised.
Updated test/asynctest.py to conform to API changes (removes deprecation
warnings from output).
Changed is_mainthread() semantics. is_mainthread() now returns None if
no mainthread has been set. main.run() now raises an exception if some
other thread has been set as the main thread.
Unhandled InProgress exceptions are now logged in the destructor. This
is still not perfect, but it's a hard problem to solve. :)
New method InProgress.link() to link together two InProgress objects.
After one.link(two), when two finishes (or throws), one finishes (or throws) too.
Modified:
trunk/base/API_CHANGES.txt
trunk/base/src/notifier/async.py
trunk/base/src/notifier/decorators.py
trunk/base/src/notifier/main.py
trunk/base/src/notifier/thread.py
trunk/base/src/notifier/yieldfunc.py
trunk/base/src/rpc.py
trunk/base/test/asynctest.py
Modified: trunk/base/API_CHANGES.txt
==============================================================================
--- trunk/base/API_CHANGES.txt (original)
+++ trunk/base/API_CHANGES.txt Sun Feb 10 18:58:09 2008
@@ -39,3 +39,7 @@
InProgress object may already be finished. Using yield on a
finished object will return without waiting. For Python 2.5 the
yield also returns the value or can raise an exception.
+
+9. MainThreadCallback.set_async() is deprecated. Calling a
+ MainThreadCallback will return an InProgress object, and the
+ correct approach now is to wait() on this InProgress.
Modified: trunk/base/src/notifier/async.py
==============================================================================
--- trunk/base/src/notifier/async.py (original)
+++ trunk/base/src/notifier/async.py Sun Feb 10 18:58:09 2008
@@ -34,6 +34,8 @@
# python imports
import logging
import traceback
+import time
+import threading
# kaa.notifier imports
from callback import Signal
@@ -41,6 +43,8 @@
# get logging object
log = logging.getLogger('notifier.async')
+class TimeoutException(Exception):
+ pass
class InProgress(Signal):
"""
@@ -103,8 +107,15 @@
Signal.__init__(self)
self.exception = Signal()
self._finished = False
+ self._finished_event = threading.Event()
+ self._unhandled_exception = False
self.status = None
+ def __del__(self):
+ if self._unhandled_exception:
+ # We didn't get a chance to log this unhandled exception, so do
+ # it now.
+ self._log_exception()
def set_status(self, s):
"""
@@ -132,14 +143,16 @@
done and no longer in progress.
"""
if isinstance(result, InProgress):
- # we are still not finished, register to this result
- result.connect(self.finished)
- result.exception.connect(self.throw)
+ # we are still not finished, link to this new InProgress
+ self.link(result)
return
+
# store result
self._finished = True
self._result = result
self._exception = None
+ # Wake any threads waiting on us
+ self._finished_event.set()
# emit signal
self.emit_when_handled(result)
# cleanup
@@ -151,22 +164,32 @@
This function should be called when the creating function is
done because it raised an exception.
"""
- if self.exception.count() == 0:
- # FIXME: we must still log the exception if we have an internal
- # handler but no external handler. In this case count() > 0.
- # There is no handler, so dump the exception.
- trace = ''.join(traceback.format_exception(type, value, tb))
- log.error('*** Unhandled InProgress exception ***\n%s', trace)
-
# store result
self._finished = True
self._exception = type, value, tb
- # emit signal
- self.exception.emit_when_handled(type, value, tb)
+ self._unhandled_exception = False
+ # Wake any threads waiting on us
+ if self._finished_event:
+ self._finished_event.set()
+
+ if self.exception.emit_when_handled(type, value, tb) != False:
+ # No handler returned False to block us from logging the exception.
+ # Set a flag to log the exception in the destructor if it is
+ # not raised with get_result().
+ self._unhandled_exception = True
+
# cleanup
self._callbacks = []
+ def _log_exception(self):
+ if not self._unhandled_exception:
+ return
+ self._unhandled_exception = False
+ trace = ''.join(traceback.format_exception(*self._exception)).strip()
+ log.error('*** Unhandled %s exception ***\n%s',
self.__class__.__name__, trace)
+
+
def __call__(self, *args, **kwargs):
"""
You can call the InProgress object to get the results when finished.
@@ -193,38 +216,70 @@
if not self._finished:
raise RuntimeError('operation not finished')
if self._exception:
+ self._unhandled_exception = False
type, value, tb = self._exception
# Special 3-argument form of raise; preserves traceback
raise type, value, tb
return self._result
- def wait(self):
+ def wait(self, timeout = None):
"""
Waits for the result (or exception) of the InProgress object. The
- main loop is kept alive.
+ main loop is kept alive if waiting in the main thread, otherwise
+ the thread is blocked until another thread finishes the InProgress.
+
+ If timeout is specified, wait() blocks for at most timeout seconds
+ (which may be fractional). If wait times out, a TimeoutException is
+ raised.
"""
# Import modules here rather than globally to avoid circular importing.
import main
+ from thread import set_as_mainthread, is_mainthread
+
if not main.is_running():
# No main loop is running yet. We're calling step() below,
# but we won't get notified of any thread completion
# unless the thread notifier pipe is initialized.
- from thread import set_as_mainthread
set_as_mainthread()
- if self.exception.count() == 0:
- # No existing exception handler. Connect a dummy handler to
- # prevent it from being logged in throw(). It will get raised
- # later when we call get_result().
- self.exception.connect(lambda *args: None)
-
- while not self.is_finished():
- main.step()
+ # Connect a dummy handler to prevent any exception from being logged in
+ # throw(). It will get raised later when we call get_result().
+ dummy_handler = lambda *args: False
+ self.exception.connect_once(dummy_handler)
+
+ if is_mainthread():
+ # We're waiting in the main thread, so we must keep the mainloop
+ # alive by calling step() until we're finished.
+ abort = []
+ if timeout:
+ # Add a timer to make sure the notifier doesn't sleep
+ # beyond our timeout.
+ from timer import OneShotTimer
+ OneShotTimer(lambda: abort.append(True)).start(timeout)
+
+ while not self.is_finished() and not abort:
+ main.step()
+ else:
+ # We're waiting in some other thread, so wait for some other
+ # thread to wake us up.
+ self._finished_event.wait(timeout)
+
+ if not self.is_finished():
+ self.exception.disconnect(dummy_handler)
+ raise TimeoutException
return self.get_result()
+ def link(self, in_progress):
+ """
+ Links with another InProgress object. When the supplied in_progress
+ object finishes (or throws), we do too.
+ """
+ in_progress.connect_both(self.finished, self.throw)
+
+
def _connect(self, callback, args = (), kwargs = {}, once = False,
weak = False, pos = -1):
"""
@@ -239,4 +294,4 @@
Connect a finished and an exception callback without extra arguments.
"""
self.connect(finished)
- self.exception.connect(exception)
+ self.exception.connect_once(exception)
Modified: trunk/base/src/notifier/decorators.py
==============================================================================
--- trunk/base/src/notifier/decorators.py (original)
+++ trunk/base/src/notifier/decorators.py Sun Feb 10 18:58:09 2008
@@ -100,22 +100,26 @@
return decorator
-def execute_in_mainloop(async=False):
+def execute_in_mainloop(async = False):
"""
- This decorator makes sure the function is called from the main loop. If
- the calling thread is the mainloop, it is a normal function call, if not,
- MainThreadCallback is used to call the function. If 'async' is set to
False,
- the thread will wait for the answer. It is possible with this decorator to
- have a longer codeblock in a thread and call functions not thread save.
+ This decorator makes sure the function is called from the main loop. If
+ async is True, any decorated function will return InProgress, whether the
+ function is called in the main thread or another thread.
+
+ If async is False and this function is called in the main thread, it
+ behaves as a normal function call (as if it weren't decorated). But if the
+ calling thread is not the main thread, it is blocked until the function
+ finishes, and its return value is passed back (or any exception is raised).
"""
def decorator(func):
def newfunc(*args, **kwargs):
- if is_mainthread():
+ if not async and is_mainthread():
return func(*args, **kwargs)
- t = MainThreadCallback(func, *args, **kwargs)
- t.set_async(async)
- return t()
+ in_progress = MainThreadCallback(func)(*args, **kwargs)
+ if not async:
+ return in_progress.wait()
+ return in_progress
try:
newfunc.func_name = func.func_name
Modified: trunk/base/src/notifier/main.py
==============================================================================
--- trunk/base/src/notifier/main.py (original)
+++ trunk/base/src/notifier/main.py Sun Feb 10 18:58:09 2008
@@ -90,6 +90,9 @@
_running = True
unhandled_exception = None
+ if is_mainthread() is False:
+ raise RuntimeError('Mainthread is already running')
+
set_as_mainthread()
while True:
try:
Modified: trunk/base/src/notifier/thread.py
==============================================================================
--- trunk/base/src/notifier/thread.py (original)
+++ trunk/base/src/notifier/thread.py Sun Feb 10 18:58:09 2008
@@ -72,61 +72,57 @@
self.lock = threading.Lock()
self._sync_return = None
self._sync_exception = None
- self.set_async()
+ self._async = True
def set_async(self, async = True):
+ log.warning("set_async() is deprecated; use callback().wait()
instead.")
self._async = async
- def _set_result(self, result):
- self._sync_return = result
- if isinstance(self._sync_return, InProgress):
- if not self._sync_return.is_finished():
- self._sync_return.connect_both(self._set_result,
self._set_exception)
- return
- self._sync_return = self._sync_return()
- self._wakeup()
-
- def _set_exception(self, type, value, tb):
- self._sync_exception = type, value, tb
- self._wakeup()
-
def _wakeup(self):
+ # XXX: this function is called by _thread_notifier_run_queue(). It
+ # is also deprecated.
self.lock.acquire(False)
self.lock.release()
def __call__(self, *args, **kwargs):
- if threading.currentThread() == _thread_notifier_mainthread:
- return super(MainThreadCallback, self).__call__(*args, **kwargs)
+ in_progress = InProgress()
+
+ if is_mainthread():
+ if not self._async:
+ # TODO: async flag is deprecated, caller should call wait() on
+ # the inprogress instead.
+ return super(MainThreadCallback, self).__call__(*args,
**kwargs)
+
+ try:
+ result = super(MainThreadCallback, self).__call__(*args,
**kwargs)
+ in_progress.finished(result)
+ except:
+ in_progress.throw(*sys.exc_info())
+
+ return in_progress
self.lock.acquire(False)
_thread_notifier_lock.acquire()
- _thread_notifier_queue.insert(0, (self, args, kwargs))
+ _thread_notifier_queue.insert(0, (self, args, kwargs, in_progress))
if len(_thread_notifier_queue) == 1:
if _thread_notifier_pipe:
os.write(_thread_notifier_pipe[1], "1")
_thread_notifier_lock.release()
- # FIXME: what happens if we switch threads here and execute
- # the callback? In that case we may block because we already
- # have the result. Should we use a Condition here?
-
+ # TODO: this is deprecated, caller should use wait() on the InProgress
+ # we return (when set_async(False) isn't called). This is also broken
+ # because we share a single lock for multiple invocations of this
+ # callback.
if not self._async:
# Synchronous execution: wait for main call us and collect
# the return value.
self.lock.acquire()
- if self._sync_exception:
- type, value, tb = self._sync_exception
- # Special 3-argument form of raise; preserves traceback
- raise type, value, tb
-
- return self._sync_return
-
- # Asynchronous: explicitly return None here. We could return
- # self._sync_return and there's a chance it'd be valid even
- # in the async case, but that's non-deterministic and dangerous
- # to rely on.
- return None
+ return in_progress.get_result()
+
+ # Return an InProgress object which the caller can connect to
+ # or wait on.
+ return in_progress
class ThreadInProgress(InProgress):
@@ -206,15 +202,20 @@
def is_mainthread():
"""
- Return True if the caller is in the main thread right now.
+ Return True if the caller is in the main thread right now, and False if
+ some other thread is the main thread. If no main thread has been
+ set, returns None.
"""
# If threading module is None, assume main thread. (Silences pointless
# exceptions on shutdown.)
- return (not threading) or threading.currentThread() ==
_thread_notifier_mainthread
-
+ if not threading or threading.currentThread() ==
_thread_notifier_mainthread:
+ return True
+ elif _thread_notifier_mainthread is None:
+ return None
+ return False
-_thread_notifier_mainthread = threading.currentThread()
+_thread_notifier_mainthread = None
_thread_notifier_lock = threading.Lock()
_thread_notifier_queue = []
@@ -269,17 +270,15 @@
while _thread_notifier_queue:
_thread_notifier_lock.acquire()
- callback, args, kwargs = _thread_notifier_queue.pop()
+ callback, args, kwargs, in_progress = _thread_notifier_queue.pop()
_thread_notifier_lock.release()
+
try:
- # call callback and set result
- callback._set_result(callback(*args, **kwargs))
- except ( KeyboardInterrupt, SystemExit ), e:
- # only wakeup to make it possible to stop the thread
+ in_progress.finished(callback(*args, **kwargs))
+ except:
+ in_progress.throw(*sys.exc_info())
+
+ if in_progress.is_finished():
callback._wakeup()
- raise SystemExit
- except Exception, e:
- log.exception('mainthread callback')
- # set exception in callback
- callback._set_exception(e)
+
return True
Modified: trunk/base/src/notifier/yieldfunc.py
==============================================================================
--- trunk/base/src/notifier/yieldfunc.py (original)
+++ trunk/base/src/notifier/yieldfunc.py Sun Feb 10 18:58:09 2008
@@ -132,6 +132,7 @@
"""
if _python25 and async is not None:
if async._exception:
+ async._unhandled_exception = False
return func.throw(*async._exception)
return func.send(async._result)
return func.next()
@@ -265,6 +266,12 @@
if self._timer:
# continue calling _step
self._timer.start(self._interval)
+ if len(args) == 3 and isinstance(args[1], Exception):
+ # An InProgress we were waiting on raised an exception. We are
+ # "inheriting" this exception, so return False to prevent it
+ # from being logged as unhandled in the other InProgress.
+ self.throw(*args)
+ return False
def _step(self):
Modified: trunk/base/src/rpc.py
==============================================================================
--- trunk/base/src/rpc.py (original)
+++ trunk/base/src/rpc.py Sun Feb 10 18:58:09 2008
@@ -229,6 +229,15 @@
self.objects.append(obj)
+ def disconnect(self, obj):
+ """
+ Disconnects a previously connected object.
+ """
+ try:
+ self.objects.remove(obj)
+ except ValueError:
+ pass
+
class Channel(object):
"""
Modified: trunk/base/test/asynctest.py
==============================================================================
--- trunk/base/test/asynctest.py (original)
+++ trunk/base/test/asynctest.py Sun Feb 10 18:58:09 2008
@@ -59,8 +59,8 @@
cb = kaa.MainThreadCallback(c.rpc)
# we not only wait to get the InProgress back, we also wait
# for the real return from rpc
- cb.set_async(False)
- x = cb('test5', x)
+ #cb.set_async(False)
+ x = cb('test5', x).wait()
print x
return x + 1
@@ -94,7 +94,7 @@
print f, 'needs more time'
yield x # waiting...
# subyield is now done
- x = x()
+ x = x.get_result()
else:
# this should happen for fast
print f, 'was a yield function but did not stop'
@@ -135,14 +135,14 @@
x = thread(13)
# this is also an InProgress object
yield x
- print x() # 13
+ print x.get_result() # 13
x = thread('crash')
- yield x
try:
# the thread raised an exception, so x() will
# raise it here
- print x()
+ yield x
+ print x.get_result()
print 'crash test failed'
except:
print 'crash test ok'
@@ -163,37 +163,37 @@
# normal rpc
result = c.rpc('test1', 15)
yield result
- print result()
+ print result.get_result()
# rpc in a thread
result = c.rpc('test2', 16)
yield result
- print result()
+ print result.get_result()
# rpc with yield direct
result = c.rpc('test3', 17)
yield result
- print result()
+ print result.get_result()
# rpc with yield indirect
result = c.rpc('test4', 18)
yield result
- print result()
+ print result.get_result()
# rpc with yield error
result = c.rpc('crash')
- yield result
try:
- result()
+ yield result
+ result.get_result()
print 'bad rpc test failed'
except:
print 'bad rpc test ok'
# rpc with remote exception
result = c.rpc('test6', 18)
- yield result
try:
- result()
+ yield result
+ result.get_result()
print 'remote rpc exception test failed'
except ValueError, e:
print 'remote rpc exception test ok'
@@ -203,7 +203,7 @@
# call rpc in thread
x = thread2(c, 19)
yield x # print 19
- print x() # 20
+ print x.get_result() # 20
# normal rpc, we don't care about the answer
c.rpc('shutdown')
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog