Author: tack
Date: Wed Feb 13 22:24:32 2008
New Revision: 3092
Log:
Overhauled kaa.Socket; exposed Signal.changed_cb property; created
Signals.add method; import kaa.weakref for timed decorator; lazy
import create_thread_notifier_pipe in kaa.utils (to avoid circular
importing when using kaa.utils internally).
Added:
trunk/base/test/sockettest.py
Modified:
trunk/base/src/notifier/signals.py
trunk/base/src/notifier/sockets.py
trunk/base/src/notifier/timer.py
trunk/base/src/rpc.py
trunk/base/src/utils.py
Modified: trunk/base/src/notifier/signals.py
==============================================================================
--- trunk/base/src/notifier/signals.py (original)
+++ trunk/base/src/notifier/signals.py Wed Feb 13 22:24:32 2008
@@ -38,6 +38,7 @@
# callbacks from kaa.notifier
from callback import Callback, WeakCallback
+from kaa.utils import property
# get logging object
log = logging.getLogger('notifier')
@@ -59,10 +60,21 @@
def __init__(self, changed_cb = None):
self._callbacks = []
- self._changed_cb = changed_cb
+ self.changed_cb = changed_cb
self._deferred_args = []
+ @property
+ def changed_cb(self):
+ return self._changed_cb
+
+
+ @changed_cb.setter
+ def changed_cb(self, callback):
+ assert(callback is None or callable(callback))
+ self._changed_cb = callback
+
+
def __iter__(self):
for cb in self._callbacks:
yield cb
@@ -248,7 +260,15 @@
# parameter is something else, bad
raise AttributeError('signal key must be string')
-
+
+ def add(self, *signals):
+ """
+ Creates a new Signals object by merging all signals defined in
+ self and the signals specified in the arguments.
+ """
+ return Signals(self, *signals)
+
+
def __getattr__(self, attr):
"""
Get attribute function from Signal().
Modified: trunk/base/src/notifier/sockets.py
==============================================================================
--- trunk/base/src/notifier/sockets.py (original)
+++ trunk/base/src/notifier/sockets.py Wed Feb 13 22:24:32 2008
@@ -30,16 +30,19 @@
#
# -----------------------------------------------------------------------------
-__all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket',
- 'IO_READ', 'IO_WRITE' ]
+__all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket', 'IO_READ', 'IO_WRITE' ]
import socket
import logging
import nf_wrapper as notifier
-from callback import Callback
-from signals import Signal
+from callback import Callback, WeakCallback
+from signals import Signals, Signal
from thread import MainThreadCallback, ThreadCallback, is_mainthread, threaded
+from async import InProgress
+from coroutine import YieldCallback
+from kaa.utils import property
+from timer import OneShotTimer, timed, POLICY_ONCE
# get logging object
log = logging.getLogger('notifier')
@@ -48,7 +51,6 @@
IO_WRITE = 1
class IOMonitor(notifier.NotifierCallback):
-
def __init__(self, callback, *args, **kwargs):
super(IOMonitor, self).__init__(callback, *args, **kwargs)
self.set_ignore_caller_args()
@@ -56,11 +58,14 @@
def register(self, fd, condition = IO_READ):
if self.active():
+ if fd != self._id or condition != self._condition:
+ raise ValueError('Existing file descriptor already registered with this IOMonitor.')
return
if not is_mainthread():
- return MainThreadCallback(self.register, fd, condition)()
+ return MainThreadCallback(self.register)(fd, condition)
notifier.socket_add(fd, self, condition)
self._condition = condition
+ # Must be called _id to correspond with base class.
self._id = fd
@@ -82,30 +87,83 @@
"""
Notifier-aware socket class.
"""
- def __init__(self, addr = None, async = None):
- self._addr = self._socket = None
- self._write_buffer = ""
- self._read_delim = None
-
- self.signals = {
- "closed": Signal(),
- "read": Signal(),
- "connected": Signal()
- }
-
- # These variables hold the socket dispatchers for monitoring; we
- # only allocate a dispatcher when the socket is connected to avoid
- # a ref cycle so that disconnected sockets will get properly deleted
- # when they are not referenced.
- self._rmon = self._wmon = None
+ signals = Signals('closed', 'read', 'readline', 'new-client')
+
+ def __init__(self):
+ self._socket = None
+ self._write_buffer = []
+ self._addr = None
self._listening = False
+ self._queue_close = False
+
+ # Internal signals for read() and readline() (these are different from
+ # the public signals 'read' and 'readline' as they get emitted even
+ # when data is None). When these signals get updated, we call
+ # _update_read_monitor to register the read IOMonitor.
+ cb = WeakCallback(self._update_read_monitor)
+ self._read_signal = Signal(cb)
+ self._readline_signal = Signal(cb)
+ self.signals['read'].changed_cb = cb
+ self.signals['readline'].changed_cb = cb
+
+ # These variables hold the IOMonitors for monitoring; we only allocate
+ # a monitor when the socket is connected to avoid a ref cycle so
+ # that disconnected sockets will get properly deleted when they are not
+ # referenced.
+ self._rmon = self._wmon = None
+
- if addr:
- self.connect(addr, async = async)
+ @property
+ def address(self):
+ """
+ Either a 2-tuple containing the (host, port) of the remote end of the
+ socket (host may be an IP address or hostname, but it is always a string),
+ or a string in the case of a UNIX socket.
+
+ If this is a listening socket, it is a 2-tuple of the address
+ the socket was bound to.
+ """
+ return self._addr
+
+
+ @property
+ def listening(self):
+ """
+ True if this is a listening socket, and False otherwise.
+ """
+ return self._listening
+
+
+ @property
+ def connected(self):
+ """
+ Boolean representing the connected state of the socket.
+ """
+ return self._socket != None
+
+
+ @timed(0, OneShotTimer, POLICY_ONCE)
+ def _update_read_monitor(self, signal = None, change = None):
+ # Update read IOMonitor to register or unregister based on if there are
+ # any handlers attached to the read signals. If there are no handlers,
+ # there is no point in reading data from the socket since it will go
+ # nowhere. This also allows us to push back the read buffer to the OS.
+ if not self._rmon or change == Signal.SIGNAL_DISCONNECTED:
+ return
+ elif not self._listening and len(self._read_signal) == len(self._readline_signal) == \
+ len(self.signals['read']) == len(self.signals['readline']) == 0:
+ self._rmon.unregister()
+ elif not self._rmon.active():
+ self._rmon.register(self._socket, IO_READ)
def _normalize_address(self, addr):
+ """
+ Converts address strings in the form host:port into 2-tuples
+ containing the hostname and integer port. Strings not in that
+ form are left untouched (as they represent unix socket paths).
+ """
if isinstance(addr, basestring) and ":" in addr:
addr = addr.split(":")
assert(len(addr) == 2)
@@ -116,101 +174,96 @@
def _make_socket(self, addr = None):
+ """
+ Constructs a socket based on the given addr. Returns the socket and
+ the normalized address as a 2-tuple.
+ """
addr = self._normalize_address(addr)
-
- if self._socket:
- self.close()
-
assert(type(addr) in (str, tuple, None))
if isinstance(addr, basestring):
- self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- self._addr = addr
-
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- def listen(self, bind_info, qlen = 5):
- if isinstance(bind_info, int):
- # Change port to (None, port)
- bind_info = ("", bind_info)
+ return sock, addr
- if not isinstance(bind_info, (tuple, list)) or \
- not isinstance(bind_info[0], (tuple, list)):
- bind_info = (bind_info, )
+ def _replace_socket(self, sock, addr):
+ """
+ Replaces the existing socket and address spec with the ones supplied.
+ Any existing socket is closed.
+ """
+ if self._socket:
+ self._socket.close()
- self._make_socket(bind_info[0])
+ self._socket, self._addr = sock, addr
- for addr in bind_info:
- addr = self._normalize_address(addr)
- try:
- self._socket.bind(addr)
- except socket.error:
- log.error('Failed to bind socket to: %s' % str(addr))
- self._socket.listen(qlen)
+ def listen(self, bind_info, qlen = 5):
+ """
+ Sets the socket to listen on bind_info, which is either an integer
+ corresponding to the port to listen on, or a 2-tuple of the IP and port.
+ In the case where only the port number is specified, the socket will
+ be bound to all interfaces.
+
+ If the bind fails, an exception is raised.
+
+ Once listening, new connections are automatically accepted, and
+ the 'new-client' signal is emitted for each new connection. Callbacks
+ connecting to the signal will receive a new Socket object representing
+ the client connection.
+ """
+ if isinstance(bind_info, int):
+ # Only port number specified; translate to tuple that can be
+ # used with socket.bind()
+ bind_info = ('', bind_info)
+
+ sock, addr = self._make_socket(bind_info)
+ sock.bind(addr)
+ sock.listen(qlen)
self._listening = True
- self.wrap()
+ self.wrap(sock, addr)
-
- def connect(self, addr, async = None):
+ @threaded()
+ def connect(self, addr):
"""
Connects to the host specified in addr. If addr is a string in the
form host:port, or a tuple the form (host, port), a TCP socket is
established. Otherwise a Unix socket is established and addr is
treated as a filename.
- If async is not None, then an InProgress object is returned. If async
- is a callable, then it is automatically connected to both completed and
- exception handlers of the InProgress object. This callback takes one
- or three parameters: if the connection was successful, the first
- parameter is True; otherwise there was an exception, and the three
- parameters are type, value, and traceback of the exception.
-
- If async is None, this call will block until either connected or an
- exception is raised. Although this call blocks, the notifier loop
- remains active.
- """
- self._make_socket(addr)
-
- in_progress = self._connect_thread()
-
- if async:
- if callable(async):
- in_progress.connect_both(async, async)
- return in_progress
-
- # Any exception that occurred in the thread will get raised here:
- return in_progress.wait()
-
-
- @threaded()
- def _connect_thread(self):
- if type(self._addr) == str:
+ This function is executed in a thread to avoid blocking. It therefore
+ returns an InProgress object. If the socket is connected, the InProgress
+ is finished with no arguments. If the connection cannot be established,
+ an exception is thrown to the InProgress.
+ """
+ sock, addr = self._make_socket(addr)
+ if type(addr) == str:
# Unix socket, just connect.
- self._socket.connect(self._addr)
+ sock.connect(addr)
else:
- host, port = self._addr
+ host, port = addr
if not host.replace(".", "").isdigit():
# Resolve the hostname.
host = socket.gethostbyname(host)
- self._socket.connect((host, port))
+ sock.connect((host, port))
- self.wrap()
- return True
+ self.wrap(sock, addr)
- def wrap(self, sock = None, addr = None):
- if sock:
- self._socket = sock
- if addr:
- self._addr = addr
+ def wrap(self, sock, addr = None):
+ """
+ Wraps an existing low-level socket object. addr specifies the address
+ corresponding to the socket.
+ """
+ self._socket = sock or self._socket
+ self._addr = addr or self._addr
+ self._queue_close = False
- self._socket.setblocking(False)
+ sock.setblocking(False)
if self._rmon:
self._rmon.unregister()
@@ -219,9 +272,34 @@
self._rmon = IOMonitor(self._handle_read)
self._wmon = IOMonitor(self._handle_write)
- self._rmon.register(self._socket, IO_READ)
+ self._update_read_monitor()
if self._write_buffer:
- self._wmon.register(self._socket, IO_WRITE)
+ self._wmon.register(sock, IO_WRITE)
+
+
+ def _async_read(self, signal):
+ if self._listening:
+ raise RuntimeError("Can't read on a listening socket.")
+
+ return YieldCallback(signal)
+
+
+ def read(self):
+ """
+ Reads a chunk of data from the socket. This function returns an
+ InProgress object. If the InProgress is finished with None, it
+ means that no data was collected and the socket closed.
+ """
+ return self._async_read(self._read_signal)
+
+
+ def readline(self):
+ """
+ Reads a line from the socket (with newline stripped). The function
+ returns an InProgress object. If the InProgress is finished with
+ None, it means that no data was collected and the socket closed.
+ """
+ return self._async_read(self._readline_signal)
def _handle_read(self):
@@ -229,7 +307,7 @@
sock, addr = self._socket.accept()
client_socket = Socket()
client_socket.wrap(sock, addr)
- self.signals["connected"].emit(client_socket)
+ self.signals['new-client'].emit(client_socket)
return
try:
@@ -242,45 +320,74 @@
# If we're here, then the socket is likely disconnected.
data = None
+ self._read_signal.emit(data)
+
if not data:
- return self.close(False)
+ self._readline_signal.emit(data)
+ return self.close(immediate=True, expected=False)
+
+ self.signals['read'].emit(data)
+ self._update_read_monitor()
- self.signals["read"].emit(data)
+ # TODO: parse input into separate lines and emit readline.
+
+
+ def close(self, immediate = False, expected = True):
+ """
+ Closes the socket. If immediate is False and there is data in the
+ write buffer, the socket is closed once the write buffer is emptied.
+ Otherwise the socket is closed immediately and the 'closed' signal
+ is emitted.
+ """
+ if not immediate and self._write_buffer:
+ # Immediate close not requested and we have some data left
+ # to be written, so defer close until after write buffer
+ # is empty.
+ self._queue_close = True
+ return
- def close(self, expected = True):
self._rmon.unregister()
self._wmon.unregister()
self._rmon = self._wmon = None
- self._write_buffer = ""
+ del self._write_buffer[:]
+ self._queue_close = False
self._socket.close()
self._socket = None
- self.signals["closed"].emit(expected)
+ self.signals['closed'].emit(expected)
def write(self, data):
- self._write_buffer += data
+ self._write_buffer.append(data)
if self._socket and self._wmon and not self._wmon.active():
self._wmon.register(self._socket.fileno(), IO_WRITE)
+
def _handle_write(self):
- if len(self._write_buffer) == 0:
+ if not self._write_buffer:
return
try:
- sent = self._socket.send(self._write_buffer)
- self._write_buffer = self._write_buffer[sent:]
+ while self._write_buffer:
+ data = self._write_buffer.pop(0)
+ sent = self._socket.send(data)
+ if sent != len(data):
+ # Not all data was able to be sent; push remaining data
+ # back onto the write buffer.
+ self._write_buffer.insert(0, data[sent:])
+ break
+
if not self._write_buffer:
+ if self._queue_close:
+ return self.close(immediate=True)
self._wmon.unregister()
+
except socket.error, (errno, msg):
if errno == 11:
# Resource temporarily unavailable -- we are trying to write
# data to a socket when none is available.
return
- # If we're here, then the socket is likely disconnected.
- self.close(False)
-
- def is_connected(self):
- return self._socket != None
+ # If we're here, then the socket is likely disconnected.
+ self.close(immediate=True, expected=False)
Modified: trunk/base/src/notifier/timer.py
==============================================================================
--- trunk/base/src/notifier/timer.py (original)
+++ trunk/base/src/notifier/timer.py Wed Feb 13 22:24:32 2008
@@ -39,6 +39,7 @@
import nf_wrapper as notifier
from thread import MainThreadCallback, is_mainthread
+from kaa.weakref import weakref
POLICY_ONCE = 'once'
POLICY_MANY = 'many'
Modified: trunk/base/src/rpc.py
==============================================================================
--- trunk/base/src/rpc.py (original)
+++ trunk/base/src/rpc.py Wed Feb 13 22:24:32 2008
@@ -521,8 +521,7 @@
#log.exception('Exception in rpc function "%s"', function)
if not function in self._callbacks:
log.error(self._callbacks.keys())
- type, value, tb = sys.exc_info()
- self._send_exception(type, value, tb, seq)
+ self._send_exception(*sys.exc_info() + (seq,))
return True
if isinstance(result, kaa.InProgress):
Modified: trunk/base/src/utils.py
==============================================================================
--- trunk/base/src/utils.py (original)
+++ trunk/base/src/utils.py Wed Feb 13 22:24:32 2008
@@ -38,7 +38,6 @@
import kaa
import _utils
-from kaa.notifier.thread import create_thread_notifier_pipe
# get logging object
log = logging.getLogger('kaa')
@@ -154,6 +153,7 @@
# Replace any existing thread notifier pipe, otherwise we'll be listening
# to our parent's thread notifier.
+ from kaa.notifier.thread import create_thread_notifier_pipe
create_thread_notifier_pipe(new=False, purge=True)
return lock
Added: trunk/base/test/sockettest.py
==============================================================================
--- (empty file)
+++ trunk/base/test/sockettest.py Wed Feb 13 22:24:32 2008
@@ -0,0 +1,22 @@
+import kaa
+
+@kaa.coroutine()
+def new_client(client):
+ ip, port = client.address
+ print 'New connection from %s:%s' % (ip, port)
+ client.write('Hello %s, connecting from port %d\n' % (ip, port))
+
+ remote = kaa.Socket()
+ yield remote.connect('www.freevo.org:80')
+ remote.write('GET / HTTP/1.0\n\n')
+ while remote.connected:
+ data = yield remote.read()
+ client.write(data)
+ client.write('\n\nBye!\n')
+ client.close()
+
+server = kaa.Socket()
+server.signals['new-client'].connect(new_client)
+server.listen(8080)
+print "Connect to localhost:8080"
+kaa.main.run()
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
Freevo-cvslog@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog