Author: tack
Date: Wed Feb 13 15:17:50 2008
New Revision: 3080

Log:
Remove jobserver.py and decorators.py and move code into more sensible
places.


Removed:
   trunk/base/src/notifier/decorators.py
   trunk/base/src/notifier/jobserver.py
Modified:
   trunk/base/src/notifier/main.py
   trunk/base/src/notifier/thread.py
   trunk/base/src/notifier/timer.py

Modified: trunk/base/src/notifier/main.py
==============================================================================
--- trunk/base/src/notifier/main.py     (original)
+++ trunk/base/src/notifier/main.py     Wed Feb 13 15:17:50 2008
@@ -43,7 +43,7 @@
 from signals import Signal
 from popen import proclist as _proclist
 from thread import is_mainthread, wakeup, set_as_mainthread
-from jobserver import killall as kill_jobserver
+from thread import killall as kill_jobserver
 from decorators import threaded, MAINTHREAD
 
 __all__ = [ 'run', 'stop', 'step', 'select_notifier', 'is_running', 'wakeup',

Modified: trunk/base/src/notifier/thread.py
==============================================================================
--- trunk/base/src/notifier/thread.py   (original)
+++ trunk/base/src/notifier/thread.py   Wed Feb 13 15:17:50 2008
@@ -46,8 +46,9 @@
 #
 # -----------------------------------------------------------------------------
 
-__all__ = [ 'MainThreadCallback', 'ThreadCallback', 'is_mainthread',
-            'wakeup', 'set_as_mainthread', 'create_thread_notifier_pipe' ]
+__all__ = [ 'MainThreadCallback', 'ThreadCallback', 'is_mainthread', 
+            'wakeup', 'set_as_mainthread', 'create_thread_notifier_pipe',
+            'threaded', 'MAINTHREAD' ]
 
 # python imports
 import sys
@@ -57,6 +58,7 @@
 import fcntl
 import socket
 import errno
+import thread
 
 # notifier imports
 import nf_wrapper as notifier
@@ -67,7 +69,175 @@
 # get logging object
 log = logging.getLogger('notifier')
 
+# TODO: organize thread notifier stuff into its own namespace
+
+_thread_notifier_mainthread = threading.currentThread()
+_thread_notifier_lock = threading.Lock()
+_thread_notifier_queue = []
+
+# For MainThread* callbacks. The pipe will be created when it is used the first
+# time. This solves a nasty bug when you fork() into a second notifier based
+# process without exec. If you have this pipe, communication will go wrong.
+# (kaa.utils.daemonize does not have this problem.)
+_thread_notifier_pipe = None
+
+# internal list of named threads
+_threads = {}
+
+# For threaded decorator
+MAINTHREAD = object()
+
+def threaded(name=None, priority=0, async=True):
+    """
+    The decorator makes sure the function is always called in the thread
+    with the given name. The function will return an InProgress object if
+    async=True (default), otherwise it will cause invoking the decorated
+    function to block (the main loop is kept alive) and its result is
+    returned.
+
+    If name=kaa.MAINTHREAD, the decorated function will be invoked from
+    the main thread.  (In this case, currently the priority kwarg is
+    ignored.)
+    """
+    def decorator(func):
+
+        def newfunc(*args, **kwargs):
+            if name is MAINTHREAD:
+                if not async and is_mainthread():
+                    # Fast-path case: mainthread synchronous call from the 
mainthread
+                    return func(*args, **kwargs)
+                callback =  MainThreadCallback(func)
+            elif name:
+                callback = NamedThreadCallback((name, priority), func)
+            else:
+                callback = ThreadCallback(func)
+                callback.wait_on_exit(False)
+
+            # callback will always return InProgress
+            in_progress = callback(*args, **kwargs)
+            if not async:
+                return in_progress.wait()
+            return in_progress
+
+        try:
+            newfunc.func_name = func.func_name
+        except TypeError:
+            pass
+        return newfunc
+
+    return decorator
+
+
+def is_mainthread():
+    """
+    Return True if the caller is in the main thread right now.
+    """
+    # If threading module is None, assume main thread.  (Silences pointless
+    # exceptions on shutdown.)
+    return (not threading) or threading.currentThread() == 
_thread_notifier_mainthread
+
+
+def wakeup():
+    """
+    Wake up main thread.
+    """
+    if _thread_notifier_pipe and len(_thread_notifier_queue) == 0:
+        os.write(_thread_notifier_pipe[1], "1")
+
+
+def create_thread_notifier_pipe(new = True, purge = False):
+    """
+    Creates a new pipe for the thread notifier.  If new is True, a new pipe
+    will always be created; if it is False, it will only be created if one
+    already exists.  If purge is True, any previously queued work will be
+    discarded.
+
+    This is an internal function, but we export it for kaa.utils.daemonize.
+    """
+    global _thread_notifier_pipe
+    log.info('create thread notifier pipe')
+
+    if not _thread_notifier_pipe and not new:
+        return
+    elif _thread_notifier_pipe:
+        # There is an existing pipe already, so stop monitoring it.
+        notifier.socket_remove(_thread_notifier_pipe[0])
+
+    if purge:
+        del _thread_notifier_queue[:]
+
+    _thread_notifier_pipe = os.pipe()
+    fcntl.fcntl(_thread_notifier_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
+    fcntl.fcntl(_thread_notifier_pipe[1], fcntl.F_SETFL, os.O_NONBLOCK)
+    notifier.socket_add(_thread_notifier_pipe[0], _thread_notifier_run_queue)
+
+    if _thread_notifier_queue:
+        # A thread is already running and wanted to run something in the
+        # mainloop before the mainloop is started. In that case we need
+        # to wakeup the loop ASAP to handle the requests.
+        os.write(_thread_notifier_pipe[1], "1")
+
+
+def set_as_mainthread():
+    global _thread_notifier_mainthread
+    _thread_notifier_mainthread = threading.currentThread()
+    if not _thread_notifier_pipe:
+        # Make sure we have a pipe between the mainloop and threads. Since
+        # loop() calls set_as_mainthread it is safe to assume the loop is
+        # connected correctly. If someone calls step() without loop() and
+        # without set_as_mainthread inter-thread communication does not work.
+        create_thread_notifier_pipe()
+ 
+
+def killall():
+    """
+    Kill all running job server. This function will be called by the main
+    loop when it shuts down.
+    """
+    for j in _threads.values():
+        j.stop()
+        j.join()
+
+
+def _thread_notifier_queue_callback(callback, args, kwargs, in_progress):
+    _thread_notifier_lock.acquire()
+    _thread_notifier_queue.append((callback, args, kwargs, in_progress))
+    if len(_thread_notifier_queue) == 1:
+        if _thread_notifier_pipe:
+            os.write(_thread_notifier_pipe[1], "1")
+    _thread_notifier_lock.release()
+
+
+def _thread_notifier_run_queue(fd):
+    global _thread_notifier_queue
+    try:
+        os.read(_thread_notifier_pipe[0], 1000)
+    except socket.error, (err, msg):
+        if err == errno.EAGAIN:
+            # Resource temporarily unavailable -- we are trying to read
+            # data on a socket when none is avilable.  This should not
+            # happen under normal circumstances, so log an error.
+            log.error("Thread notifier pipe woke but no data available.")
+    except OSError:
+        pass
+
+    while _thread_notifier_queue:
+        _thread_notifier_lock.acquire()
+        callback, args, kwargs, in_progress = _thread_notifier_queue.pop(0)
+        _thread_notifier_lock.release()
+
+        try:
+            in_progress.finished(callback(*args, **kwargs))
+        except:
+            in_progress.throw(*sys.exc_info())
+
+    return True
+
+
 class MainThreadCallback(Callback):
+    """
+    Callback that is invoked from the main thread.
+    """
     def __call__(self, *args, **kwargs):
         in_progress = InProgress()
 
@@ -162,109 +332,99 @@
         return self._create_thread
 
 
-    
-def is_mainthread():
+
+class NamedThreadCallback(Callback):
     """
-    Return True if the caller is in the main thread right now.
+    A callback to run a function in a thread. This class is used by the
+    threaded decorator, but it is also possible to use this call directly.
     """
-    # If threading module is None, assume main thread.  (Silences pointless
-    # exceptions on shutdown.)
-    return (not threading) or threading.currentThread() == 
_thread_notifier_mainthread
+    def __init__(self, thread_information, func, *args, **kwargs):
+        Callback.__init__(self, func, *args, **kwargs)
+        self.priority = 0
+        if isinstance(thread_information, (list, tuple)):
+            thread_information, self.priority = thread_information
+        self._thread = thread_information
 
 
-_thread_notifier_mainthread = threading.currentThread()
-_thread_notifier_lock = threading.Lock()
-_thread_notifier_queue = []
+    def _create_job(self, *args, **kwargs):
+        cb = Callback._get_callback(self)
+        job = thread.ThreadInProgress(cb, *args, **kwargs)
+        job.priority = self.priority
+        if not _threads.has_key(self._thread):
+            _threads[self._thread] = _JobServer(self._thread)
+        server = _threads[self._thread]
+        server.add(job)
+        return job
 
-# For MainThread* callbacks. The pipe will be created when it is used the first
-# time. This solves a nasty bug when you fork() into a second notifier based
-# process without exec. If you have this pipe, communication will go wrong.
-# (kaa.utils.daemonize does not have this problem.)
-_thread_notifier_pipe = None
 
-    
-def wakeup():
-    """
-    Wake up main thread.
-    """
-    if _thread_notifier_pipe and len(_thread_notifier_queue) == 0:
-        os.write(_thread_notifier_pipe[1], "1")
+    def _get_callback(self):
+        return self._create_job
 
 
-def create_thread_notifier_pipe(new = True, purge = False):
+class _JobServer(threading.Thread):
     """
-    Creates a new pipe for the thread notifier.  If new is True, a new pipe
-    will always be created; if it is False, it will only be created if one
-    already exists.  If purge is True, any previously queued work will be
-    discarded.
-
-    This is an internal function, but we export it for kaa.utils.daemonize.
+    Thread processing NamedThreadCallback jobs.
     """
-    global _thread_notifier_pipe
-    log.info('create thread notifier pipe')
-
-    if not _thread_notifier_pipe and not new:
-        return
-    elif _thread_notifier_pipe:
-        # There is an existing pipe already, so stop monitoring it.
-        notifier.socket_remove(_thread_notifier_pipe[0])
+    def __init__(self, name):
+        log.debug('start jobserver %s' % name)
+        threading.Thread.__init__(self)
+        self.setDaemon(True)
+        self.condition = threading.Condition()
+        self.stopped = False
+        self.jobs = []
+        self.name = name
+        self.start()
 
-    if purge:
-        del _thread_notifier_queue[:]
 
-    _thread_notifier_pipe = os.pipe()
-    fcntl.fcntl(_thread_notifier_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
-    fcntl.fcntl(_thread_notifier_pipe[1], fcntl.F_SETFL, os.O_NONBLOCK)
-    notifier.socket_add(_thread_notifier_pipe[0], _thread_notifier_run_queue)
+    def stop(self):
+        """
+        Stop the thread.
+        """
+        self.condition.acquire()
+        self.stopped = True
+        self.condition.notify()
+        self.condition.release()
 
-    if _thread_notifier_queue:
-        # A thread is already running and wanted to run something in the
-        # mainloop before the mainloop is started. In that case we need
-        # to wakeup the loop ASAP to handle the requests.
-        os.write(_thread_notifier_pipe[1], "1")
 
+    def add(self, job):
+        """
+        Add a NamedThreadCallback to the thread.
+        """
+        self.condition.acquire()
+        self.jobs.append(job)
+        self.jobs.sort(lambda x,y: -cmp(x.priority, y.priority))
+        self.condition.notify()
+        self.condition.release()
 
-def set_as_mainthread():
-    global _thread_notifier_mainthread
-    _thread_notifier_mainthread = threading.currentThread()
-    if not _thread_notifier_pipe:
-        # Make sure we have a pipe between the mainloop and threads. Since
-        # loop() calls set_as_mainthread it is safe to assume the loop is
-        # connected correctly. If someone calls step() without loop() and
-        # without set_as_mainthread inter-thread communication does not work.
-        create_thread_notifier_pipe()
-    
 
-def _thread_notifier_queue_callback(callback, args, kwargs, in_progress):
-    _thread_notifier_lock.acquire()
-    _thread_notifier_queue.append((callback, args, kwargs, in_progress))
-    if len(_thread_notifier_queue) == 1:
-        if _thread_notifier_pipe:
-            os.write(_thread_notifier_pipe[1], "1")
-    _thread_notifier_lock.release()
+    def remove(self, job):
+        """
+        Remove a NamedThreadCallback from the schedule.
+        """
+        if job in self.jobs:
+            self.condition.acquire()
+            self.jobs.remove(job)
+            self.condition.release()
 
 
-def _thread_notifier_run_queue(fd):
-    global _thread_notifier_queue
-    try:
-        os.read(_thread_notifier_pipe[0], 1000)
-    except socket.error, (err, msg):
-        if err == errno.EAGAIN:
-            # Resource temporarily unavailable -- we are trying to read
-            # data on a socket when none is avilable.  This should not
-            # happen under normal circumstances, so log an error.
-            log.error("Thread notifier pipe woke but no data available.")
-    except OSError:
-        pass
+    def run(self):
+        """
+        Thread main function.
+        """
+        while not self.stopped:
+            # get a new job to process
+            self.condition.acquire()
+            while not self.jobs and not self.stopped:
+                # nothing to do, wait
+                self.condition.wait()
+            if self.stopped:
+                self.condition.release()
+                continue
+            job = self.jobs.pop(0)
+            self.condition.release()
+            job._execute()
+        # server stopped
+        log.debug('stop thread %s' % self.name)
 
-    while _thread_notifier_queue:
-        _thread_notifier_lock.acquire()
-        callback, args, kwargs, in_progress = _thread_notifier_queue.pop(0)
-        _thread_notifier_lock.release()
 
-        try:
-            in_progress.finished(callback(*args, **kwargs))
-        except:
-            in_progress.throw(*sys.exc_info())
 
-    return True

Modified: trunk/base/src/notifier/timer.py
==============================================================================
--- trunk/base/src/notifier/timer.py    (original)
+++ trunk/base/src/notifier/timer.py    Wed Feb 13 15:17:50 2008
@@ -31,7 +31,8 @@
 # -----------------------------------------------------------------------------
 
 __all__ = [ 'Timer', 'WeakTimer', 'OneShotTimer', 'WeakOneShotTimer',
-            'AtTimer', 'OneShotAtTimer' ]
+            'AtTimer', 'OneShotAtTimer', 'POLICY_ONCE', 'POLICY_MANY', 
+            'POLICY_RESTART' ]
 
 import logging
 import datetime
@@ -39,9 +40,89 @@
 import nf_wrapper as notifier
 from thread import MainThreadCallback, is_mainthread
 
+POLICY_ONCE = 'once'
+POLICY_MANY = 'many'
+POLICY_RESTART = 'restart'
+
 # get logging object
 log = logging.getLogger('notifier')
 
+
+def timed(interval, timer=None, policy=POLICY_MANY):
+    """
+    Decorator to call the decorated function in a Timer. When calling the
+    function, a timer will be started with the given interval calling that
+    function.  The decorated function will be called from the main thread.
+
+    The timer parameter optionally specifies which timer class should be
+    used to wrap the function.  kaa.Timer (default) or kaa.WeakTimer will
+    repeatedly invoke the decorated function until it returns False.
+    kaa.OneShotTimer or kaa.WeakOneShotTimer will invoke the function once,
+    delaying it by the specified interval.  (In this case the return value
+    of the decorated function is irrelevant.)
+
+    The policy parameter controls how multiple invocations of the decorated
+    function should be handled.  By default (POLICY_MANY), each invocation of
+    the function will create a new timer, each firing at the specified
+    interval.  If policy is POLICY_ONCE, subsequent invocations are ignored
+    while the first timer is still active.  If the policy is POLICY_RESTART,
+    subsequent invocations will restart the first timer.
+
+    Note that in the case of POLICY_ONCE or POLICY_RESTART, if the timer is
+    currently running, any arguments passed to the decorated function on
+    subsequent calls will be discarded.
+    """
+    if not policy in (POLICY_MANY, POLICY_ONCE, POLICY_RESTART):
+        raise RunTimeError('Invalid @kaa.timed policy %s' % policy)
+
+    def decorator(func):
+        def newfunc(*args, **kwargs):
+            if policy == POLICY_MANY:
+                # just start the timer
+                t = (timer or Timer)(func, *args, **kwargs)
+                t.start(interval)
+                return True
+
+            # Object to save the timer in; the function itself for non-methods,
+            # or the instance object for methods.
+            # object to save the timer in
+            obj = func
+            # name of the attribute in the object
+            name = '__kaa_timer_decorator'
+
+            # Try to find out if the function is actually an instance method.
+            # The decorator only sees a function object, even for methods, so
+            # this kludge compares the code object of newfunc (this wrapper)
+            # with the code object of the first argument's attribute of the
+            # function's name.  If they're the same, then we must be decorating
+            # a method, and we can attach the timer object to the instance
+            # instead of the function.
+            if args and newfunc.func_code == \
+                        getattr(getattr(args[0], func.func_name, None), 
'func_code', None):
+                obj  = args[0]
+                name = '%s__%s' % (name, func.func_name)
+
+            # check current timer
+            if getattr(obj, name, None) and getattr(obj, name).active():
+                if policy == POLICY_ONCE:
+                    # timer already running and not override
+                    return False
+                # stop old timer
+                getattr(obj, name).stop()
+
+            # create new timer, set it to the object and start it
+            t = timer(func, *args, **kwargs)
+            setattr(obj, name, weakref(t))
+            getattr(obj, name).start(interval)
+            return True
+
+        newfunc.func_name = func.func_name
+        return newfunc
+
+    return decorator
+
+
+
 class Timer(notifier.NotifierCallback):
 
     def __init__(self, callback, *args, **kwargs):

-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to