Author: dmeyer
Date: Fri Jun 23 18:46:40 2006
New Revision: 1551
Added:
trunk/base/src/rpc.py
Log:
Add a simple rpc module. It has far fewer features
than kaa.ipc, but it can be used without any
blocking (i.e. without calling step())
Added: trunk/base/src/rpc.py
==============================================================================
--- (empty file)
+++ trunk/base/src/rpc.py Fri Jun 23 18:46:40 2006
@@ -0,0 +1,435 @@
+# -*- coding: iso-8859-1 -*-
+# -----------------------------------------------------------------------------
+# rpc.py - Simple RPC InterProcessCommunication
+# -----------------------------------------------------------------------------
+# $Id$
+#
+# This module defines an alternative way for InterProcessCommunication with
+# fewer features than the ipc.py module. It does not keep references, return
+# values are only delivered through a callback, and it is only possible to
+# access functions.
+#
+# So why use this module and not kaa.ipc? Well, kaa.ipc makes it very easy to
+# shoot yourself in the foot. It keeps references over ipc which could
+# confuse the garbage collector and a simple function call on an object can
+# result in many notifier steps incl. recursion inside the notifier.
+#
+# -----------------------------------------------------------------------------
+# Copyright (C) 2006 Dirk Meyer, et al.
+#
+# First Version: Dirk Meyer <[EMAIL PROTECTED]>
+# Maintainer: Dirk Meyer <[EMAIL PROTECTED]>
+#
+# Please see the file AUTHORS for a complete list of authors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
+# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# -----------------------------------------------------------------------------
+
+import types
+import socket
+import errno
+import logging
+import os
+import cPickle
+import pickle
+import struct
+import sys
+import sha
+import time
+
+import kaa
+import kaa.notifier
+
+log = logging.getLogger('ipc')
+
# Server: listening side of the RPC transport. A string address selects a
# Unix-domain socket, a tuple address a TCP socket; every accepted connection
# is wrapped in a Channel (see handle_connection).
# NOTE(review): leading whitespace was collapsed in this mail, so statement
# nesting below has to be inferred from context.
+class Server(object):
+
# address: string (AF_UNIX path; a name without '/' is placed under kaa.TEMP)
# or tuple (AF_INET). auth_secret is passed on to every Channel created for
# an accepted connection.
+ def __init__(self, address, auth_secret = ''):
+
+ self._auth_secret = auth_secret
+ if type(address) in types.StringTypes:
+ if address.find('/') == -1:
+ # create socket in kaa temp dir
+ address = '%s/%s' % (kaa.TEMP, address)
+
# Probe an existing socket path by connecting to it: ECONNREFUSED is taken to
# mean a stale socket left by a dead server, any other socket error is
# re-raised, and a successful connect raises IOError (server already running).
# NOTE(review): the probe socket `s` is never closed in the visible code.
+ if os.path.exists(address):
+ # maybe a server is already running at this address, test it
+ try:
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(address)
+ except socket.error, (err, msg):
+ if err == errno.ECONNREFUSED:
+ # not running, everything is fine
+ log.info('remove socket from dead server')
+ else:
+ # some error we do not expect
+ raise socket.error(err, msg)
+ else:
+ # server already running
+ raise IOError('server already running')
+ os.unlink(address)
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ kaa.signals["shutdown"].connect_weak(self.close)
+
+ elif type(address) == tuple:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
# NOTE(review): an address that is neither a string nor a tuple appears to
# reach this point without self.socket being set - presumably callers never
# pass one; verify.
+ self.socket.setblocking(False)
+ self.socket.bind(address)
+ self.socket.listen(5)
+ self._monitor =
kaa.notifier.WeakSocketDispatcher(self.handle_connection)
+ self._monitor.register(self.socket.fileno())
# NOTE(review): self.close was already connected to the shutdown signal above
# on the string-address path; presumably this second connect is redundant
# there - confirm.
+ # Remove socket file and close clients on shutdown
+ kaa.signals["shutdown"].connect_weak(self.close)
+
# "client_connected" is emitted by handle_connection with the new Channel as
# its argument; objects collects everything registered through connect().
+ self.signals = {
+ "client_connected": kaa.notifier.Signal(),
+ }
+ self.objects = []
+
+
# Dispatcher callback for the listening socket: accept one connection, wrap
# it in a Channel, connect all registered objects to that Channel, emit
# "client_connected" and send the authentication challenge.
# NOTE(review): the routine "New connection" message is logged at error level.
+ def handle_connection(self):
+ client_sock = self.socket.accept()[0]
+ client_sock.setblocking(False)
+ log.error("New connection %s", client_sock)
+ client = Channel(socket = client_sock, auth_secret = self._auth_secret)
+ for obj in self.objects:
+ client.connect(obj)
+ self.signals["client_connected"].emit(client)
+ client._request_auth_packet()
+
+
# Drop the listening socket reference, unregister the dispatcher and
# disconnect from the shutdown signal.
# NOTE(review): the socket is only set to None (no explicit close()) and no
# socket file is unlinked here, although the comment in __init__ mentions
# removing the socket file and closing clients.
+ def close(self):
+ self.socket = None
+ self._monitor.unregister()
+ kaa.signals["shutdown"].disconnect(self.close)
+
+
# Register obj so that handle_connection connects it to every Channel created
# afterwards; Channels that already exist are not touched here.
+ def connect(self, obj):
+ self.objects.append(obj)
+
+
# RPC objects are what Channel.rpc() returns: calling one sends `command`
# with the call arguments through channel._send_rpc().
+class RPC(kaa.notifier.Callback):
+ """
+ Client part of RPC. An object of this class has a call function to do
+ the real calling of the RPC.
+ """
# If `function` is callable it becomes the wrapped callback (together with
# the extra args/kwargs). Otherwise it is stored as-is in self.handle_return,
# shadowing the method below, and is later handed to _send_rpc unchanged
# (which checks for the string 'blocking').
# NOTE(review): in the non-callable case the base-class __init__ is not
# called.
+ def __init__(self, channel, command, function=None, *args, **kwargs):
+ if callable(function):
+ super(RPC, self).__init__(function, *args, **kwargs)
+ else:
+ self.handle_return = function
+ self.channel = channel
+ self.command = command
+
# Passed to _send_rpc as the result callback; forwards the result to the
# base-class __call__.
+ def handle_return(self, result):
+ super(RPC, self).__call__(result)
+
# Perform the remote call; the return value is whatever _send_rpc returns.
+ def __call__(self, *args, **kwargs):
+ return self.channel._send_rpc(self.command, args, kwargs,
self.handle_return)
+
+
# Channel: one end of an RPC connection on top of an already connected
# socket. Server.handle_connection creates one per accepted connection and
# Client subclasses it.
+class Channel(object):
+
# socket: connected socket object; auth_secret: shared secret used by
# _get_challenge_response.
+ def __init__(self, socket, auth_secret):
+ self._socket = socket
+
# The read dispatcher is registered right away; the write dispatcher is only
# registered on demand by _send_packet.
+ self._rmon = kaa.notifier.SocketDispatcher(self._handle_read)
+ self._rmon.register(self._socket.fileno(), kaa.notifier.IO_READ)
+ self._wmon = kaa.notifier.SocketDispatcher(self._handle_write)
+ self._authenticated = False
# _write_buffer: pending outgoing bytes (str).
# _read_buffer: list of received chunks not yet parsed into packets.
+ self._write_buffer = ''
+ self._read_buffer = []
# _callbacks: command name -> exposed callable, filled by connect().
+ self._callbacks = {}
+ self._next_seq = 1
# _rpc_in_progress: seq -> result callback, or a status tuple for blocking
# calls (see _send_rpc).
+ self._rpc_in_progress = {}
+ self._auth_secret = auth_secret
# _pending_challenge: challenge sent by _request_auth_packet, compared
# against the one echoed back in a RESP packet.
+ self._pending_challenge = None
+
+ self.signals = { 'closed': kaa.notifier.Signal() }
+
+
# Make the exposed functions of obj callable by the remote side: every
# callable attribute of obj that carries a _kaa_rpc attribute (as set by the
# expose() decorator) is stored in self._callbacks under that name.
# Note that getattr() is evaluated for every name returned by dir(obj).
+ def connect(self, obj):
+ for func in [ getattr(obj, func) for func in dir(obj) ]:
+ if callable(func) and hasattr(func, '_kaa_rpc'):
+ self._callbacks[func._kaa_rpc] = func
+
+
# Send a CALL packet for `function` with pickled (function, args, kwargs).
# Raises IOError if the channel has been closed (self._wmon is None then).
# callback may be:
#   - a callable: stored under the sequence number and invoked with the
#     result by _handle_packet; seq is returned.
#   - the string 'blocking': kaa.notifier.step() is run until the reply has
#     arrived; the result is returned, or Exception(result) is raised if the
#     remote side answered with an exception.
#   - anything else: nothing is stored, the reply is dropped by
#     _handle_packet; seq is returned.
+ def _send_rpc(self, function, args, kwargs, callback):
+ if not self._wmon:
+ raise IOError('channel is disconnected')
+ seq = self._next_seq
+ self._next_seq += 1
+ packet_type = 'CALL'
+ payload = cPickle.dumps((function, args, kwargs),
pickle.HIGHEST_PROTOCOL)
+ self._send_packet(seq, packet_type, len(payload), payload)
+ if callable(callback):
+ self._rpc_in_progress[seq] = callback
+ return seq
# Blocking mode uses a status tuple (waiting, ok, result); _handle_packet
# replaces it with (False, True, payload) on RETN or (False, False, error)
# on EXCP.
# NOTE(review): the loop below has no timeout, and nothing visible in
# _handle_close clears the waiting flag if the channel goes away first.
+ if callback == 'blocking':
+ self._rpc_in_progress[seq] = True, True, None
+ while self._rpc_in_progress[seq][0]:
+ # print 'step'
+ kaa.notifier.step()
+ ok, result = self._rpc_in_progress[seq][1:]
+ del self._rpc_in_progress[seq]
+ if not ok:
+ raise Exception(result)
+ return result
+ return seq
+
+
# Return an RPC object bound to this channel and to the command name
# `function`. The extra arguments are handed to RPC.__init__, so the first
# positional one, if given, is the result callback.
+ def rpc(self, function, *args, **kwargs):
+ return RPC(self, function, *args, **kwargs)
+
+
# Close the socket, unregister whichever dispatchers are still active, emit
# the 'closed' signal and then drop all signals.
# NOTE(review): _wmon and _rmon are None afterwards, so a second call would
# fail on self._wmon.active() - presumably this is only ever reached once
# per channel; verify.
+ def _handle_close(self):
+ log.info('close socket for %s', self)
+ self._socket.close()
+ if self._wmon.active():
+ self._wmon.unregister()
+ if self._rmon.active():
+ self._rmon.unregister()
+ self._wmon = self._rmon = None
+ self.signals['closed'].emit()
+ self.signals = {}
+
+
# Read dispatcher callback: receive what is available, buffer it, and pass
# every complete packet to _handle_packet.
# Wire format: a struct "I4sI" header (sequence number, 4-character packet
# type, payload length) followed by the payload bytes.
# NOTE(review): "I4sI" has no byte-order prefix, i.e. native byte order, size
# and alignment - presumably both ends are expected to run on the same kind
# of machine; confirm for TCP use.
+ def _handle_read(self):
+ try:
+ data = self._socket.recv(1024*1024)
+ except socket.error, (err, msg):
+ if err == errno.EAGAIN:
+ # Resource temporarily unavailable -- we are trying to read
+ # data on a socket when none is available.
+ return
+ # If we're here, then the socket is likely disconnected.
+ data = None
+ except:
+ log.exception('_handle_read failed, close socket')
+ data = None
+
+ if not data:
+ log.info('no data received')
+ self._handle_close()
+ # Return False to cause notifier to remove fd handler.
+ return False
+
+ header_size = struct.calcsize("I4sI")
+ self._read_buffer.append(data)
+ # Before we start into the loop, make sure we have enough data for
+ # a full packet. For very large packets (if we just received a huge
+ # pickled object), this saves the string.join() which can be very
+ # expensive. (This is the reason we use a list for our read buffer.)
+ buflen = reduce(lambda x, y: x + len(y), self._read_buffer, 0)
+ if buflen < header_size:
+ return
+
+ # Ensure the first block in the read buffer is big enough for a full
+ # packet header. If it isn't, then we must have more than 1 block in
+ # the buffer, so keep merging blocks until we have a block big enough
+ # to be a header. If we're here, it means that buflen >= header_size,
+ # so we can safely loop.
+ while len(self._read_buffer[0]) < header_size:
+ self._read_buffer[0] += self._read_buffer.pop(1)
+
+ # Make sure the buffer holds enough data as indicated by the
+ # payload size in the header.
+ payload_len = struct.unpack("I4sI",
self._read_buffer[0][:header_size])[2]
+ if buflen < payload_len + header_size:
+ return
+
+ # At this point we know we have enough data in the buffer for the
+ # packet, so we merge the array into a single buffer.
+ strbuf = ''.join(self._read_buffer)
+ self._read_buffer = []
# Consume packets from strbuf one after another; whatever is left over
# (nothing, a partial header, or a partial packet) goes back into the read
# buffer.
# NOTE(review): because of `<=`, a remainder of exactly header_size bytes
# (a packet with an empty payload) is put back and not dispatched until more
# data arrives.
+ while 1:
+ if len(strbuf) <= header_size:
+ if len(strbuf) > 0:
+ self._read_buffer.append(str(strbuf))
+ break
+ seq, packet_type, payload_len = struct.unpack("I4sI",
strbuf[:header_size])
+ if len(strbuf) < payload_len + header_size:
+ # We've also received portion of another packet that we
+ # haven't fully received yet. Put back to the buffer what
+ # we have so far, and we can exit the loop.
+ self._read_buffer.append(str(strbuf))
+ break
+
+ # Grab the payload for this packet, and shuffle strbuf to the
+ # next packet.
+ payload = strbuf[header_size:header_size + payload_len]
+ strbuf = buffer(strbuf, header_size + payload_len)
+ self._handle_packet(seq, packet_type, payload)
+
+
# Queue one packet (header + payload) for sending. `length` is only used for
# the header's payload-length field.
# While the channel is not authenticated, as read from the statement order:
#   - 'RESP' and 'AUTH' packets are put at the FRONT of the write buffer and
#     the write dispatcher is registered if it is not active;
#   - any other packet is appended to the buffer ("delayed") without
#     registering the write dispatcher.
# Once authenticated, the packet is appended and the write dispatcher is
# registered if it is not active.
# NOTE(review): delayed packets only go out once something registers the
# write dispatcher; _handle_packet sets _authenticated on a valid 'RESP'
# without doing so - confirm delayed data is not left waiting on that path.
+ def _send_packet(self, seq, packet_type, length, payload):
+ header = struct.pack("I4sI", seq, packet_type, length)
+ if not self._authenticated:
+ if packet_type in ('RESP', 'AUTH'):
+ self._write_buffer = header + payload + self._write_buffer
+ if not self._wmon.active():
+ self._wmon.register(self._socket.fileno(),
kaa.notifier.IO_WRITE)
+ return True
+ log.info('delay packet %s', packet_type)
+ self._write_buffer += header + payload
+ return True
+
+ self._write_buffer += header + payload
+ if not self._wmon.active():
+ self._wmon.register(self._socket.fileno(), kaa.notifier.IO_WRITE)
+
+
# Write dispatcher callback: send as much of the write buffer as the socket
# accepts and unregister the dispatcher once the buffer is empty.
# Returns False if there was nothing to send or the socket failed (the
# channel is closed in that case), None on EAGAIN, True otherwise.
+ def _handle_write(self):
+ if not len(self._write_buffer):
+ return False
+ try:
+ sent = self._socket.send(self._write_buffer)
+ self._write_buffer = self._write_buffer[sent:]
+ if not self._write_buffer:
+ self._wmon.unregister()
+ except socket.error, (err, msg):
+ if err == errno.EAGAIN:
+ # Resource temporarily unavailable -- we are trying to write
+ # data to a socket when none is available.
+ return
+ # If we're here, then the socket is likely disconnected.
+ self._handle_close()
+ return False
+ return True
+
+
# Handle one decoded packet. Every visible path returns True, except that
# SystemExit/KeyboardInterrupt raised by a handler ends in sys.exit(0).
# Note: the parameter name `type` shadows the builtin inside this method.
+ def _handle_packet(self, seq, type, payload):
# Until authentication is complete only 'AUTH' and 'RESP' are accepted.
+ if not self._authenticated:
# 'AUTH' carries the peer's challenge: answer with challenge, response and
# our salt packed as three 20-byte fields, then treat the channel as
# authenticated.
+ if type == 'AUTH':
+ response, salt = self._get_challenge_response(payload)
+ payload = struct.pack("20s20s20s", payload, response, salt)
+ self._send_packet(seq, 'RESP', len(payload), payload)
+ self._authenticated = True
+ return True
+
# 'RESP' answers our own challenge: accepted only if the response matches
# what we compute from the echoed challenge and the peer's salt, and the
# echoed challenge is the one we sent.
# NOTE(review): on failure the error is only logged; nothing visible here
# closes the connection.
+ if type == 'RESP':
+ challenge, response, salt = struct.unpack("20s20s20s", payload)
+ if response == self._get_challenge_response(challenge,
salt)[0] \
+ and challenge == self._pending_challenge:
+ self._authenticated = True
+ return True
+ log.error('authentication error')
+ return True
+
+ log.error('got %s before challenge response is complete', type)
+ return True
+
# NOTE(review): payloads below are unpickled exactly as received from the
# peer. Unpickling data from an untrusted source can execute arbitrary code;
# the shared-secret handshake above is the only gate in the visible code.
+ if type == 'CALL':
+ # Remote function call, send answer
+ payload = cPickle.loads(payload)
+ function, args, kwargs = payload
# The lookup in self._callbacks happens inside the try, so an unknown
# command name is reported back to the caller as an 'EXCP' packet.
+ try:
+ payload = self._callbacks[function](*args, **kwargs)
+ packet_type = 'RETN'
+ except (SystemExit, KeyboardInterrupt):
+ sys.exit(0)
+ except Exception, e:
+ log.exception('rpc call %s', function)
+ packet_type = 'EXCP'
+ payload = e
+ payload = cPickle.dumps(payload, pickle.HIGHEST_PROTOCOL)
+ self._send_packet(seq, packet_type, len(payload), payload)
+ return True
+
# 'RETN': result of a call we made. No entry for seq means nobody is
# waiting; a callable entry is removed and invoked with the result; otherwise
# the entry is the blocking status tuple and is replaced by
# (False, True, payload) for _send_rpc to pick up.
+ if type == 'RETN':
+ # RPC return
+ payload = cPickle.loads(payload)
+ callback = self._rpc_in_progress.get(seq)
+ if not callback:
+ return True
+ if callable(callback):
+ del self._rpc_in_progress[seq]
+ try:
+ callback(payload)
+ except (SystemExit, KeyboardInterrupt):
+ sys.exit(0)
+ except Exception, e:
+ log.exception('rpc return')
+ return True
+ self._rpc_in_progress[seq] = False, True, payload
+ return True
+
# 'EXCP': the remote call raised. With a callable callback the error is only
# logged and the callback is never invoked; for a blocking call the status
# tuple becomes (False, False, error).
+ if type == 'EXCP':
+ # Exception for remote call
+ error = cPickle.loads(payload)
+ callback = self._rpc_in_progress.get(seq)
+ if not callback:
+ return True
+ if callable(callback):
+ # nothing to do
+ del self._rpc_in_progress[seq]
+ log.error(error)
+ return True
+ self._rpc_in_progress[seq] = False, False, error
+ return True
+
+ log.error('unknown packet type %s', type)
+ return True
+
+
# Called by Server.handle_connection for each new Channel.
+ def _request_auth_packet(self):
+ """
+ Request an auth packet response for initial setup.
+ """
# The challenge is the SHA digest of the current time plus 64 bytes read from
# /dev/urandom; it is remembered so the 'RESP' handler can compare it, and
# sent as an 'AUTH' packet with sequence number 0 and length 20.
# NOTE(review): the file object opened on /dev/urandom is not closed
# explicitly.
+ rbytes = file("/dev/urandom").read(64)
+ self._pending_challenge = sha.sha(str(time.time()) + rbytes).digest()
+ self._send_packet(0, 'AUTH', 20, self._pending_challenge)
+
+
# Returns the tuple (response digest, salt).
+ def _get_challenge_response(self, challenge, salt = None):
+ """
+ Generate a response for the challenge based on the auth secret
+ supplied to the constructor. This hashes twice to prevent against
+ certain attacks on the hash function. If salt is not None, it is
+ the value generated by the remote end that was used in computing
+ their response. If it is None, a new 20-byte salt is generated
+ and used in computing our response.
+
+ """
# A fresh salt is derived the same way as the challenge in
# _request_auth_packet (time plus 64 bytes from /dev/urandom, hashed).
# NOTE(review): the /dev/urandom file object is not closed explicitly, and
# `salt == None` would conventionally be written `salt is None`.
+ if salt == None:
+ rbytes = file("/dev/urandom").read(64)
+ salt = sha.sha(str(time.time()) + rbytes).digest()
# m = challenge + secret + salt; response = sha(sha(m) + m).
+ m = challenge + self._auth_secret + salt
+ return sha.sha(sha.sha(m).digest() + m).digest(), salt
+
+
# Client: a Channel that creates and connects its own socket.
+class Client(Channel):
# address: string (Unix-domain socket name, placed under kaa.TEMP) or tuple
# (TCP address). The socket is connected first and only then switched to
# non-blocking mode before being handed to Channel.__init__.
# NOTE(review): unlike Server.__init__, a string address is prefixed with
# kaa.TEMP without checking whether it already contains '/', so a path-style
# address that works for Server would resolve differently here - confirm.
# NOTE(review): an address that is neither a string nor a tuple appears to
# leave `fd` unbound before fd.connect().
+ def __init__(self, address, auth_secret = ''):
+ if type(address) in types.StringTypes:
+ address = '%s/%s' % (kaa.TEMP, address)
+ fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ if type(address) == tuple:
+ fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ fd.connect(address)
+ fd.setblocking(False)
+ Channel.__init__(self, fd, auth_secret)
+
+
# expose(command) returns a decorator that tags a function with
# _kaa_rpc = command and returns the function itself; Channel.connect()
# looks for that attribute to register the function under `command`.
+def expose(command):
+ """
+ Decorator to expose a function.
+ """
+ def decorator(func):
+ func._kaa_rpc = command
+ return func
+ return decorator
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog