Author: dmeyer
Date: Fri Jun 23 18:46:40 2006
New Revision: 1551

Added:
   trunk/base/src/rpc.py

Log:
Add a simple rpc module. It has far fewer features
than kaa.ipc, but it can be used without any
blocking (i.e. without calling step()).


Added: trunk/base/src/rpc.py
==============================================================================
--- (empty file)
+++ trunk/base/src/rpc.py       Fri Jun 23 18:46:40 2006
@@ -0,0 +1,435 @@
+# -*- coding: iso-8859-1 -*-
+# -----------------------------------------------------------------------------
+# rpc.py - Simple RPC InterProcessCommunication
+# -----------------------------------------------------------------------------
+# $Id$
+#
+# This module defines an alternative way for InterProcessCommunication with
+# fewer features than the ipc.py module. It does not keep references, return
+# values are only passed back through a callback, and it is only possible to
+# access functions.
+#
+# So why use this module and not kaa.ipc? Well, kaa.ipc makes it very easy to
+# shoot yourself in the foot. It keeps references over ipc which could
+# confuse the garbage collector and a simple function call on an object can
+# result in many notifier steps incl. recursion inside the notifier.
+#
+# -----------------------------------------------------------------------------
+# Copyright (C) 2006 Dirk Meyer, et al.
+#
+# First Version: Dirk Meyer <[EMAIL PROTECTED]>
+# Maintainer:    Dirk Meyer <[EMAIL PROTECTED]>
+#
+# Please see the file AUTHORS for a complete list of authors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MER-
+# CHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+# -----------------------------------------------------------------------------
+
+import types
+import socket
+import errno
+import logging
+import os
+import cPickle
+import pickle
+import struct
+import sys
+import sha
+import time
+
+import kaa
+import kaa.notifier
+
+# Module-level logger. Note that it uses the 'ipc' logger name even though
+# this module is rpc.py.
+log = logging.getLogger('ipc')
+
+class Server(object):
+
+    def __init__(self, address, auth_secret = ''):
+
+        self._auth_secret = auth_secret
+        if type(address) in types.StringTypes:
+            if address.find('/') == -1:
+                # create socket in kaa temp dir
+                address = '%s/%s' % (kaa.TEMP, address)
+                
+            if os.path.exists(address):
+                # maybe a server is already running at this address, test it
+                try:
+                    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+                    s.connect(address)
+                except socket.error, (err, msg):
+                    if err == errno.ECONNREFUSED:
+                        # not running, everything is fine
+                        log.info('remove socket from dead server')
+                    else:
+                        # some error we do not expect
+                        raise socket.error(err, msg)
+                else:
+                    # server already running
+                    raise IOError('server already running')
+                os.unlink(address)
+            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            kaa.signals["shutdown"].connect_weak(self.close)
+            
+        elif type(address) == tuple:
+            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+        self.socket.setblocking(False)
+        self.socket.bind(address)
+        self.socket.listen(5)
+        self._monitor = 
kaa.notifier.WeakSocketDispatcher(self.handle_connection)
+        self._monitor.register(self.socket.fileno())
+        # Remove socket file and close clients on shutdown
+        kaa.signals["shutdown"].connect_weak(self.close)
+
+        self.signals = {
+            "client_connected": kaa.notifier.Signal(),
+        }
+        self.objects = []
+        
+
+    def handle_connection(self):
+        client_sock = self.socket.accept()[0]
+        client_sock.setblocking(False)
+        log.error("New connection %s", client_sock)
+        client = Channel(socket = client_sock, auth_secret = self._auth_secret)
+        for obj in self.objects:
+            client.connect(obj)
+        self.signals["client_connected"].emit(client)
+        client._request_auth_packet()
+
+
+    def close(self):
+        self.socket = None
+        self._monitor.unregister()
+        kaa.signals["shutdown"].disconnect(self.close)
+
+    
+    def connect(self, obj):
+        self.objects.append(obj)
+        
+
+class RPC(kaa.notifier.Callback):
+    """
+    Client part of RPC. An object of this class has a call function to do
+    the real calling of the RPC.
+    """
+    def __init__(self, channel, command, function=None, *args, **kwargs):
+        if callable(function):
+            super(RPC, self).__init__(function, *args, **kwargs)
+        else:
+            self.handle_return = function
+        self.channel = channel
+        self.command = command
+
+    def handle_return(self, result):
+        super(RPC, self).__call__(result)
+
+    def __call__(self, *args, **kwargs):
+        return self.channel._send_rpc(self.command, args, kwargs, 
self.handle_return)
+
+
+class Channel(object):
+    
+    def __init__(self, socket, auth_secret):
+        self._socket = socket
+
+        self._rmon = kaa.notifier.SocketDispatcher(self._handle_read)
+        self._rmon.register(self._socket.fileno(), kaa.notifier.IO_READ)
+        self._wmon = kaa.notifier.SocketDispatcher(self._handle_write)
+        self._authenticated = False
+        self._write_buffer = ''
+        self._read_buffer = []
+        self._callbacks = {}
+        self._next_seq = 1
+        self._rpc_in_progress = {}
+        self._auth_secret = auth_secret
+        self._pending_challenge = None
+
+        self.signals = { 'closed': kaa.notifier.Signal() }
+        
+        
+    def connect(self, obj):
+        for func in [ getattr(obj, func) for func in dir(obj) ]:
+            if callable(func) and hasattr(func, '_kaa_rpc'):
+                self._callbacks[func._kaa_rpc] = func
+
+        
+    def _send_rpc(self, function, args, kwargs, callback):
+        if not self._wmon:
+            raise IOError('channel is disconnected')
+        seq = self._next_seq
+        self._next_seq += 1
+        packet_type = 'CALL'
+        payload = cPickle.dumps((function, args, kwargs), 
pickle.HIGHEST_PROTOCOL)
+        self._send_packet(seq, packet_type, len(payload), payload)
+        if callable(callback):
+            self._rpc_in_progress[seq] = callback
+            return seq
+        if callback == 'blocking':
+            self._rpc_in_progress[seq] = True, True, None
+            while self._rpc_in_progress[seq][0]:
+                # print 'step'
+                kaa.notifier.step()
+            ok, result = self._rpc_in_progress[seq][1:]
+            del self._rpc_in_progress[seq]
+            if not ok:
+                raise Exception(result)
+            return result
+        return seq
+
+    
+    def rpc(self, function, *args, **kwargs):
+        return RPC(self, function, *args, **kwargs)
+
+    
+    def _handle_close(self):
+        log.info('close socket for %s', self)
+        self._socket.close()
+        if self._wmon.active():
+            self._wmon.unregister()
+        if self._rmon.active():
+            self._rmon.unregister()
+        self._wmon = self._rmon = None
+        self.signals['closed'].emit()
+        self.signals = {}
+
+        
+    def _handle_read(self):
+        try:
+            data = self._socket.recv(1024*1024)
+        except socket.error, (err, msg):
+            if err == errno.EAGAIN:
+                # Resource temporarily unavailable -- we are trying to read
+                # data on a socket when none is available.
+                return
+            # If we're here, then the socket is likely disconnected.
+            data = None
+        except:
+            log.exception('_handle_read failed, close socket')
+            data = None
+
+        if not data:
+            log.info('no data received')
+            self._handle_close()
+            # Return False to cause notifier to remove fd handler.
+            return False
+
+        header_size = struct.calcsize("I4sI")
+        self._read_buffer.append(data)
+        # Before we start into the loop, make sure we have enough data for
+        # a full packet.  For very large packets (if we just received a huge
+        # pickled object), this saves the string.join() which can be very 
+        # expensive.  (This is the reason we use a list for our read buffer.)
+        buflen = reduce(lambda x, y: x + len(y), self._read_buffer, 0)
+        if buflen < header_size:
+            return
+
+        # Ensure the first block in the read buffer is big enough for a full
+        # packet header.  If it isn't, then we must have more than 1 block in
+        # the buffer, so keep merging blocks until we have a block big enough
+        # to be a header.  If we're here, it means that buflen >= header_size,
+        # so we can safely loop.
+        while len(self._read_buffer[0]) < header_size:
+            self._read_buffer[0] += self._read_buffer.pop(1)
+
+        # Make sure the the buffer holds enough data as indicated by the
+        # payload size in the header.
+        payload_len = struct.unpack("I4sI", 
self._read_buffer[0][:header_size])[2]
+        if buflen < payload_len + header_size:
+            return
+
+        # At this point we know we have enough data in the buffer for the
+        # packet, so we merge the array into a single buffer.
+        strbuf = ''.join(self._read_buffer)
+        self._read_buffer = []
+        while 1:
+            if len(strbuf) <= header_size:
+                if len(strbuf) > 0:
+                    self._read_buffer.append(str(strbuf))
+                break
+            seq, packet_type, payload_len = struct.unpack("I4sI", 
strbuf[:header_size])
+            if len(strbuf) < payload_len + header_size:
+                # We've also received portion of another packet that we
+                # haven't fully received yet.  Put back to the buffer what
+                # we have so far, and we can exit the loop.
+                self._read_buffer.append(str(strbuf))
+                break
+
+            # Grab the payload for this packet, and shuffle strbuf to the
+            # next packet.
+            payload = strbuf[header_size:header_size + payload_len]
+            strbuf = buffer(strbuf, header_size + payload_len)
+            self._handle_packet(seq, packet_type, payload)
+
+
+    def _send_packet(self, seq, packet_type, length, payload):
+        header = struct.pack("I4sI", seq, packet_type, length)
+        if not self._authenticated:
+            if packet_type in ('RESP', 'AUTH'):
+                self._write_buffer = header + payload + self._write_buffer
+                if not self._wmon.active():
+                    self._wmon.register(self._socket.fileno(), 
kaa.notifier.IO_WRITE)
+                return True
+            log.info('delay packet %s', packet_type)
+            self._write_buffer += header + payload
+            return True
+
+        self._write_buffer += header + payload
+        if not self._wmon.active():
+            self._wmon.register(self._socket.fileno(), kaa.notifier.IO_WRITE)
+
+
+    def _handle_write(self):
+        if not len(self._write_buffer):
+            return False
+        try:
+            sent = self._socket.send(self._write_buffer)
+            self._write_buffer = self._write_buffer[sent:]
+            if not self._write_buffer:
+                self._wmon.unregister()
+        except socket.error, (err, msg):
+            if err == errno.EAGAIN:
+                # Resource temporarily unavailable -- we are trying to write
+                # data to a socket when none is available.
+                return
+            # If we're here, then the socket is likely disconnected.
+            self._handle_close()
+            return False
+        return True
+    
+
+    def _handle_packet(self, seq, type, payload):
+        if not self._authenticated:
+            if type == 'AUTH':
+                response, salt = self._get_challenge_response(payload)
+                payload = struct.pack("20s20s20s", payload, response, salt)
+                self._send_packet(seq, 'RESP', len(payload), payload)
+                self._authenticated = True
+                return True
+            
+            if type == 'RESP':
+                challenge, response, salt = struct.unpack("20s20s20s", payload)
+                if response == self._get_challenge_response(challenge, 
salt)[0] \
+                   and challenge == self._pending_challenge:
+                    self._authenticated = True
+                    return True
+                log.error('authentication error')
+                return True
+
+            log.error('got %s before challenge response is complete', type)
+            return True
+        
+        if type == 'CALL':
+            # Remote function call, send answer
+            payload = cPickle.loads(payload)
+            function, args, kwargs = payload
+            try:
+                payload = self._callbacks[function](*args, **kwargs)
+                packet_type = 'RETN'
+            except (SystemExit, KeyboardInterrupt):
+                sys.exit(0)
+            except Exception, e:
+                log.exception('rpc call %s', function)
+                packet_type = 'EXCP'
+                payload = e
+            payload = cPickle.dumps(payload, pickle.HIGHEST_PROTOCOL)
+            self._send_packet(seq, packet_type, len(payload), payload)
+            return True
+
+        if type == 'RETN':
+            # RPC return
+            payload = cPickle.loads(payload)
+            callback = self._rpc_in_progress.get(seq)
+            if not callback:
+                return True
+            if callable(callback):
+                del self._rpc_in_progress[seq]
+                try:
+                    callback(payload)
+                except (SystemExit, KeyboardInterrupt):
+                    sys.exit(0)
+                except Exception, e:
+                    log.exception('rpc return')
+                return True
+            self._rpc_in_progress[seq] = False, True, payload
+            return True
+            
+        if type == 'EXCP':
+            # Exception for remote call
+            error = cPickle.loads(payload)
+            callback = self._rpc_in_progress.get(seq)
+            if not callback:
+                return True
+            if callable(callback):
+                # nothing to do
+                del self._rpc_in_progress[seq]
+                log.error(error)
+                return True
+            self._rpc_in_progress[seq] = False, False, error
+            return True
+        
+        log.error('unknown packet type %s', type)
+        return True
+
+                
+    def _request_auth_packet(self):
+        """
+        Request an auth packet response for initial setup.
+        """
+        rbytes = file("/dev/urandom").read(64)
+        self._pending_challenge = sha.sha(str(time.time()) + rbytes).digest()
+        self._send_packet(0, 'AUTH', 20, self._pending_challenge)
+
+
+    def _get_challenge_response(self, challenge, salt = None):
+        """
+        Generate a response for the challenge based on the auth secret
+        supplied to the constructor.  This hashes twice to prevent against
+        certain attacks on the hash function.  If salt is not None, it is
+        the value generated by the remote end that was used in computing 
+        their response.  If it is None, a new 20-byte salt is generated
+        and used in computing our response.  
+        
+        """
+        if salt == None:
+            rbytes = file("/dev/urandom").read(64)
+            salt = sha.sha(str(time.time()) + rbytes).digest()
+        m = challenge + self._auth_secret + salt
+        return sha.sha(sha.sha(m).digest() + m).digest(), salt
+
+
+class Client(Channel):
+    def __init__(self, address, auth_secret = ''):
+        if type(address) in types.StringTypes:
+            address = '%s/%s' % (kaa.TEMP, address)
+            fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        if type(address) == tuple:
+            fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        fd.connect(address)
+        fd.setblocking(False)
+        Channel.__init__(self, fd, auth_secret)
+
+
+def expose(command):
+    """
+    Decorator to expose a function.
+    """
+    def decorator(func):
+        func._kaa_rpc = command
+        return func
+    return decorator

Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to