Author: tack
Date: Mon Mar 10 21:21:00 2008
New Revision: 3173
Log:
AsyncException: use str() on the proxied exception to get the message instead
of accessing self.message or self.args. Results in simpler code that I think
is more correct.
kaa.Socket changes: new buffer_size, chunk_size and fileno properties; custom
__repr__ that shows the fileno; use kaa.tempfile() for unix sockets when the
address is a bare filename (contains no '/'); remove stale unix sockets when
listening (this logic moved from kaa.rpc); register a shutdown handler that
closes the socket and removes the unix socket file.
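kaa.rpc: significantly refactored to use kaa.Socket; new Channel.close() method
and Channel.connected property added (is_connected() is now deprecated); the
bufsize argument of Server and Client is renamed to buffer_size.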
Modified:
trunk/base/src/notifier/async.py
trunk/base/src/notifier/sockets.py
trunk/base/src/rpc.py
Modified: trunk/base/src/notifier/async.py
==============================================================================
--- trunk/base/src/notifier/async.py (original)
+++ trunk/base/src/notifier/async.py Mon Mar 10 21:21:00 2008
@@ -92,15 +92,7 @@
def __str__(self):
dump = ''.join(traceback.format_list(self._kaa_exc_stack))
- # Python 2.5 always has self.message; for Python 2.4, fall back to
- # first argument if it's a string.
- msg = (hasattr(self, 'message') and self.message) or \
- (self.args and isinstance(self.args[0], basestring) and
self.args[0])
- if msg:
- info = '%s: %s' % (self._kaa_exc.__class__.__name__, msg)
- else:
- info = self._kaa_exc.__class__.__name__
-
+ info = '%s: %s' % (self._kaa_exc.__class__.__name__,
str(self._kaa_exc))
return self._kaa_get_header() + '\n' + dump + info
Modified: trunk/base/src/notifier/sockets.py
==============================================================================
--- trunk/base/src/notifier/sockets.py (original)
+++ trunk/base/src/notifier/sockets.py Mon Mar 10 21:21:00 2008
@@ -32,7 +32,10 @@
__all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket', 'IO_READ', 'IO_WRITE' ]
+import sys
+import os
import socket
+import errno
import logging
import nf_wrapper as notifier
@@ -42,6 +45,7 @@
from async import InProgress, InProgressCallback
from kaa.utils import property
from timer import OneShotTimer, timed, POLICY_ONCE
+from kaa.tmpfile import tempfile
# get logging object
log = logging.getLogger('notifier')
@@ -87,13 +91,15 @@
Notifier-aware socket class.
"""
- def __init__(self):
+ def __init__(self, buffer_size=None, chunk_size=1024*1024):
self.signals = Signals('closed', 'read', 'readline', 'new-client')
self._socket = None
self._write_buffer = []
self._addr = None
self._listening = False
self._queue_close = False
+ self._buffer_size = buffer_size
+ self._chunk_size = chunk_size
# Internal signals for read() and readline() (these are different from
# the public signals 'read' and 'readline' as they get emitted even
@@ -111,6 +117,11 @@
# referenced.
self._rmon = self._wmon = None
+ def __repr__(self):
+ if not self._socket:
+ return '<kaa.Socket - disconnected>'
+ return '<kaa.Socket fd=%d>' % self.fileno
+
@property
def address(self):
@@ -141,6 +152,60 @@
return self._socket != None
+ @property
+ def buffer_size(self):
+ """
+ Size of the send and receive socket buffers (SO_SNDBUF and SO_RCVBUF)
+ in bytes. Setting this to higher values (say 1M) improves performance
+ when sending large amounts of data across the socket. Note that the
+ upper bound may be restricted by the kernel. (Under Linux, this can be
+ tuned by adjusting /proc/sys/net/core/[rw]mem_max)
+ """
+ return self._buffer_size
+
+
+ @buffer_size.setter
+ def buffer_size(self, size):
+ self._buffer_size = size
+ if self._socket and size:
+ self._set_buffer_size(self._socket, size)
+
+
+ @property
+ def chunk_size(self):
+ """
+ Number of bytes to attempt to read from the socket at a time. The
+ default is 1M. A 'read' signal is emitted for each chunk read from the
+ socket. (The number of bytes read at a time may be less than the chunk
+ size, but will never be more.)
+ """
+ return self._chunk_size
+
+
+ @chunk_size.setter
+ def chunk_size(self, size):
+ self._chunk_size = size
+
+
+ @property
+ def fileno(self):
+ """
+ Returns the file descriptor of the socket, or None if the socket is
+ not connected.
+ """
+ if not self._socket:
+ return None
+ return self._socket.fileno()
+
+
+ def _set_buffer_size(self, s, size):
+ """
+ Sets the send and receive buffers of the given socket s to size.
+ """
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
+
+
@timed(0, OneShotTimer, POLICY_ONCE)
def _update_read_monitor(self, signal = None, change = None):
# Update read IOMonitor to register or unregister based on if there are
@@ -151,7 +216,7 @@
return
elif not self._listening and len(self._read_signal) ==
len(self._readline_signal) == \
len(self.signals['read']) ==
len(self.signals['readline']) == 0:
- self._rmon.unregister()
+ self._rmon.unregister()
elif not self._rmon.active():
self._rmon.register(self._socket, IO_READ)
@@ -160,26 +225,57 @@
"""
Converts address strings in the form host:port into 2-tuples
containing the hostname and integer port. Strings not in that
- form are left untouched (as they represent unix socket paths).
+ form are assumed to represent unix socket paths. If such a string
+ contains no /, a tempfile is used using kaa.tempfile(). If we can't
+ make sense of the given address, a ValueError exception will
+ be raised.
"""
- if isinstance(addr, basestring) and ":" in addr:
- addr = addr.split(":")
- assert(len(addr) == 2)
- addr[1] = int(addr[1])
- addr = tuple(addr)
+ if isinstance(addr, basestring):
+ if addr.count(':') == 1:
+ addr, port = addr.split(':')
+ if not port.isdigit():
+ raise ValueError('Port specified is not an integer')
+ return addr, int(port)
+ elif '/' not in addr:
+ return tempfile(addr)
+ elif not isinstance(addr, (tuple, list)) or len(addr) != 2:
+ raise ValueError('Invalid address')
return addr
- def _make_socket(self, addr = None):
+ def _make_socket(self, addr = None, overwrite = False):
"""
Constructs a socket based on the given addr. Returns the socket and
the normalized address as a 2-tuple.
+
+ If overwrite is True, if addr specifies a path to a unix socket and
+ that unix socket already exists, it will be removed if the socket is
+ not actually in use. If it is in use, an IOError will be raised.
"""
addr = self._normalize_address(addr)
assert(type(addr) in (str, tuple, None))
if isinstance(addr, basestring):
+ if overwrite and os.path.exists(addr):
+ # Unix socket exists; test to see if it's active.
+ try:
+ dummy = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ dummy.connect(addr)
+ except socket.error, (err, msg):
+ if err == errno.ECONNREFUSED:
+ # Socket is not active, so we can remove it.
+ log.debug('Replacing dead unix socket at %s' % addr)
+ else:
+ # Reraise unexpected exception
+ tp, exc, tb = sys.exc_info()
+ raise tp, exc, tb
+ else:
+ # We were able to connect to the existing socket, so it's
+ # in use. We won't overwrite it.
+ raise IOError('Address already in use')
+ os.unlink(addr)
+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -218,7 +314,7 @@
# used with socket.bind()
bind_info = ('', bind_info)
- sock, addr = self._make_socket(bind_info)
+ sock, addr = self._make_socket(bind_info, overwrite=True)
sock.bind(addr)
if addr[1] == 0:
# get real port used
@@ -234,7 +330,8 @@
Connects to the host specified in addr. If addr is a string in the
form host:port, or a tuple the form (host, port), a TCP socket is
established. Otherwise a Unix socket is established and addr is
- treated as a filename.
+ treated as a filename. In this case, if addr does not contain a /
+ character, a kaa tempfile is created.
This function is executed in a thread to avoid blocking. It therefore
returns an InProgress object. If the socket is connected, the
InProgress
@@ -265,6 +362,8 @@
self._queue_close = False
sock.setblocking(False)
+ if self._buffer_size:
+ self._set_buffer_size(sock, self._buffer_size)
if self._rmon:
self._rmon.unregister()
@@ -277,6 +376,10 @@
if self._write_buffer:
self._wmon.register(sock, IO_WRITE)
+ import main
+ # Disconnect socket and remove socket file (if unix socket) on shutdown
+ main.signals['shutdown'].connect_weak(self.close)
+
def _async_read(self, signal):
if self._listening:
@@ -318,7 +421,7 @@
return self._accept()
try:
- data = self._socket.recv(1024*1024)
+ data = self._socket.recv(self._chunk_size)
except socket.error, (errno, msg):
if errno == 11:
# Resource temporarily unavailable -- we are trying to read
@@ -326,7 +429,12 @@
return
# If we're here, then the socket is likely disconnected.
data = None
+ except:
+ log.exception('kaa.Socket._handle_read failed with unknown
exception, closing socket')
+ data = None
+ # _read_signal is for InProgress objects waiting on the next read().
+ # For these we must emit even when data is None.
self._read_signal.emit(data)
if not data:
@@ -334,6 +442,7 @@
return self.close(immediate=True, expected=False)
self.signals['read'].emit(data)
+ # FIXME: why do this here?
self._update_read_monitor()
# TODO: parse input into separate lines and emit readline.
@@ -361,8 +470,19 @@
self._queue_close = False
self._socket.close()
+ if isinstance(self._addr, basestring) and '/' in self._addr:
+ # Remove unix socket if it exists.
+ try:
+ os.unlink(self._addr)
+ except OSError:
+ pass
+
+ self._addr = None
self._socket = None
+
self.signals['closed'].emit(expected)
+ import main
+ main.signals['shutdown'].disconnect(self.close)
def write(self, data):
Modified: trunk/base/src/rpc.py
==============================================================================
--- trunk/base/src/rpc.py (original)
+++ trunk/base/src/rpc.py Mon Mar 10 21:21:00 2008
@@ -94,10 +94,15 @@
# kaa imports
import kaa
from notifier.async import make_exception_class, AsyncExceptionBase
+from kaa.utils import property
# get logging object
log = logging.getLogger('rpc')
+# Global constants
+RPC_PACKET_HEADER_SIZE = struct.calcsize("I4sI")
+
+
class ConnectError(Exception):
pass
@@ -121,78 +126,46 @@
class Server(object):
"""
- RPC server class.
+ RPC server class. RPC servers accept incoming connections from client,
+ however RPC calls can be issued in either direction.
+
+ address specifies what address to bind the socket to, and can be in
+ the form ip:port or hostname:port or as a 2-tuple (hostname, port).
+ If hostname is an empty string, the socket is bound to all interfaces.
+ If address is a string but not in the above form, it is assumed to be
+ a unix socket. See kaa.Socket.connect docstring for more info.
- See Client documentation for explanation of bufsize.
+ See kaa.Socket.buffer_size docstring for information on buffer_size.
"""
- def __init__(self, address, auth_secret = '', bufsize = None):
+ def __init__(self, address, auth_secret = '', buffer_size=None):
self._auth_secret = auth_secret
- self._socket_buffer_size = bufsize
+ self._socket = kaa.Socket(buffer_size=buffer_size)
+ self._socket.listen(address)
+ self._socket.signals['new-client'].connect_weak(self._new_connection)
- if type(address) in types.StringTypes:
- if address.find('/') == -1:
- # create socket in kaa temp dir
- address = kaa.tempfile(address)
-
- if os.path.exists(address):
- # maybe a server is already running at this address, test it
- try:
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.connect(address)
- except socket.error, (err, msg):
- if err == errno.ECONNREFUSED:
- # not running, everything is fine
- log.debug('remove socket from dead server')
- else:
- # some error we do not expect
- raise socket.error(err, msg)
- else:
- # server already running
- raise IOError('server already running')
- os.unlink(address)
- self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-
- elif type(address) == tuple:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- self.socket.setblocking(False)
- self.socket.bind(address)
- self.socket.listen(5)
- self._mon = kaa.WeakIOMonitor(self._new_connection)
- self._mon.register(self.socket.fileno())
- # Remove socket file and close clients on shutdown
- kaa.main.signals["shutdown"].connect_weak(self.close)
-
- self.signals = {
- "client_connected": kaa.Signal(),
- }
+ self.signals = kaa.Signals('client_connected')
self.objects = []
-
- def _new_connection(self):
+ def _new_connection(self, client_sock):
"""
Callback when a new client connects.
"""
- client_sock = self.socket.accept()[0]
- client_sock.setblocking(False)
log.debug("New connection %s", client_sock)
- client = Channel(sock = client_sock, auth_secret = self._auth_secret,
- bufsize = self._socket_buffer_size)
+ client_sock.buffer_size = self._socket.buffer_size
+ client = Channel(sock = client_sock, auth_secret = self._auth_secret)
for obj in self.objects:
client.connect(obj)
client._send_auth_challenge()
- self.signals["client_connected"].emit(client)
+ self.signals['client_connected'].emit(client)
def close(self):
"""
Close the server socket.
"""
- self.socket = None
- self._mon.unregister()
- kaa.main.signals["shutdown"].disconnect(self.close)
+ self._socket.close()
+ self._socket = None
def connect(self, obj):
@@ -214,18 +187,20 @@
class Channel(object):
"""
- Channel object for two point communication. The server creates a Channel
- object for each client connection, Client itslef is a Channel.
+ Channel object for two point communication, implementing the kaa.rpc
+ protocol. The server creates a Channel object for each incoming client
+ connection. Client itself is also a Channel.
"""
- def __init__(self, sock, auth_secret, bufsize = None):
+
+ def __init__(self, sock, auth_secret):
self._socket = sock
- self._rmon = kaa.IOMonitor(self._handle_read)
- self._rmon.register(self._socket.fileno(), kaa.IO_READ)
- self._wmon = kaa.IOMonitor(self._handle_write)
self._authenticated = False
- self._write_buffer = ''
- self._write_buffer_delayed = ''
+ # We start off in an unauthenticated state; set chunk size to something
+ # small to prevent untrusted remote from flooding read buffer.
+ self._socket.chunk_size = 1024
+ # Buffer containing packets deferred until after authentication.
+ self._write_buffer_deferred = []
self._read_buffer = []
self._callbacks = {}
self._next_seq = 1
@@ -233,16 +208,22 @@
self._auth_secret = auth_secret
self._pending_challenge = None
- if bufsize:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
bufsize)
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
bufsize)
+ self.signals = kaa.Signals('closed')
+ # Creates a circular reference so that RPC channels survive even when
+ # there is no reference to them. (Servers do not hold references to
+ # clients channels.) As long as the socket is connected, the channel
+ # will survive.
+ self._socket.signals['read'].connect(self._handle_read)
+ self._socket.signals['closed'].connect(self._handle_close)
- self.signals = { 'closed': kaa.Signal() }
- kaa.main.signals["shutdown"].connect_weak(self._handle_close)
+ @property
+ def connected(self):
+ return self._socket.connected
def is_connected(self):
- return self._rmon and self._rmon.active()
+ log.warning('kaa.rpc.Channel.is_connected deprecated; use connected
property instead')
+ return self.connected
def connect(self, obj):
@@ -272,7 +253,7 @@
kwargs['_kaa_rpc_callback'] = callback
kaa.MainThreadCallback(self.rpc)(cmd, *args, **kwargs)
return callback
- if not self._wmon:
+ if not self._socket.connected:
raise IOError('channel is disconnected')
seq = self._next_seq
self._next_seq += 1
@@ -286,73 +267,41 @@
return callback
- def _handle_close(self):
+ def close(self):
"""
- Socket is closed.
+ Forcefully close the RPC channel.
"""
- if not self._wmon:
- # already closed (no idea why this happens)
- return False
- log.debug('close socket for %s', self)
self._socket.close()
- self._socket = None
- if self._wmon.active():
- self._wmon.unregister()
- if self._rmon.active():
- self._rmon.unregister()
- self._wmon = self._rmon = None
- self.signals['closed'].emit()
- self.signals = {}
- kaa.main.signals["shutdown"].disconnect(self._handle_close)
- def _handle_read(self):
+ def _handle_close(self, expected):
"""
- Read from the socket (callback from notifier).
+ kaa.Socket callback invoked when socket is closed.
"""
- try:
- # If not authenticated, read at most 1k.
- data = self._socket.recv((1024, 1024*1024)[self._authenticated])
- except socket.error, (err, msg):
- if err == errno.EAGAIN:
- # Resource temporarily unavailable -- we are trying to read
- # data on a socket when none is available.
- return
- # If we're here, then the socket is likely disconnected.
- data = None
- except (KeyboardInterrupt, SystemExit):
- raise SystemExit
- except:
- log.exception('_handle_read failed, close socket')
- data = None
+ if not self._authenticated:
+ # Socket closed before authentication completed. We assume it's
+ # because authentication failed (though it may not be).
+ log.error('Socket closed before authentication completed')
- if not data:
- if not self._authenticated:
- # Remote end closed connection during authentication; this
- # almost certainly means auth secret mismatch.
- log.error('Authentication failed')
- self._handle_close()
- # Return False to cause notifier to remove fd handler.
- return False
+ log.debug('close socket for %s', self)
+ self.signals['closed'].emit()
+ self.signals = {}
+ self._socket.signals['read'].disconnect(self._handle_read)
+ self._socket.signals['closed'].disconnect(self._handle_close)
- self._read_buffer.append(data)
- # read as much data as we have
- while self._authenticated:
- try:
- data = self._socket.recv(1024*1024)
- except socket.error, (err, msg):
- break
- if not data:
- break
- self._read_buffer.append(data)
- header_size = struct.calcsize("I4sI")
+ def _handle_read(self, data):
+ """
+ Invoked when a new chunk is read from the socket. When not
authenticated,
+ chunk size is 1k; when authenticated it is 1M.
+ """
+ self._read_buffer.append(data)
# Before we start into the loop, make sure we have enough data for
# a full packet. For very large packets (if we just received a huge
# pickled object), this saves the string.join() which can be very
# expensive. (This is the reason we use a list for our read buffer.)
buflen = reduce(lambda x, y: x + len(y), self._read_buffer, 0)
- if buflen < header_size:
+ if buflen < RPC_PACKET_HEADER_SIZE:
return
if not self._authenticated and buflen > 1024:
@@ -360,36 +309,36 @@
# in the buffer. If we do it's because the remote has sent a
# large amount of data before completing authentication.
log.warning("Too much data received from remote end before
authentication; disconnecting")
- self._handle_close()
+ self.close()
return
# Ensure the first block in the read buffer is big enough for a full
# packet header. If it isn't, then we must have more than 1 block in
# the buffer, so keep merging blocks until we have a block big enough
- # to be a header. If we're here, it means that buflen >= header_size,
- # so we can safely loop.
- while len(self._read_buffer[0]) < header_size:
+ # to be a header. If we're here, it means that buflen >=
+ # RPC_PACKET_HEADER_SIZE, so we can safely loop.
+ while len(self._read_buffer[0]) < RPC_PACKET_HEADER_SIZE:
self._read_buffer[0] += self._read_buffer.pop(1)
# Make sure the the buffer holds enough data as indicated by the
# payload size in the header.
- header = self._read_buffer[0][:header_size]
+ header = self._read_buffer[0][:RPC_PACKET_HEADER_SIZE]
payload_len = struct.unpack("I4sI", header)[2]
- if buflen < payload_len + header_size:
+ if buflen < payload_len + RPC_PACKET_HEADER_SIZE:
return
# At this point we know we have enough data in the buffer for the
# packet, so we merge the array into a single buffer.
strbuf = ''.join(self._read_buffer)
self._read_buffer = []
- while 1:
- if len(strbuf) <= header_size:
+ while True:
+ if len(strbuf) <= RPC_PACKET_HEADER_SIZE:
if len(strbuf) > 0:
self._read_buffer.append(str(strbuf))
break
- header = strbuf[:header_size]
+ header = strbuf[:RPC_PACKET_HEADER_SIZE]
seq, packet_type, payload_len = struct.unpack("I4sI", header)
- if len(strbuf) < payload_len + header_size:
+ if len(strbuf) < payload_len + RPC_PACKET_HEADER_SIZE:
# We've also received portion of another packet that we
# haven't fully received yet. Put back to the buffer what
# we have so far, and we can exit the loop.
@@ -398,8 +347,8 @@
# Grab the payload for this packet, and shuffle strbuf to the
# next packet.
- payload = strbuf[header_size:header_size + payload_len]
- strbuf = buffer(strbuf, header_size + payload_len)
+ payload = strbuf[RPC_PACKET_HEADER_SIZE:RPC_PACKET_HEADER_SIZE +
payload_len]
+ strbuf = buffer(strbuf, RPC_PACKET_HEADER_SIZE + payload_len)
#log.debug("Got packet %s", packet_type)
if not self._authenticated:
self._handle_packet_before_auth(seq, packet_type, payload)
@@ -416,44 +365,9 @@
header = struct.pack("I4sI", seq, packet_type, len(payload))
if not self._authenticated and packet_type not in ('RESP', 'AUTH'):
log.debug('delay packet %s', packet_type)
- self._write_buffer_delayed += header + payload
+ self._write_buffer_deferred.append(header + payload)
else:
- self._write_buffer += header + payload
-
- self._handle_write(close_on_error=False)
- self._flush()
-
-
- def _flush(self):
- """
- If there is data pending in the write buffer, ensure that it is
- written next notifier loop.
- """
- if not self._wmon.active() and self._write_buffer:
- self._wmon.register(self._socket.fileno(), kaa.IO_WRITE)
-
-
- def _handle_write(self, close_on_error=True):
- """
- Write to the socket (callback from notifier).
- """
- if not len(self._write_buffer):
- return False
- try:
- sent = self._socket.send(self._write_buffer)
- self._write_buffer = self._write_buffer[sent:]
- if not self._write_buffer:
- return False
- except socket.error, (err, msg):
- if err == errno.EAGAIN:
- # Resource temporarily unavailable -- we are trying to write
- # data to a socket when none is available.
- return
- # If we're here, then the socket is likely disconnected.
- if close_on_error:
- self._handle_close()
- return False
- return True
+ self._socket.write(header + payload)
def _send_answer(self, answer, seq):
@@ -606,7 +520,7 @@
# Received a non-auth command while expecting auth.
log.error('got %s before authentication is complete; closing
socket.' % type)
# Hang up.
- self._handle_close()
+ self.close()
return
try:
@@ -624,7 +538,7 @@
challenge, response, salt = struct.unpack("20s20s20s", payload)
except:
log.warning("Malformed authentication packet from remote;
disconnecting.")
- self._handle_close()
+ self.close()
return
# At this point, challenge, response, and salt are 20 byte strings of
@@ -637,7 +551,7 @@
# disconnect immediately.
if self._pending_challenge:
self._pending_challenge = None
- self._handle_close()
+ self.close()
return
# Otherwise send the response, plus a challenge of our own.
@@ -654,7 +568,7 @@
if self._pending_challenge == None:
# We've received a response packet to auth, but we haven't
# sent a challenge. Something isn't right, so disconnect.
- self._handle_close()
+ self.close()
return
# Step 3/4: We are expecting a response to our previous challenge
@@ -669,12 +583,14 @@
# Now check to see if we were sent what we expected.
if response != expected_response:
log.error('authentication error')
- self._handle_close()
+ self.close()
return
# Challenge response was good, so the remote is considered
- # authenticated now.
+ # authenticated now. We increase the chunk size on the socket
+ # so we read more at once.
self._authenticated = True
+ self._socket.chunk_size = 1024*1024
log.debug('Valid response received, remote authenticated.')
# If remote has issued a counter-challenge along with their
@@ -692,9 +608,9 @@
self._send_packet(seq, 'RESP', payload)
log.debug('Sent response to challenge from client.')
- self._write_buffer += self._write_buffer_delayed
- self._write_buffer_delayed = ''
- self._flush()
+ # Empty deferred write buffer now that we're authenticated.
+ self._socket.write(''.join(self._write_buffer_deferred))
+ self._write_buffer_deferred = []
def _get_rand_value(self):
@@ -760,32 +676,22 @@
tp = self._get_channel_type()
if not self._socket:
return '<kaa.rpc.Channel (%s) - disconnected>' % tp
- return '<kaa.rpc.Channel (%s) %s>' % (tp, self._socket.fileno())
+ return '<kaa.rpc.Channel (%s) %s>' % (tp, self._socket.fileno)
class Client(Channel):
"""
RPC client to be connected to a server.
-
- If bufsize is not None, the socket send and receive buffers will be set to
- this size. Setting this to higher values (say 1M) improves performance
- when sending large amounts of data via RPC. Note that the upper bound may
- be restricted by the kernel. (Under Linux, this can be tuned by adjusting
- /proc/sys/net/core/[rw]mem_max)
"""
- def __init__(self, address, auth_secret = '', bufsize = None):
- if type(address) in types.StringTypes:
- address = kaa.tempfile(address)
- fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- if type(address) == tuple:
- fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- fd.connect(address)
- except socket.error, e:
- raise ConnectError(e)
- fd.setblocking(False)
- Channel.__init__(self, fd, auth_secret, bufsize)
+ def __init__(self, address, auth_secret = '', buffer_size = None):
+ sock = kaa.Socket(buffer_size)
+ sock.buffer_size = buffer_size
+ # FIXME: we block on connect for now; Channel.rpc() tests socket
+ # connected and raises exception if it isn't, so if we do rpc() right
+ # after connecting, it will fail.
+ sock.connect(address).wait()
+ Channel.__init__(self, sock, auth_secret)
def _get_channel_type(self):
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog