Author: tack
Date: Wed Feb 13 15:17:50 2008
New Revision: 3080
Log:
Remove jobserver.py and decorators.py and move code into more sensible
places.
Removed:
trunk/base/src/notifier/decorators.py
trunk/base/src/notifier/jobserver.py
Modified:
trunk/base/src/notifier/main.py
trunk/base/src/notifier/thread.py
trunk/base/src/notifier/timer.py
Modified: trunk/base/src/notifier/main.py
==============================================================================
--- trunk/base/src/notifier/main.py (original)
+++ trunk/base/src/notifier/main.py Wed Feb 13 15:17:50 2008
@@ -43,7 +43,7 @@
from signals import Signal
from popen import proclist as _proclist
from thread import is_mainthread, wakeup, set_as_mainthread
-from jobserver import killall as kill_jobserver
+from thread import killall as kill_jobserver
from decorators import threaded, MAINTHREAD
__all__ = [ 'run', 'stop', 'step', 'select_notifier', 'is_running', 'wakeup',
Modified: trunk/base/src/notifier/thread.py
==============================================================================
--- trunk/base/src/notifier/thread.py (original)
+++ trunk/base/src/notifier/thread.py Wed Feb 13 15:17:50 2008
@@ -46,8 +46,9 @@
#
# -----------------------------------------------------------------------------
-__all__ = [ 'MainThreadCallback', 'ThreadCallback', 'is_mainthread',
- 'wakeup', 'set_as_mainthread', 'create_thread_notifier_pipe' ]
+__all__ = [ 'MainThreadCallback', 'ThreadCallback', 'is_mainthread',
+ 'wakeup', 'set_as_mainthread', 'create_thread_notifier_pipe',
+ 'threaded', 'MAINTHREAD' ]
# python imports
import sys
@@ -57,6 +58,7 @@
import fcntl
import socket
import errno
+import thread
# notifier imports
import nf_wrapper as notifier
@@ -67,7 +69,175 @@
# get logging object
log = logging.getLogger('notifier')
+# TODO: organize thread notifier stuff into its own namespace
+
+_thread_notifier_mainthread = threading.currentThread()
+_thread_notifier_lock = threading.Lock()
+_thread_notifier_queue = []
+
+# For MainThread* callbacks. The pipe will be created when it is used the first
+# time. This solves a nasty bug when you fork() into a second notifier based
+# process without exec. If you have this pipe, communication will go wrong.
+# (kaa.utils.daemonize does not have this problem.)
+_thread_notifier_pipe = None
+
+# internal list of named threads
+_threads = {}
+
+# For threaded decorator
+MAINTHREAD = object()
+
+def threaded(name=None, priority=0, async=True):
+ """
+ The decorator makes sure the function is always called in the thread
+ with the given name. The function will return an InProgress object if
+ async=True (default), otherwise it will cause invoking the decorated
+ function to block (the main loop is kept alive) and its result is
+ returned.
+
+ If name=kaa.MAINTHREAD, the decorated function will be invoked from
+ the main thread. (In this case, currently the priority kwarg is
+ ignored.)
+ """
+ def decorator(func):
+
+ def newfunc(*args, **kwargs):
+ if name is MAINTHREAD:
+ if not async and is_mainthread():
+ # Fast-path case: mainthread synchronous call from the
mainthread
+ return func(*args, **kwargs)
+ callback = MainThreadCallback(func)
+ elif name:
+ callback = NamedThreadCallback((name, priority), func)
+ else:
+ callback = ThreadCallback(func)
+ callback.wait_on_exit(False)
+
+ # callback will always return InProgress
+ in_progress = callback(*args, **kwargs)
+ if not async:
+ return in_progress.wait()
+ return in_progress
+
+ try:
+ newfunc.func_name = func.func_name
+ except TypeError:
+ pass
+ return newfunc
+
+ return decorator
+
+
+def is_mainthread():
+ """
+ Return True if the caller is in the main thread right now.
+ """
+ # If threading module is None, assume main thread. (Silences pointless
+ # exceptions on shutdown.)
+ return (not threading) or threading.currentThread() ==
_thread_notifier_mainthread
+
+
+def wakeup():
+ """
+ Wake up main thread.
+ """
+ if _thread_notifier_pipe and len(_thread_notifier_queue) == 0:
+ os.write(_thread_notifier_pipe[1], "1")
+
+
+def create_thread_notifier_pipe(new = True, purge = False):
+ """
+ Creates a new pipe for the thread notifier. If new is True, a new pipe
+ will always be created; if it is False, it will only be created if one
+ already exists. If purge is True, any previously queued work will be
+ discarded.
+
+ This is an internal function, but we export it for kaa.utils.daemonize.
+ """
+ global _thread_notifier_pipe
+ log.info('create thread notifier pipe')
+
+ if not _thread_notifier_pipe and not new:
+ return
+ elif _thread_notifier_pipe:
+ # There is an existing pipe already, so stop monitoring it.
+ notifier.socket_remove(_thread_notifier_pipe[0])
+
+ if purge:
+ del _thread_notifier_queue[:]
+
+ _thread_notifier_pipe = os.pipe()
+ fcntl.fcntl(_thread_notifier_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
+ fcntl.fcntl(_thread_notifier_pipe[1], fcntl.F_SETFL, os.O_NONBLOCK)
+ notifier.socket_add(_thread_notifier_pipe[0], _thread_notifier_run_queue)
+
+ if _thread_notifier_queue:
+ # A thread is already running and wanted to run something in the
+ # mainloop before the mainloop is started. In that case we need
+ # to wakeup the loop ASAP to handle the requests.
+ os.write(_thread_notifier_pipe[1], "1")
+
+
+def set_as_mainthread():
+ global _thread_notifier_mainthread
+ _thread_notifier_mainthread = threading.currentThread()
+ if not _thread_notifier_pipe:
+ # Make sure we have a pipe between the mainloop and threads. Since
+ # loop() calls set_as_mainthread it is safe to assume the loop is
+ # connected correctly. If someone calls step() without loop() and
+ # without set_as_mainthread inter-thread communication does not work.
+ create_thread_notifier_pipe()
+
+
+def killall():
+ """
+ Kill all running job server. This function will be called by the main
+ loop when it shuts down.
+ """
+ for j in _threads.values():
+ j.stop()
+ j.join()
+
+
+def _thread_notifier_queue_callback(callback, args, kwargs, in_progress):
+ _thread_notifier_lock.acquire()
+ _thread_notifier_queue.append((callback, args, kwargs, in_progress))
+ if len(_thread_notifier_queue) == 1:
+ if _thread_notifier_pipe:
+ os.write(_thread_notifier_pipe[1], "1")
+ _thread_notifier_lock.release()
+
+
+def _thread_notifier_run_queue(fd):
+ global _thread_notifier_queue
+ try:
+ os.read(_thread_notifier_pipe[0], 1000)
+ except socket.error, (err, msg):
+ if err == errno.EAGAIN:
+ # Resource temporarily unavailable -- we are trying to read
+ # data on a socket when none is avilable. This should not
+ # happen under normal circumstances, so log an error.
+ log.error("Thread notifier pipe woke but no data available.")
+ except OSError:
+ pass
+
+ while _thread_notifier_queue:
+ _thread_notifier_lock.acquire()
+ callback, args, kwargs, in_progress = _thread_notifier_queue.pop(0)
+ _thread_notifier_lock.release()
+
+ try:
+ in_progress.finished(callback(*args, **kwargs))
+ except:
+ in_progress.throw(*sys.exc_info())
+
+ return True
+
+
class MainThreadCallback(Callback):
+ """
+ Callback that is invoked from the main thread.
+ """
def __call__(self, *args, **kwargs):
in_progress = InProgress()
@@ -162,109 +332,99 @@
return self._create_thread
-
-def is_mainthread():
+
+class NamedThreadCallback(Callback):
"""
- Return True if the caller is in the main thread right now.
+ A callback to run a function in a thread. This class is used by the
+ threaded decorator, but it is also possible to use this call directly.
"""
- # If threading module is None, assume main thread. (Silences pointless
- # exceptions on shutdown.)
- return (not threading) or threading.currentThread() ==
_thread_notifier_mainthread
+ def __init__(self, thread_information, func, *args, **kwargs):
+ Callback.__init__(self, func, *args, **kwargs)
+ self.priority = 0
+ if isinstance(thread_information, (list, tuple)):
+ thread_information, self.priority = thread_information
+ self._thread = thread_information
-_thread_notifier_mainthread = threading.currentThread()
-_thread_notifier_lock = threading.Lock()
-_thread_notifier_queue = []
+ def _create_job(self, *args, **kwargs):
+ cb = Callback._get_callback(self)
+ job = thread.ThreadInProgress(cb, *args, **kwargs)
+ job.priority = self.priority
+ if not _threads.has_key(self._thread):
+ _threads[self._thread] = _JobServer(self._thread)
+ server = _threads[self._thread]
+ server.add(job)
+ return job
-# For MainThread* callbacks. The pipe will be created when it is used the first
-# time. This solves a nasty bug when you fork() into a second notifier based
-# process without exec. If you have this pipe, communication will go wrong.
-# (kaa.utils.daemonize does not have this problem.)
-_thread_notifier_pipe = None
-
-def wakeup():
- """
- Wake up main thread.
- """
- if _thread_notifier_pipe and len(_thread_notifier_queue) == 0:
- os.write(_thread_notifier_pipe[1], "1")
+ def _get_callback(self):
+ return self._create_job
-def create_thread_notifier_pipe(new = True, purge = False):
+class _JobServer(threading.Thread):
"""
- Creates a new pipe for the thread notifier. If new is True, a new pipe
- will always be created; if it is False, it will only be created if one
- already exists. If purge is True, any previously queued work will be
- discarded.
-
- This is an internal function, but we export it for kaa.utils.daemonize.
+ Thread processing NamedThreadCallback jobs.
"""
- global _thread_notifier_pipe
- log.info('create thread notifier pipe')
-
- if not _thread_notifier_pipe and not new:
- return
- elif _thread_notifier_pipe:
- # There is an existing pipe already, so stop monitoring it.
- notifier.socket_remove(_thread_notifier_pipe[0])
+ def __init__(self, name):
+ log.debug('start jobserver %s' % name)
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+ self.condition = threading.Condition()
+ self.stopped = False
+ self.jobs = []
+ self.name = name
+ self.start()
- if purge:
- del _thread_notifier_queue[:]
- _thread_notifier_pipe = os.pipe()
- fcntl.fcntl(_thread_notifier_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
- fcntl.fcntl(_thread_notifier_pipe[1], fcntl.F_SETFL, os.O_NONBLOCK)
- notifier.socket_add(_thread_notifier_pipe[0], _thread_notifier_run_queue)
+ def stop(self):
+ """
+ Stop the thread.
+ """
+ self.condition.acquire()
+ self.stopped = True
+ self.condition.notify()
+ self.condition.release()
- if _thread_notifier_queue:
- # A thread is already running and wanted to run something in the
- # mainloop before the mainloop is started. In that case we need
- # to wakeup the loop ASAP to handle the requests.
- os.write(_thread_notifier_pipe[1], "1")
+ def add(self, job):
+ """
+ Add a NamedThreadCallback to the thread.
+ """
+ self.condition.acquire()
+ self.jobs.append(job)
+ self.jobs.sort(lambda x,y: -cmp(x.priority, y.priority))
+ self.condition.notify()
+ self.condition.release()
-def set_as_mainthread():
- global _thread_notifier_mainthread
- _thread_notifier_mainthread = threading.currentThread()
- if not _thread_notifier_pipe:
- # Make sure we have a pipe between the mainloop and threads. Since
- # loop() calls set_as_mainthread it is safe to assume the loop is
- # connected correctly. If someone calls step() without loop() and
- # without set_as_mainthread inter-thread communication does not work.
- create_thread_notifier_pipe()
-
-def _thread_notifier_queue_callback(callback, args, kwargs, in_progress):
- _thread_notifier_lock.acquire()
- _thread_notifier_queue.append((callback, args, kwargs, in_progress))
- if len(_thread_notifier_queue) == 1:
- if _thread_notifier_pipe:
- os.write(_thread_notifier_pipe[1], "1")
- _thread_notifier_lock.release()
+ def remove(self, job):
+ """
+ Remove a NamedThreadCallback from the schedule.
+ """
+ if job in self.jobs:
+ self.condition.acquire()
+ self.jobs.remove(job)
+ self.condition.release()
-def _thread_notifier_run_queue(fd):
- global _thread_notifier_queue
- try:
- os.read(_thread_notifier_pipe[0], 1000)
- except socket.error, (err, msg):
- if err == errno.EAGAIN:
- # Resource temporarily unavailable -- we are trying to read
- # data on a socket when none is avilable. This should not
- # happen under normal circumstances, so log an error.
- log.error("Thread notifier pipe woke but no data available.")
- except OSError:
- pass
+ def run(self):
+ """
+ Thread main function.
+ """
+ while not self.stopped:
+ # get a new job to process
+ self.condition.acquire()
+ while not self.jobs and not self.stopped:
+ # nothing to do, wait
+ self.condition.wait()
+ if self.stopped:
+ self.condition.release()
+ continue
+ job = self.jobs.pop(0)
+ self.condition.release()
+ job._execute()
+ # server stopped
+ log.debug('stop thread %s' % self.name)
- while _thread_notifier_queue:
- _thread_notifier_lock.acquire()
- callback, args, kwargs, in_progress = _thread_notifier_queue.pop(0)
- _thread_notifier_lock.release()
- try:
- in_progress.finished(callback(*args, **kwargs))
- except:
- in_progress.throw(*sys.exc_info())
- return True
Modified: trunk/base/src/notifier/timer.py
==============================================================================
--- trunk/base/src/notifier/timer.py (original)
+++ trunk/base/src/notifier/timer.py Wed Feb 13 15:17:50 2008
@@ -31,7 +31,8 @@
# -----------------------------------------------------------------------------
__all__ = [ 'Timer', 'WeakTimer', 'OneShotTimer', 'WeakOneShotTimer',
- 'AtTimer', 'OneShotAtTimer' ]
+ 'AtTimer', 'OneShotAtTimer', 'POLICY_ONCE', 'POLICY_MANY',
+ 'POLICY_RESTART' ]
import logging
import datetime
@@ -39,9 +40,89 @@
import nf_wrapper as notifier
from thread import MainThreadCallback, is_mainthread
+POLICY_ONCE = 'once'
+POLICY_MANY = 'many'
+POLICY_RESTART = 'restart'
+
# get logging object
log = logging.getLogger('notifier')
+
+def timed(interval, timer=None, policy=POLICY_MANY):
+ """
+ Decorator to call the decorated function in a Timer. When calling the
+ function, a timer will be started with the given interval calling that
+ function. The decorated function will be called from the main thread.
+
+ The timer parameter optionally specifies which timer class should be
+ used to wrap the function. kaa.Timer (default) or kaa.WeakTimer will
+ repeatedly invoke the decorated function until it returns False.
+ kaa.OneShotTimer or kaa.WeakOneShotTimer will invoke the function once,
+ delaying it by the specified interval. (In this case the return value
+ of the decorated function is irrelevant.)
+
+ The policy parameter controls how multiple invocations of the decorated
+ function should be handled. By default (POLICY_MANY), each invocation of
+ the function will create a new timer, each firing at the specified
+ interval. If policy is POLICY_ONCE, subsequent invocations are ignored
+ while the first timer is still active. If the policy is POLICY_RESTART,
+ subsequent invocations will restart the first timer.
+
+ Note that in the case of POLICY_ONCE or POLICY_RESTART, if the timer is
+ currently running, any arguments passed to the decorated function on
+ subsequent calls will be discarded.
+ """
+ if not policy in (POLICY_MANY, POLICY_ONCE, POLICY_RESTART):
+ raise RunTimeError('Invalid @kaa.timed policy %s' % policy)
+
+ def decorator(func):
+ def newfunc(*args, **kwargs):
+ if policy == POLICY_MANY:
+ # just start the timer
+ t = (timer or Timer)(func, *args, **kwargs)
+ t.start(interval)
+ return True
+
+ # Object to save the timer in; the function itself for non-methods,
+ # or the instance object for methods.
+ # object to save the timer in
+ obj = func
+ # name of the attribute in the object
+ name = '__kaa_timer_decorator'
+
+ # Try to find out if the function is actually an instance method.
+ # The decorator only sees a function object, even for methods, so
+ # this kludge compares the code object of newfunc (this wrapper)
+ # with the code object of the first argument's attribute of the
+ # function's name. If they're the same, then we must be decorating
+ # a method, and we can attach the timer object to the instance
+ # instead of the function.
+ if args and newfunc.func_code == \
+ getattr(getattr(args[0], func.func_name, None),
'func_code', None):
+ obj = args[0]
+ name = '%s__%s' % (name, func.func_name)
+
+ # check current timer
+ if getattr(obj, name, None) and getattr(obj, name).active():
+ if policy == POLICY_ONCE:
+ # timer already running and not override
+ return False
+ # stop old timer
+ getattr(obj, name).stop()
+
+ # create new timer, set it to the object and start it
+ t = timer(func, *args, **kwargs)
+ setattr(obj, name, weakref(t))
+ getattr(obj, name).start(interval)
+ return True
+
+ newfunc.func_name = func.func_name
+ return newfunc
+
+ return decorator
+
+
+
class Timer(notifier.NotifierCallback):
def __init__(self, callback, *args, **kwargs):
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog