Author: tack
Date: Mon Mar 10 21:21:00 2008
New Revision: 3173

Log:
AsyncException: use str() on the proxied exception to get the message instead
of accessing self.message or self.args.  Results in simpler code that I think
is more correct.

kaa.Socket changes: new buffer_size and chunk_size properties; custom __repr__
that shows fileno; use kaa.tempfile() on unix sockets if only filename is
specified; clear out stale unix sockets when listening (this logic moved from
kaa.rpc); register shutdown handler to close socket and clean up.

kaa.rpc: significantly refactored to use kaa.Socket; new Channel.close() method
added.



Modified:
   trunk/base/src/notifier/async.py
   trunk/base/src/notifier/sockets.py
   trunk/base/src/rpc.py

Modified: trunk/base/src/notifier/async.py
==============================================================================
--- trunk/base/src/notifier/async.py    (original)
+++ trunk/base/src/notifier/async.py    Mon Mar 10 21:21:00 2008
@@ -92,15 +92,7 @@
 
     def __str__(self):
         dump = ''.join(traceback.format_list(self._kaa_exc_stack))
-        # Python 2.5 always has self.message; for Python 2.4, fall back to
-        # first argument if it's a string.
-        msg = (hasattr(self, 'message') and self.message) or \
-              (self.args and isinstance(self.args[0], basestring) and 
self.args[0])
-        if msg:
-            info = '%s: %s' % (self._kaa_exc.__class__.__name__, msg)
-        else:
-            info = self._kaa_exc.__class__.__name__
-
+        info = '%s: %s' % (self._kaa_exc.__class__.__name__, 
str(self._kaa_exc))
         return self._kaa_get_header() + '\n' + dump + info
 
 

Modified: trunk/base/src/notifier/sockets.py
==============================================================================
--- trunk/base/src/notifier/sockets.py  (original)
+++ trunk/base/src/notifier/sockets.py  Mon Mar 10 21:21:00 2008
@@ -32,7 +32,10 @@
 
 __all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket', 'IO_READ', 'IO_WRITE' ]
 
+import sys
+import os
 import socket
+import errno
 import logging
 
 import nf_wrapper as notifier
@@ -42,6 +45,7 @@
 from async import InProgress, InProgressCallback
 from kaa.utils import property
 from timer import OneShotTimer, timed, POLICY_ONCE
+from kaa.tmpfile import tempfile
 
 # get logging object
 log = logging.getLogger('notifier')
@@ -87,13 +91,15 @@
     Notifier-aware socket class.
     """
 
-    def __init__(self):
+    def __init__(self, buffer_size=None, chunk_size=1024*1024):
         self.signals = Signals('closed', 'read', 'readline', 'new-client')
         self._socket = None
         self._write_buffer = []
         self._addr = None
         self._listening = False
         self._queue_close = False
+        self._buffer_size = buffer_size
+        self._chunk_size = chunk_size
 
         # Internal signals for read() and readline()  (these are different from
         # the public signals 'read' and 'readline' as they get emitted even
@@ -111,6 +117,11 @@
         # referenced.
         self._rmon = self._wmon = None
 
+    def __repr__(self):
+        if not self._socket:
+            return '<kaa.Socket - disconnected>'
+        return '<kaa.Socket fd=%d>' % self.fileno
+
 
     @property
     def address(self):
@@ -141,6 +152,60 @@
         return self._socket != None
 
 
+    @property
+    def buffer_size(self):
+        """
+        Size of the send and receive socket buffers (SO_SNDBUF and SO_RCVBUF)
+        in bytes.  Setting this to higher values (say 1M) improves performance
+        when sending large amounts of data across the socket.  Note that the
+        upper bound may be restricted by the kernel.  (Under Linux, this can be
+        tuned by adjusting /proc/sys/net/core/[rw]mem_max)
+        """
+        return self._buffer_size
+
+
+    @buffer_size.setter
+    def buffer_size(self, size):
+        self._buffer_size = size
+        if self._socket and size:
+            self._set_buffer_size(self._socket, size)
+
+
+    @property
+    def chunk_size(self):
+        """
+        Number of bytes to attempt to read from the socket at a time.  The
+        default is 1M.  A 'read' signal is emitted for each chunk read from the
+        socket.  (The number of bytes read at a time may be less than the chunk
+        size, but will never be more.)
+        """
+        return self._chunk_size
+
+
+    @chunk_size.setter
+    def chunk_size(self, size):
+        self._chunk_size = size
+
+
+    @property
+    def fileno(self):
+        """
+        Returns the file descriptor of the socket, or None if the socket is
+        not connected.
+        """
+        if not self._socket:
+            return None
+        return self._socket.fileno()
+
+
+    def _set_buffer_size(self, s, size):
+        """
+        Sets the send and receive buffers of the given socket s to size.
+        """
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
+        
+
     @timed(0, OneShotTimer, POLICY_ONCE)
     def _update_read_monitor(self, signal = None, change = None):
         # Update read IOMonitor to register or unregister based on if there are
@@ -151,7 +216,7 @@
             return
         elif not self._listening and len(self._read_signal) == 
len(self._readline_signal) == \
                                      len(self.signals['read']) == 
len(self.signals['readline']) == 0:
-           self._rmon.unregister()
+            self._rmon.unregister()
         elif not self._rmon.active():
             self._rmon.register(self._socket, IO_READ)
 
@@ -160,26 +225,57 @@
         """
         Converts address strings in the form host:port into 2-tuples 
         containing the hostname and integer port.  Strings not in that
-        form are left untouched (as they represent unix socket paths).
+        form are assumed to represent unix socket paths.  If such a string
+        contains no /, a tempfile is used using kaa.tempfile().  If we can't
+        make sense of the given address, a ValueError exception will
+        be raised.
         """
-        if isinstance(addr, basestring) and ":" in addr:
-            addr = addr.split(":")
-            assert(len(addr) == 2)
-            addr[1] = int(addr[1])
-            addr = tuple(addr)
+        if isinstance(addr, basestring):
+            if addr.count(':') == 1:
+                addr, port = addr.split(':')
+                if not port.isdigit():
+                    raise ValueError('Port specified is not an integer')
+                return addr, int(port)
+            elif '/' not in addr:
+                return tempfile(addr)
+        elif not isinstance(addr, (tuple, list)) or len(addr) != 2:
+            raise ValueError('Invalid address')
 
         return addr
 
 
-    def _make_socket(self, addr = None):
+    def _make_socket(self, addr = None, overwrite = False):
         """
         Constructs a socket based on the given addr.  Returns the socket and
         the normalized address as a 2-tuple.
+
+        If overwrite is True, if addr specifies a path to a unix socket and
+        that unix socket already exists, it will be removed if the socket is
+        not actually in use.  If it is in use, an IOError will be raised.
         """
         addr = self._normalize_address(addr)
         assert(type(addr) in (str, tuple, None))
 
         if isinstance(addr, basestring):
+            if overwrite and os.path.exists(addr):
+                # Unix socket exists; test to see if it's active.
+                try:
+                    dummy = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+                    dummy.connect(addr)
+                except socket.error, (err, msg):
+                    if err == errno.ECONNREFUSED:
+                        # Socket is not active, so we can remove it.
+                        log.debug('Replacing dead unix socket at %s' % addr)
+                    else:
+                        # Reraise unexpected exception
+                        tp, exc, tb = sys.exc_info()
+                        raise tp, exc, tb
+                else:
+                    # We were able to connect to the existing socket, so it's
+                    # in use.  We won't overwrite it.
+                    raise IOError('Address already in use')
+                os.unlink(addr)
+
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         else:
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -218,7 +314,7 @@
             # used with socket.bind()
             bind_info = ('', bind_info)
 
-        sock, addr = self._make_socket(bind_info)
+        sock, addr = self._make_socket(bind_info, overwrite=True)
         sock.bind(addr)
         if addr[1] == 0:
             # get real port used
@@ -234,7 +330,8 @@
         Connects to the host specified in addr.  If addr is a string in the
         form host:port, or a tuple the form (host, port), a TCP socket is
         established.  Otherwise a Unix socket is established and addr is
-        treated as a filename.
+        treated as a filename.  In this case, if addr does not contain a /
+        character, a kaa tempfile is created.
 
         This function is executed in a thread to avoid blocking.  It therefore
         returns an InProgress object.  If the socket is connected, the 
InProgress
@@ -265,6 +362,8 @@
         self._queue_close = False
 
         sock.setblocking(False)
+        if self._buffer_size:
+            self._set_buffer_size(sock, self._buffer_size)
 
         if self._rmon:
             self._rmon.unregister()
@@ -277,6 +376,10 @@
         if self._write_buffer:
             self._wmon.register(sock, IO_WRITE)
 
+        import main
+        # Disconnect socket and remove socket file (if unix socket) on shutdown
+        main.signals['shutdown'].connect_weak(self.close)
+
 
     def _async_read(self, signal):
         if self._listening:
@@ -318,7 +421,7 @@
             return self._accept()
 
         try:
-            data = self._socket.recv(1024*1024)
+            data = self._socket.recv(self._chunk_size)
         except socket.error, (errno, msg):
             if errno == 11:
                 # Resource temporarily unavailable -- we are trying to read
@@ -326,7 +429,12 @@
                 return
             # If we're here, then the socket is likely disconnected.
             data = None
+        except:
+            log.exception('kaa.Socket._handle_read failed with unknown 
exception, closing socket')
+            data = None
 
+        # _read_signal is for InProgress objects waiting on the next read().
+        # For these we must emit even when data is None.
         self._read_signal.emit(data)
 
         if not data:
@@ -334,6 +442,7 @@
             return self.close(immediate=True, expected=False)
 
         self.signals['read'].emit(data)
+        # FIXME: why do this here?
         self._update_read_monitor()
 
         # TODO: parse input into separate lines and emit readline.
@@ -361,8 +470,19 @@
         self._queue_close = False
 
         self._socket.close()
+        if isinstance(self._addr, basestring) and '/' in self._addr:
+            # Remove unix socket if it exists.
+            try:
+                os.unlink(self._addr)
+            except OSError:
+                pass
+
+        self._addr = None
         self._socket = None
+
         self.signals['closed'].emit(expected)
+        import main
+        main.signals['shutdown'].disconnect(self.close)
 
 
     def write(self, data):

Modified: trunk/base/src/rpc.py
==============================================================================
--- trunk/base/src/rpc.py       (original)
+++ trunk/base/src/rpc.py       Mon Mar 10 21:21:00 2008
@@ -94,10 +94,15 @@
 # kaa imports
 import kaa
 from notifier.async import make_exception_class, AsyncExceptionBase
+from kaa.utils import property
 
 # get logging object
 log = logging.getLogger('rpc')
 
+# Global constants
+RPC_PACKET_HEADER_SIZE = struct.calcsize("I4sI")
+
+
 class ConnectError(Exception):
     pass
 
@@ -121,78 +126,46 @@
 
 class Server(object):
     """
-    RPC server class.
+    RPC server class.  RPC servers accept incoming connections from client,
+    however RPC calls can be issued in either direction.
+
+    address specifies what address to bind the socket to, and can be in
+    the form ip:port or hostname:port or as a 2-tuple (hostname, port).
+    If hostname is an empty string, the socket is bound to all interfaces.
+    If address is a string but not in the above form, it is assumed to be
+    a unix socket.  See kaa.Socket.connect docstring for more info.
 
-    See Client documentation for explanation of bufsize.
+    See kaa.Socket.buffer_size docstring for information on buffer_size.
     """
-    def __init__(self, address, auth_secret = '', bufsize = None):
+    def __init__(self, address, auth_secret = '', buffer_size=None):
 
         self._auth_secret = auth_secret
-        self._socket_buffer_size = bufsize
+        self._socket = kaa.Socket(buffer_size=buffer_size)
+        self._socket.listen(address)
+        self._socket.signals['new-client'].connect_weak(self._new_connection)
 
-        if type(address) in types.StringTypes:
-            if address.find('/') == -1:
-                # create socket in kaa temp dir
-                address = kaa.tempfile(address)
-
-            if os.path.exists(address):
-                # maybe a server is already running at this address, test it
-                try:
-                    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-                    s.connect(address)
-                except socket.error, (err, msg):
-                    if err == errno.ECONNREFUSED:
-                        # not running, everything is fine
-                        log.debug('remove socket from dead server')
-                    else:
-                        # some error we do not expect
-                        raise socket.error(err, msg)
-                else:
-                    # server already running
-                    raise IOError('server already running')
-                os.unlink(address)
-            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-
-        elif type(address) == tuple:
-            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-        self.socket.setblocking(False)
-        self.socket.bind(address)
-        self.socket.listen(5)
-        self._mon = kaa.WeakIOMonitor(self._new_connection)
-        self._mon.register(self.socket.fileno())
-        # Remove socket file and close clients on shutdown
-        kaa.main.signals["shutdown"].connect_weak(self.close)
-
-        self.signals = {
-            "client_connected": kaa.Signal(),
-        }
+        self.signals = kaa.Signals('client_connected')
         self.objects = []
 
-
-    def _new_connection(self):
+    def _new_connection(self, client_sock):
         """
         Callback when a new client connects.
         """
-        client_sock = self.socket.accept()[0]
-        client_sock.setblocking(False)
         log.debug("New connection %s", client_sock)
-        client = Channel(sock = client_sock, auth_secret = self._auth_secret,
-                         bufsize = self._socket_buffer_size)
+        client_sock.buffer_size = self._socket.buffer_size
+        client = Channel(sock = client_sock, auth_secret = self._auth_secret)
         for obj in self.objects:
             client.connect(obj)
         client._send_auth_challenge()
-        self.signals["client_connected"].emit(client)
+        self.signals['client_connected'].emit(client)
 
 
     def close(self):
         """
         Close the server socket.
         """
-        self.socket = None
-        self._mon.unregister()
-        kaa.main.signals["shutdown"].disconnect(self.close)
+        self._socket.close()
+        self._socket = None
 
 
     def connect(self, obj):
@@ -214,18 +187,20 @@
 
 class Channel(object):
     """
-    Channel object for two point communication. The server creates a Channel
-    object for each client connection, Client itslef is a Channel.
+    Channel object for two point communication, implementing the kaa.rpc
+    protocol. The server creates a Channel object for each incoming client
+    connection.  Client itself is also a Channel.
     """
-    def __init__(self, sock, auth_secret, bufsize = None):
+
+    def __init__(self, sock, auth_secret):
         self._socket = sock
 
-        self._rmon = kaa.IOMonitor(self._handle_read)
-        self._rmon.register(self._socket.fileno(), kaa.IO_READ)
-        self._wmon = kaa.IOMonitor(self._handle_write)
         self._authenticated = False
-        self._write_buffer = ''
-        self._write_buffer_delayed = ''
+        # We start off in an unauthenticated state; set chunk size to something
+        # small to prevent untrusted remote from flooding read buffer.
+        self._socket.chunk_size = 1024
+        # Buffer containing packets deferred until after authentication.
+        self._write_buffer_deferred = []
         self._read_buffer = []
         self._callbacks = {}
         self._next_seq = 1
@@ -233,16 +208,22 @@
         self._auth_secret = auth_secret
         self._pending_challenge = None
 
-        if bufsize:
-            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 
bufsize)
-            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 
bufsize)
+        self.signals = kaa.Signals('closed')
+        # Creates a circular reference so that RPC channels survive even when
+        # there is no reference to them.  (Servers do not hold references to
+        # clients channels.)  As long as the socket is connected, the channel
+        # will survive.
+        self._socket.signals['read'].connect(self._handle_read)
+        self._socket.signals['closed'].connect(self._handle_close)
 
-        self.signals = { 'closed': kaa.Signal() }
-        kaa.main.signals["shutdown"].connect_weak(self._handle_close)
 
+    @property
+    def connected(self):
+        return self._socket.connected
 
     def is_connected(self):
-        return self._rmon and self._rmon.active()
+        log.warning('kaa.rpc.Channel.is_connected deprecated; use connected 
property instead')
+        return self.connected
 
 
     def connect(self, obj):
@@ -272,7 +253,7 @@
             kwargs['_kaa_rpc_callback'] = callback
             kaa.MainThreadCallback(self.rpc)(cmd, *args, **kwargs)
             return callback
-        if not self._wmon:
+        if not self._socket.connected:
             raise IOError('channel is disconnected')
         seq = self._next_seq
         self._next_seq += 1
@@ -286,73 +267,41 @@
         return callback
 
 
-    def _handle_close(self):
+    def close(self):
         """
-        Socket is closed.
+        Forcefully close the RPC channel.
         """
-        if not self._wmon:
-            # already closed (no idea why this happens)
-            return False
-        log.debug('close socket for %s', self)
         self._socket.close()
-        self._socket = None
-        if self._wmon.active():
-            self._wmon.unregister()
-        if self._rmon.active():
-            self._rmon.unregister()
-        self._wmon = self._rmon = None
-        self.signals['closed'].emit()
-        self.signals = {}
-        kaa.main.signals["shutdown"].disconnect(self._handle_close)
 
 
-    def _handle_read(self):
+    def _handle_close(self, expected):
         """
-        Read from the socket (callback from notifier).
+        kaa.Socket callback invoked when socket is closed.
         """
-        try:
-            # If not authenticated, read at most 1k.
-            data = self._socket.recv((1024, 1024*1024)[self._authenticated])
-        except socket.error, (err, msg):
-            if err == errno.EAGAIN:
-                # Resource temporarily unavailable -- we are trying to read
-                # data on a socket when none is available.
-                return
-            # If we're here, then the socket is likely disconnected.
-            data = None
-        except (KeyboardInterrupt, SystemExit):
-            raise SystemExit
-        except:
-            log.exception('_handle_read failed, close socket')
-            data = None
+        if not self._authenticated:
+            # Socket closed before authentication completed.  We assume it's
+            # because authentication failed (though it may not be).
+            log.error('Socket closed before authentication completed')
 
-        if not data:
-            if not self._authenticated:
-                # Remote end closed connection during authentication; this 
-                # almost certainly means auth secret mismatch.
-                log.error('Authentication failed')
-            self._handle_close()
-            # Return False to cause notifier to remove fd handler.
-            return False
+        log.debug('close socket for %s', self)
+        self.signals['closed'].emit()
+        self.signals = {}
+        self._socket.signals['read'].disconnect(self._handle_read)
+        self._socket.signals['closed'].disconnect(self._handle_close)
 
-        self._read_buffer.append(data)
-        # read as much data as we have
-        while self._authenticated:
-            try:
-                data = self._socket.recv(1024*1024)
-            except socket.error, (err, msg):
-                break
-            if not data:
-                break
-            self._read_buffer.append(data)
 
-        header_size = struct.calcsize("I4sI")
+    def _handle_read(self, data):
+        """
+        Invoked when a new chunk is read from the socket.  When not 
authenticated,
+        chunk size is 1k; when authenticated it is 1M.
+        """
+        self._read_buffer.append(data)
         # Before we start into the loop, make sure we have enough data for
         # a full packet.  For very large packets (if we just received a huge
         # pickled object), this saves the string.join() which can be very
         # expensive.  (This is the reason we use a list for our read buffer.)
         buflen = reduce(lambda x, y: x + len(y), self._read_buffer, 0)
-        if buflen < header_size:
+        if buflen < RPC_PACKET_HEADER_SIZE:
             return
 
         if not self._authenticated and buflen > 1024:
@@ -360,36 +309,36 @@
             # in the buffer.  If we do it's because the remote has sent a
             # large amount of data before completing authentication.
             log.warning("Too much data received from remote end before 
authentication; disconnecting")
-            self._handle_close()
+            self.close()
             return
 
         # Ensure the first block in the read buffer is big enough for a full
         # packet header.  If it isn't, then we must have more than 1 block in
         # the buffer, so keep merging blocks until we have a block big enough
-        # to be a header.  If we're here, it means that buflen >= header_size,
-        # so we can safely loop.
-        while len(self._read_buffer[0]) < header_size:
+        # to be a header.  If we're here, it means that buflen >=
+        # RPC_PACKET_HEADER_SIZE, so we can safely loop.
+        while len(self._read_buffer[0]) < RPC_PACKET_HEADER_SIZE:
             self._read_buffer[0] += self._read_buffer.pop(1)
 
         # Make sure the the buffer holds enough data as indicated by the
         # payload size in the header.
-        header = self._read_buffer[0][:header_size]
+        header = self._read_buffer[0][:RPC_PACKET_HEADER_SIZE]
         payload_len = struct.unpack("I4sI", header)[2]
-        if buflen < payload_len + header_size:
+        if buflen < payload_len + RPC_PACKET_HEADER_SIZE:
             return
 
         # At this point we know we have enough data in the buffer for the
         # packet, so we merge the array into a single buffer.
         strbuf = ''.join(self._read_buffer)
         self._read_buffer = []
-        while 1:
-            if len(strbuf) <= header_size:
+        while True:
+            if len(strbuf) <= RPC_PACKET_HEADER_SIZE:
                 if len(strbuf) > 0:
                     self._read_buffer.append(str(strbuf))
                 break
-            header = strbuf[:header_size]
+            header = strbuf[:RPC_PACKET_HEADER_SIZE]
             seq, packet_type, payload_len = struct.unpack("I4sI", header)
-            if len(strbuf) < payload_len + header_size:
+            if len(strbuf) < payload_len + RPC_PACKET_HEADER_SIZE:
                 # We've also received portion of another packet that we
                 # haven't fully received yet.  Put back to the buffer what
                 # we have so far, and we can exit the loop.
@@ -398,8 +347,8 @@
 
             # Grab the payload for this packet, and shuffle strbuf to the
             # next packet.
-            payload = strbuf[header_size:header_size + payload_len]
-            strbuf = buffer(strbuf, header_size + payload_len)
+            payload = strbuf[RPC_PACKET_HEADER_SIZE:RPC_PACKET_HEADER_SIZE + 
payload_len]
+            strbuf = buffer(strbuf, RPC_PACKET_HEADER_SIZE + payload_len)
             #log.debug("Got packet %s", packet_type)
             if not self._authenticated:
                 self._handle_packet_before_auth(seq, packet_type, payload)
@@ -416,44 +365,9 @@
         header = struct.pack("I4sI", seq, packet_type, len(payload))
         if not self._authenticated and packet_type not in ('RESP', 'AUTH'):
             log.debug('delay packet %s', packet_type)
-            self._write_buffer_delayed += header + payload
+            self._write_buffer_deferred.append(header + payload)
         else:
-            self._write_buffer += header + payload
-
-        self._handle_write(close_on_error=False)
-        self._flush()
-
-
-    def _flush(self):
-        """
-        If there is data pending in the write buffer, ensure that it is
-        written next notifier loop.
-        """
-        if not self._wmon.active() and self._write_buffer:
-            self._wmon.register(self._socket.fileno(), kaa.IO_WRITE)
-
-
-    def _handle_write(self, close_on_error=True):
-        """
-        Write to the socket (callback from notifier).
-        """
-        if not len(self._write_buffer):
-            return False
-        try:
-            sent = self._socket.send(self._write_buffer)
-            self._write_buffer = self._write_buffer[sent:]
-            if not self._write_buffer:
-                return False
-        except socket.error, (err, msg):
-            if err == errno.EAGAIN:
-                # Resource temporarily unavailable -- we are trying to write
-                # data to a socket when none is available.
-                return
-            # If we're here, then the socket is likely disconnected.
-            if close_on_error:
-                self._handle_close()
-            return False
-        return True
+            self._socket.write(header + payload)
 
 
     def _send_answer(self, answer, seq):
@@ -606,7 +520,7 @@
             # Received a non-auth command while expecting auth.
             log.error('got %s before authentication is complete; closing 
socket.' % type)
             # Hang up.
-            self._handle_close()
+            self.close()
             return
 
         try:
@@ -624,7 +538,7 @@
             challenge, response, salt = struct.unpack("20s20s20s", payload)
         except:
             log.warning("Malformed authentication packet from remote; 
disconnecting.")
-            self._handle_close()
+            self.close()
             return
 
         # At this point, challenge, response, and salt are 20 byte strings of
@@ -637,7 +551,7 @@
             # disconnect immediately.
             if self._pending_challenge:
                 self._pending_challenge = None
-                self._handle_close()
+                self.close()
                 return
 
             # Otherwise send the response, plus a challenge of our own.
@@ -654,7 +568,7 @@
             if self._pending_challenge == None:
                 # We've received a response packet to auth, but we haven't
                 # sent a challenge.  Something isn't right, so disconnect.
-                self._handle_close()
+                self.close()
                 return
 
             # Step 3/4: We are expecting a response to our previous challenge
@@ -669,12 +583,14 @@
             # Now check to see if we were sent what we expected.
             if response != expected_response:
                 log.error('authentication error')
-                self._handle_close()
+                self.close()
                 return
 
             # Challenge response was good, so the remote is considered
-            # authenticated now.
+            # authenticated now.  We increase the chunk size on the socket
+            # so we read more at once.
             self._authenticated = True
+            self._socket.chunk_size = 1024*1024
             log.debug('Valid response received, remote authenticated.')
 
             # If remote has issued a counter-challenge along with their
@@ -692,9 +608,9 @@
                 self._send_packet(seq, 'RESP', payload)
                 log.debug('Sent response to challenge from client.')
 
-            self._write_buffer += self._write_buffer_delayed
-            self._write_buffer_delayed = ''
-            self._flush()
+            # Empty deferred write buffer now that we're authenticated.
+            self._socket.write(''.join(self._write_buffer_deferred))
+            self._write_buffer_deferred = []
 
 
     def _get_rand_value(self):
@@ -760,32 +676,22 @@
         tp = self._get_channel_type()
         if not self._socket:
             return '<kaa.rpc.Channel (%s) - disconnected>' % tp
-        return '<kaa.rpc.Channel (%s) %s>' % (tp, self._socket.fileno())
+        return '<kaa.rpc.Channel (%s) %s>' % (tp, self._socket.fileno)
 
 
 
 class Client(Channel):
     """
     RPC client to be connected to a server.
-
-    If bufsize is not None, the socket send and receive buffers will be set to
-    this size.  Setting this to higher values (say 1M) improves performance
-    when sending large amounts of data via RPC.  Note that the upper bound may
-    be restricted by the kernel.  (Under Linux, this can be tuned by adjusting
-    /proc/sys/net/core/[rw]mem_max)
     """
-    def __init__(self, address, auth_secret = '', bufsize = None):
-        if type(address) in types.StringTypes:
-            address = kaa.tempfile(address)
-            fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        if type(address) == tuple:
-            fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            fd.connect(address)
-        except socket.error, e:
-            raise ConnectError(e)
-        fd.setblocking(False)
-        Channel.__init__(self, fd, auth_secret, bufsize)
+    def __init__(self, address, auth_secret = '', buffer_size = None):
+        sock = kaa.Socket(buffer_size)
+        sock.buffer_size = buffer_size
+        # FIXME: we block on connect for now; Channel.rpc() tests socket
+        # connected and raises exception if it isn't, so if we do rpc() right
+        # after connecting, it will fail.
+        sock.connect(address).wait()
+        Channel.__init__(self, sock, auth_secret)
 
 
     def _get_channel_type(self):

-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to