Author: tack
Date: Wed Feb 13 22:24:32 2008
New Revision: 3092

Log:
Overhauled kaa.Socket; exposed Signal.changed_cb property; created
Signals.add method; import kaa.weakref for timed decorator; lazy
import create_thread_notifier_pipe in kaa.utils (to avoid circular
importing when using kaa.utils internally).


Added:
   trunk/base/test/sockettest.py
Modified:
   trunk/base/src/notifier/signals.py
   trunk/base/src/notifier/sockets.py
   trunk/base/src/notifier/timer.py
   trunk/base/src/rpc.py
   trunk/base/src/utils.py

Modified: trunk/base/src/notifier/signals.py
==============================================================================
--- trunk/base/src/notifier/signals.py  (original)
+++ trunk/base/src/notifier/signals.py  Wed Feb 13 22:24:32 2008
@@ -38,6 +38,7 @@
 
 # callbacks from kaa.notifier
 from callback import Callback, WeakCallback
+from kaa.utils import property
 
 # get logging object
 log = logging.getLogger('notifier')
@@ -59,10 +60,21 @@
 
     def __init__(self, changed_cb = None):
         self._callbacks = []
-        self._changed_cb = changed_cb
+        self.changed_cb = changed_cb
         self._deferred_args = []
 
 
+    @property
+    def changed_cb(self):
+        return self._changed_cb
+
+
+    @changed_cb.setter
+    def changed_cb(self, callback):
+        assert(callback is None or callable(callback))
+        self._changed_cb = callback
+
+
     def __iter__(self):
         for cb in self._callbacks:
             yield cb
@@ -248,7 +260,15 @@
                 # parameter is something else, bad
                 raise AttributeError('signal key must be string')
 
-            
+
+    def add(self, *signals):
+        """
+        Creates a new Signals object by merging all signals defined in
+        self and the signals specified in the arguments.
+        """
+        return Signals(self, *signals)
+
+
     def __getattr__(self, attr):
         """
         Get attribute function from Signal().

Modified: trunk/base/src/notifier/sockets.py
==============================================================================
--- trunk/base/src/notifier/sockets.py  (original)
+++ trunk/base/src/notifier/sockets.py  Wed Feb 13 22:24:32 2008
@@ -30,16 +30,19 @@
 #
 # -----------------------------------------------------------------------------
 
-__all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket',
-            'IO_READ', 'IO_WRITE' ]
+__all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket', 'IO_READ', 'IO_WRITE' ]
 
 import socket
 import logging
 
 import nf_wrapper as notifier
-from callback import Callback
-from signals import Signal
+from callback import Callback, WeakCallback
+from signals import Signals, Signal
 from thread import MainThreadCallback, ThreadCallback, is_mainthread, threaded
+from async import InProgress
+from coroutine import YieldCallback
+from kaa.utils import property
+from timer import OneShotTimer, timed, POLICY_ONCE
 
 # get logging object
 log = logging.getLogger('notifier')
@@ -48,7 +51,6 @@
 IO_WRITE  = 1
 
 class IOMonitor(notifier.NotifierCallback):
-
     def __init__(self, callback, *args, **kwargs):
         super(IOMonitor, self).__init__(callback, *args, **kwargs)
         self.set_ignore_caller_args()
@@ -56,11 +58,14 @@
 
     def register(self, fd, condition = IO_READ):
         if self.active():
+            if fd != self._id or condition != self._condition:
+                raise ValueError('Existing file descriptor already registered with this IOMonitor.')
             return
         if not is_mainthread():
-            return MainThreadCallback(self.register, fd, condition)()
+            return MainThreadCallback(self.register)(fd, condition)
         notifier.socket_add(fd, self, condition)
         self._condition = condition
+        # Must be called _id to correspond with base class.
         self._id = fd
 
 
@@ -82,30 +87,83 @@
     """
     Notifier-aware socket class.
     """
-    def __init__(self, addr = None, async = None):
-        self._addr = self._socket = None
-        self._write_buffer = ""
-        self._read_delim = None
-
-        self.signals = {
-            "closed": Signal(),
-            "read": Signal(),
-            "connected": Signal()
-        }
-
-        # These variables hold the socket dispatchers for monitoring; we
-        # only allocate a dispatcher when the socket is connected to avoid
-        # a ref cycle so that disconnected sockets will get properly deleted
-        # when they are not referenced.
-        self._rmon = self._wmon = None
 
+    signals = Signals('closed', 'read', 'readline', 'new-client')
+
+    def __init__(self):
+        self._socket = None
+        self._write_buffer = []
+        self._addr = None
         self._listening = False
+        self._queue_close = False
+
+        # Internal signals for read() and readline()  (these are different from
+        # the public signals 'read' and 'readline' as they get emitted even
+        # when data is None.  When these signals get updated, we call
+        # _update_read_monitor to register the read IOMonitor.
+        cb = WeakCallback(self._update_read_monitor)
+        self._read_signal = Signal(cb)
+        self._readline_signal = Signal(cb)
+        self.signals['read'].changed_cb = cb
+        self.signals['readline'].changed_cb = cb
+
+        # These variables hold the IOMonitors for monitoring; we only allocate
+        # a monitor when the socket is connected to avoid a ref cycle so
+        # that disconnected sockets will get properly deleted when they are not
+        # referenced.
+        self._rmon = self._wmon = None
+
 
-        if addr:
-            self.connect(addr, async = async)
+    @property
+    def address(self):
+        """
+        Either a 2-tuple containing the (host, port) of the remote end of the
+        socket (host may be an IP address or hostname, but it always a string),
+        or a string in the case of a UNIX socket.
+
+        If this is a listening socket, it is a 2-tuple of the address
+        the socket was bound to.
+        """
+        return self._addr
+
+
+    @property
+    def listening(self):
+        """
+        True if this is a listening socket, and False otherwise.
+        """
+        return self._listening
+
+
+    @property
+    def connected(self):
+        """
+        Boolean representing the connected state of the socket.
+        """
+        return self._socket != None
+
+
+    @timed(0, OneShotTimer, POLICY_ONCE)
+    def _update_read_monitor(self, signal = None, change = None):
+        # Update read IOMonitor to register or unregister based on if there are
+        # any handlers attached to the read signals.  If there are no handlers,
+        # there is no point in reading data from the socket since it will go 
+        # nowhere.  This also allows us to push back the read buffer to the OS.
+        if not self._rmon or change == Signal.SIGNAL_DISCONNECTED:
+            return
+        elif not self._listening and len(self._read_signal) == len(self._readline_signal) == \
+                                     len(self.signals['read']) == len(self.signals['readline']) == 0:
+           self._rmon.unregister()
+        elif not self._rmon.active():
+            self._rmon.register(self._socket, IO_READ)
 
 
     def _normalize_address(self, addr):
+        """
+        Converts address strings in the form host:port into 2-tuples 
+        containing the hostname and integer port.  Strings not in that
+        form are left untouched (as they represent unix socket paths).
+        """
         if isinstance(addr, basestring) and ":" in addr:
             addr = addr.split(":")
             assert(len(addr) == 2)
@@ -116,101 +174,96 @@
 
 
     def _make_socket(self, addr = None):
+        """
+        Constructs a socket based on the given addr.  Returns the socket and
+        the normalized address as a 2-tuple.
+        """
         addr = self._normalize_address(addr)
-
-        if self._socket:
-            self.close()
-
         assert(type(addr) in (str, tuple, None))
 
         if isinstance(addr, basestring):
-            self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         else:
-            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-        self._addr = addr
-
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
-    def listen(self, bind_info, qlen = 5):
-        if isinstance(bind_info, int):
-            # Change port to (None, port)
-            bind_info = ("", bind_info)
+        return sock, addr
 
-        if not isinstance(bind_info, (tuple, list)) or \
-               not isinstance(bind_info[0], (tuple, list)):
-            bind_info = (bind_info, )
 
+    def _replace_socket(self, sock, addr):
+        """
+        Replaces the existing socket and address spec with the ones supplied.
+        Any existing socket is closed.
+        """
+        if self._socket:
+            self._socket.close()
 
-        self._make_socket(bind_info[0])
+        self._socket, self._addr = sock, addr
 
-        for addr in bind_info:
-            addr = self._normalize_address(addr)
-            try:
-                self._socket.bind(addr)
-            except socket.error:
-                log.error('Failed to bind socket to: %s' % str(addr))
 
-        self._socket.listen(qlen)
+    def listen(self, bind_info, qlen = 5):
+        """
+        Sets the socket to listen on bind_info, which is either an integer
+        corresponding the port to listen to, or a 2-tuple of the IP and port.
+        In the case where only the port number is specified, the socket will
+        be bound to all interfaces.
+
+        If the bind fails, an exception is raised.
+
+        Once listening, new connections are automatically accepted, and
+        the 'new-client' signal is emitted for each new connection.  Callbacks
+        connecting to the signal will receive a new Socket object representing
+        the client connection.
+        """
+        if isinstance(bind_info, int):
+            # Only port number specified; translate to tuple that can be
+            # used with socket.bind()
+            bind_info = ('', bind_info)
+
+        sock, addr = self._make_socket(bind_info)
+        sock.bind(addr)
+        sock.listen(qlen)
         self._listening = True
-        self.wrap()
+        self.wrap(sock, addr)
 
 
-
-    def connect(self, addr, async = None):
+    @threaded()
+    def connect(self, addr):
         """
         Connects to the host specified in addr.  If addr is a string in the
         form host:port, or a tuple the form (host, port), a TCP socket is
         established.  Otherwise a Unix socket is established and addr is
         treated as a filename.
 
-        If async is not None, then an InProgress object is returned.  If async
-        is a callable, then it is automatically connected to both completed and
-        exception handlers of the InProgress object.  This callback takes one
-        or three parameters: if the connection was successful, the first
-        parameter is True; otherwise there was an exception, and the three
-        parameters are type, value, and traceback of the exception.
-
-        If async is None, this call will block until either connected or an
-        exception is raised.  Although this call blocks, the notifier loop
-        remains active.
-        """
-        self._make_socket(addr)
-
-        in_progress = self._connect_thread()
-
-        if async:
-            if callable(async):
-                in_progress.connect_both(async, async)
-            return in_progress
-
-        # Any exception that occurred in the thread will get raised here:
-        return in_progress.wait()
-
-
-    @threaded()
-    def _connect_thread(self):
-        if type(self._addr) == str:
+        This function is executed in a thread to avoid blocking.  It therefore
+        returns an InProgress object.  If the socket is connected, the InProgress
+        is finished with no arguments.  If the connection cannot be established,
+        an exception is thrown to the InProgress.
+        an exception is thrown to the InProgress.
+        """
+        sock, addr = self._make_socket(addr)
+        if type(addr) == str:
             # Unix socket, just connect.
-            self._socket.connect(self._addr)
+            sock.connect(addr)
         else:
-            host, port = self._addr
+            host, port = addr
             if not host.replace(".", "").isdigit():
                 # Resolve the hostname.
                 host = socket.gethostbyname(host)
-            self._socket.connect((host, port))
+            sock.connect((host, port))
 
-        self.wrap()
-        return True
+        self.wrap(sock, addr)
 
 
-    def wrap(self, sock = None, addr = None):
-        if sock:
-            self._socket = sock
-        if addr:
-            self._addr = addr
+    def wrap(self, sock, addr = None):
+        """
+        Wraps an existing low-level socket object.  addr specifies the address
+        corresponding to the socket.
+        """
+        self._socket = sock or self._socket
+        self._addr = addr or self._addr
+        self._queue_close = False
 
-        self._socket.setblocking(False)
+        sock.setblocking(False)
 
         if self._rmon:
             self._rmon.unregister()
@@ -219,9 +272,34 @@
         self._rmon = IOMonitor(self._handle_read)
         self._wmon = IOMonitor(self._handle_write)
 
-        self._rmon.register(self._socket, IO_READ)
+        self._update_read_monitor()
         if self._write_buffer:
-            self._wmon.register(self._socket, IO_WRITE)
+            self._wmon.register(sock, IO_WRITE)
+
+
+    def _async_read(self, signal):
+        if self._listening:
+            raise RuntimeError("Can't read on a listening socket.")
+
+        return YieldCallback(signal)
+
+
+    def read(self):
+        """
+        Reads a chunk of data from the socket.  This function returns an 
+        InProgress object.  If the InProgress is finished with None, it
+        means that no data was collected and the socket closed.
+        """
+        return self._async_read(self._read_signal)
+
+
+    def readline(self):
+        """
+        Reads a line from the socket (with newline stripped).  The function
+        returns an InProgress object.  If the InProgress is finished with
+        None, it means that no data was collected and the socket closed.
+        """
+        return self._async_read(self._readline_signal)
 
 
     def _handle_read(self):
@@ -229,7 +307,7 @@
             sock, addr = self._socket.accept()
             client_socket = Socket()
             client_socket.wrap(sock, addr)
-            self.signals["connected"].emit(client_socket)
+            self.signals['new-client'].emit(client_socket)
             return
 
         try:
@@ -242,45 +320,74 @@
             # If we're here, then the socket is likely disconnected.
             data = None
 
+        self._read_signal.emit(data)
+
         if not data:
-            return self.close(False)
+            self._readline_signal.emit(data)
+            return self.close(immediate=True, expected=False)
+
+        self.signals['read'].emit(data)
+        self._update_read_monitor()
 
-        self.signals["read"].emit(data)
+        # TODO: parse input into separate lines and emit readline.
 
+        
+
+    def close(self, immediate = False, expected = True):
+        """
+        Closes the socket.  If immediate is False and there is data in the
+        write buffer, the socket is closed once the write buffer is emptied.
+        Otherwise the socket is closed immediately and the 'closed' signal
+        is emitted.
+        """
+        if not immediate and self._write_buffer:
+            # Immediate close not requested and we have some data left
+            # to be written, so defer close until after write buffer
+            # is empty.
+            self._queue_close = True
+            return
 
-    def close(self, expected = True):
         self._rmon.unregister()
         self._wmon.unregister()
         self._rmon = self._wmon = None
-        self._write_buffer = ""
+        del self._write_buffer[:]
+        self._queue_close = False
 
         self._socket.close()
         self._socket = None
-        self.signals["closed"].emit(expected)
+        self.signals['closed'].emit(expected)
 
 
     def write(self, data):
-        self._write_buffer += data
+        self._write_buffer.append(data)
         if self._socket and self._wmon and not self._wmon.active():
             self._wmon.register(self._socket.fileno(), IO_WRITE)
 
+
     def _handle_write(self):
-        if len(self._write_buffer) == 0:
+        if not self._write_buffer:
             return
 
         try:
-            sent = self._socket.send(self._write_buffer)
-            self._write_buffer = self._write_buffer[sent:]
+            while self._write_buffer:
+                data = self._write_buffer.pop(0)
+                sent = self._socket.send(data)
+                if sent != len(data):
+                    # Not all data was able to be sent; push remaining data
+                    # back onto the write buffer.
+                    self._write_buffer.insert(0, data[sent:])
+                    break
+
             if not self._write_buffer:
+                if self._queue_close:
+                    return self.close(immediate=True)
                 self._wmon.unregister()
+
         except socket.error, (errno, msg):
             if errno == 11:
                 # Resource temporarily unavailable -- we are trying to write
                 # data to a socket when none is available.
                 return
-            # If we're here, then the socket is likely disconnected.
-            self.close(False)
-
 
-    def is_connected(self):
-        return self._socket != None
+            # If we're here, then the socket is likely disconnected.
+            self.close(immediate=True, expected=False)

Modified: trunk/base/src/notifier/timer.py
==============================================================================
--- trunk/base/src/notifier/timer.py    (original)
+++ trunk/base/src/notifier/timer.py    Wed Feb 13 22:24:32 2008
@@ -39,6 +39,7 @@
 
 import nf_wrapper as notifier
 from thread import MainThreadCallback, is_mainthread
+from kaa.weakref import weakref
 
 POLICY_ONCE = 'once'
 POLICY_MANY = 'many'

Modified: trunk/base/src/rpc.py
==============================================================================
--- trunk/base/src/rpc.py       (original)
+++ trunk/base/src/rpc.py       Wed Feb 13 22:24:32 2008
@@ -521,8 +521,7 @@
                 #log.exception('Exception in rpc function "%s"', function)
                 if not function in self._callbacks:
                     log.error(self._callbacks.keys())
-                type, value, tb = sys.exc_info()
-                self._send_exception(type, value, tb, seq)
+                self._send_exception(*sys.exc_info() + (seq,))
                 return True
 
             if isinstance(result, kaa.InProgress):

Modified: trunk/base/src/utils.py
==============================================================================
--- trunk/base/src/utils.py     (original)
+++ trunk/base/src/utils.py     Wed Feb 13 22:24:32 2008
@@ -38,7 +38,6 @@
 
 import kaa
 import _utils
-from kaa.notifier.thread import create_thread_notifier_pipe
 
 # get logging object
 log = logging.getLogger('kaa')
@@ -154,6 +153,7 @@
 
     # Replace any existing thread notifier pipe, otherwise we'll be listening
     # to our parent's thread notifier.
+    from kaa.notifier.thread import create_thread_notifier_pipe
     create_thread_notifier_pipe(new=False, purge=True)
 
     return lock

Added: trunk/base/test/sockettest.py
==============================================================================
--- (empty file)
+++ trunk/base/test/sockettest.py       Wed Feb 13 22:24:32 2008
@@ -0,0 +1,22 @@
+import kaa
+
[EMAIL PROTECTED]()
+def new_client(client):
+    ip, port = client.address
+    print 'New connection from %s:%s' % (ip, port)
+    client.write('Hello %s, connecting from port %d\n' % (ip, port))
+
+    remote = kaa.Socket()
+    yield remote.connect('www.freevo.org:80')
+    remote.write('GET / HTTP/1.0\n\n')
+    while remote.connected:
+        data = yield remote.read()
+        client.write(data)
+    client.write('\n\nBye!\n')
+    client.close()
+
+server = kaa.Socket()
+server.signals['new-client'].connect(new_client)
+server.listen(8080)
+print "Connect to localhost:8080"
+kaa.main.run()

-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to