Author: aum
Date: 2006-05-12 03:47:00 +0000 (Fri, 12 May 2006)
New Revision: 8673

Modified:
   trunk/apps/pyFreenet/code.leo
   trunk/apps/pyFreenet/fcp.py
   trunk/apps/pyFreenet/sitemgr.py
Log:
Implemented asynchronous mode requests.
PyFcp is now thread-safe for the basic primitives, whether
these primitives are executed synchronously or asynchronously.

All primitives can now be invoked with 'async=True', which
causes them to return a 'job ticket' object that can then be
polled for completion or waited on in a blocking fashion.

By default, primitives still execute synchronously.
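
Example usage (a rough sketch; the host/port shown are the library
defaults and the key URI is just a placeholder):

    import time
    import fcp

    # connect to a local node (127.0.0.1:9481 are the defaults)
    node = fcp.FCPNodeConnection(host="127.0.0.1", port=9481)

    # synchronous call, as before: blocks until the keypair comes back
    pub, priv = node.genkey()

    # asynchronous call: returns a JobTicket immediately
    job = node.get("KSK@gpl.txt", async=True)

    # either poll for completion...
    while not job.isComplete():
        time.sleep(0.1)

    # ...or block on it; wait() returns the result, or raises on failure
    mimetype, data = job.wait()

    node.shutdown()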

TODO:
 - implement callbacks
 - implement persistent requests



Modified: trunk/apps/pyFreenet/code.leo
===================================================================
--- trunk/apps/pyFreenet/code.leo       2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/code.leo       2006-05-12 03:47:00 UTC (rev 8673)
@@ -1,8 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <leo_file>
 <leo_header file_format="2" tnodes="0" max_tnode_index="6" clone_windows="0"/>
-<globals body_outline_ratio="0.290740740741">
-       <global_window_position top="21" left="104" height="668" width="1080"/>
+<globals body_outline_ratio="0.265866209262">
+       <global_window_position top="147" left="74" height="694" width="1166"/>
        <global_log_window_position top="0" left="0" height="0" width="0"/>
 </globals>
 <preferences/>
@@ -132,21 +132,24 @@
 <v t="aum.20060509223528.119"><vh>exceptions</vh></v>
 </v>
 <v t="aum.20060511003500" tnodeList="aum.20060511003500"><vh>@file 
test.py</vh></v>
-<v t="aum.20060511101147" a="E" 
tnodeList="aum.20060511101147,aum.20060511113333,aum.20060511113333.1,aum.20060511114439,aum.20060511114439.1,aum.20060511114439.2,aum.20060511120059,aum.20060511114604,aum.20060511114604.1,aum.20060511113333.3,aum.20060511130507,aum.20060511120024"><vh>@nosent
 sitemgr.py</vh>
+<v t="aum.20060512152233" a="V" tnodeList="aum.20060512152233"><vh>@file 
genkey.py</vh></v>
+<v t="aum.20060512140230" tnodeList="aum.20060512140230"><vh>@file 
updatesites.py</vh></v>
+<v t="aum.20060511101147" a="E" 
tnodeList="aum.20060511101147,aum.20060511113333,aum.20060511113333.1,aum.20060511114439,aum.20060511114439.1,aum.20060512150118,aum.20060511114439.2,aum.20060511120059,aum.20060511114604,aum.20060511114604.1,aum.20060511113333.3,aum.20060511130507,aum.20060511120024"><vh>@nosent
 sitemgr.py</vh>
 <v t="aum.20060511113333"><vh>imports</vh></v>
 <v t="aum.20060511113333.1"><vh>config</vh></v>
 <v t="aum.20060511114439" a="E"><vh>class SiteMgr</vh>
 <v t="aum.20060511114439.1"><vh>__init__</vh></v>
+<v t="aum.20060512150118"><vh>__del__</vh></v>
 <v t="aum.20060511114439.2"><vh>createConfig</vh></v>
 <v t="aum.20060511120059"><vh>createNode</vh></v>
 <v t="aum.20060511114604"><vh>loadConfig</vh></v>
 <v t="aum.20060511114604.1"><vh>saveConfig</vh></v>
 <v t="aum.20060511113333.3"><vh>update</vh></v>
 </v>
-<v t="aum.20060511130507" a="V"><vh>help</vh></v>
+<v t="aum.20060511130507"><vh>help</vh></v>
 <v t="aum.20060511120024"><vh>mainline</vh></v>
 </v>
-<v t="aum.20060506215707" a="E" 
tnodeList="aum.20060506215707,aum.20060506215707.1,aum.20060506220237,aum.20060506215707.2,aum.20060506215707.3,aum.20060506220237.1,aum.20060506220237.2,aum.20060506224238,aum.20060506224238.1,aum.20060506231352,aum.20060507003931,aum.20060511001853,aum.20060506231352.1,aum.20060506232639,aum.20060506232639.1,aum.20060506223545,aum.20060506231352.2,aum.20060506220856,aum.20060506222005,aum.20060507124316,aum.20060511103841,aum.20060511103841.1,aum.20060511103952,aum.20060511103952.1,aum.20060509184020,aum.20060507155016,aum.20060507162314,aum.20060507162543,aum.20060507162314.1,aum.20060509194923,aum.20060507162314.2,aum.20060507162314.3,aum.20060507162543.1,aum.20060507154638,aum.20060507163143,aum.20060509184020.1,aum.20060509184020.2,aum.20060509224119,aum.20060509224221,aum.20060507195029,aum.20060507195029.1,aum.20060506224545"><vh>@nosent
 fcp.py</vh>
+<v t="aum.20060506215707" a="E" 
tnodeList="aum.20060506215707,aum.20060506215707.1,aum.20060506220237,aum.20060506215707.2,aum.20060506215707.3,aum.20060506220237.1,aum.20060506220237.2,aum.20060506224238,aum.20060506224238.1,aum.20060506231352.1,aum.20060506231352,aum.20060507003931,aum.20060511001853,aum.20060511205201,aum.20060506232639,aum.20060506232639.1,aum.20060511222538,aum.20060512101715,aum.20060511205201.1,aum.20060511205201.2,aum.20060506223545,aum.20060506231352.2,aum.20060506220856,aum.20060506222005,aum.20060507124316,aum.20060511103841,aum.20060511103841.1,aum.20060511103952,aum.20060511103952.1,aum.20060512102840,aum.20060509184020,aum.20060507155016,aum.20060507162314,aum.20060507162543,aum.20060507162314.1,aum.20060509194923,aum.20060507162314.2,aum.20060507162314.3,aum.20060507162543.1,aum.20060507154638,aum.20060507163143,aum.20060509184020.1,aum.20060509184020.2,aum.20060509224119,aum.20060509224221,aum.20060507195029,aum.20060507195029.1,aum.20060506224545"><vh>@nosent
 fcp.py</vh>
 <v t="aum.20060506215707.1"><vh>imports</vh></v>
 <v t="aum.20060506220237"><vh>exceptions</vh></v>
 <v t="aum.20060506215707.2"><vh>globals</vh></v>
@@ -154,27 +157,33 @@
 <v t="aum.20060506220237.1"><vh>__init__</vh></v>
 <v t="aum.20060506220237.2"><vh>__del__</vh></v>
 <v t="aum.20060506224238" a="E"><vh>High Level Methods</vh>
-<v t="aum.20060506224238.1"><vh>hello</vh></v>
+<v t="aum.20060506224238.1"><vh>_hello</vh></v>
+<v t="aum.20060506231352.1"><vh>genkey</vh></v>
 <v t="aum.20060506231352"><vh>get</vh></v>
 <v t="aum.20060507003931"><vh>put</vh></v>
 <v t="aum.20060511001853"><vh>putdir</vh></v>
-<v t="aum.20060506231352.1"><vh>genkey</vh></v>
+<v t="aum.20060511205201"><vh>shutdown</vh></v>
 </v>
-<v t="aum.20060506232639"><vh>Receiver Thread</vh>
-<v t="aum.20060506232639.1"><vh>_rxThread</vh></v>
+<v t="aum.20060506232639" a="E"><vh>Manager Thread</vh>
+<v t="aum.20060506232639.1"><vh>_mgrThread</vh></v>
+<v t="aum.20060511222538"><vh>_msgIncoming</vh></v>
+<v t="aum.20060512101715"><vh>_submitCmd</vh></v>
+<v t="aum.20060511205201.1"><vh>_on_rxMsg</vh></v>
+<v t="aum.20060511205201.2"><vh>_on_clientReq</vh></v>
 </v>
 <v t="aum.20060506223545" a="E"><vh>Low Level Methods</vh>
 <v t="aum.20060506231352.2"><vh>_getUniqueId</vh></v>
-<v t="aum.20060506220856"><vh>_sendMessage</vh></v>
-<v t="aum.20060506222005"><vh>_receiveMessage</vh></v>
+<v t="aum.20060506220856"><vh>_txMsg</vh></v>
+<v t="aum.20060506222005"><vh>_rxMsg</vh></v>
 <v t="aum.20060507124316"><vh>_log</vh></v>
 </v>
+</v>
 <v t="aum.20060511103841" a="E"><vh>class JobTicket</vh>
 <v t="aum.20060511103841.1"><vh>__init__</vh></v>
 <v t="aum.20060511103952"><vh>isDone</vh></v>
 <v t="aum.20060511103952.1"><vh>wait</vh></v>
+<v t="aum.20060512102840"><vh>_putResult</vh></v>
 </v>
-</v>
 <v t="aum.20060509184020" a="E"><vh>XML-RPC Server</vh>
 <v t="aum.20060507155016" a="E"><vh>class FreenetXMLRPCRequest</vh>
 <v t="aum.20060507162314"><vh>__init__</vh></v>
@@ -227,25 +236,35 @@
 @others

 </t>
-<t tx="aum.20060506215707.1">import sys, os, socket, time, thread, threading, 
mimetypes, sha
+<t tx="aum.20060506215707.1">import sys, os, socket, time, thread
+import threading, mimetypes, sha, Queue
+import select, traceback

 from SimpleXMLRPCServer import SimpleXMLRPCServer

 </t>
-<t tx="aum.20060506215707.2">defaultFCPHost = "127.0.0.1"
+<t tx="aum.20060506215707.2"># where we can find the freenet node FCP port
+defaultFCPHost = "127.0.0.1"
 defaultFCPPort = 9481

+# where to listen, for the xml-rpc server
 xmlrpcHost = "127.0.0.1"
 xmlrpcPort = 19481

+# poll timeout period for manager thread
+pollTimeout = 0.1
+#pollTimeout = 3
+
 # list of keywords sent from node to client, which have
 # int values
 intKeys = [
     'DataLength', 'Code',
     ]

+# for the FCP 'ClientHello' handshake
 expectedVersion="2.0"

+# logger verbosity levels
 SILENT = 0
 FATAL = 1
 CRITICAL = 2
@@ -308,10 +327,12 @@
         - verbosity - how detailed the log messages should be, defaults to 0
           (silence)
     """
+    # grab and save parms
     self.name = kw.get('clientName', self._getUniqueId())
     self.host = kw.get('host', defaultFCPHost)
     self.port = kw.get('port', defaultFCPPort)
-    
+
+    # set up the logger
     logfile = kw.get('logfile', sys.stderr)
     if not hasattr(logfile, 'write'):
         # might be a pathname
@@ -319,53 +340,64 @@
             raise Exception("Bad logfile, must be pathname or file object")
         logfile = file(logfile, "a")
     self.logfile = logfile
-
     self.verbosity = kw.get('verbosity', 0)

-    # try to connect
+    # try to connect to node
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     self.socket.connect((self.host, self.port))

     # now do the hello
-    self.hello()
+    self._hello()

-    # the incoming response queues
-    self.pendingResponses = {} # keyed by request ID
+    # the pending job tickets
+    self.jobs = {} # keyed by request ID

-    # lock for socket operations
-    self.socketLock = threading.Lock()
+    # queue for incoming client requests
+    self.clientReqQueue = Queue.Queue()

     # launch receiver thread
-    #thread.start_new_thread(self.rxThread, ())
+    self.running = True
+    thread.start_new_thread(self._mgrThread, ())

 </t>
 <t tx="aum.20060506220237.2">def __del__(self):
     """
     object is getting cleaned up, so disconnect
     """
-    if self.socket:
-        self.socket.close()
-        del self.socket
+    # shut down the connection to the node
+    try:
+        self.shutdown()
+    except:
+        traceback.print_exc()
+        pass

-    if self.logfile not in [sys.stdout, sys.stderr]:
-        self.logfile.close()
-
 </t>
-<t tx="aum.20060506220856">def _sendMessage(self, msgType, 
sendEndMessage=True, **kw):
+<t tx="aum.20060506220856">def _txMsg(self, msgType, **kw):
     """
     low level message send

     Arguments:
         - msgType - one of the FCP message headers, such as 'ClientHello'
         - args - zero or more (keyword, value) tuples
+    Keywords:
+        - rawcmd - if given, this is the raw buffer to send
     """
+    log = self._log
+
+    # just send the raw command, if given    
+    rawcmd = kw.get('rawcmd', None)
+    if rawcmd:
+        self.socket.send(rawcmd)
+        log(DETAIL, "CLIENT: %s" % rawcmd)
+        return
+
     if kw.has_key("Data"):
         data = kw.pop("Data")
+        sendEndMessage = False
     else:
         data = None
+        sendEndMessage = True

-    log = self._log
-    
     items = [msgType + "\n"]
     log(DETAIL, "CLIENT: %s" % msgType)

@@ -393,7 +425,7 @@
     self.socket.send(raw)

 </t>
-<t tx="aum.20060506222005">def _receiveMessage(self):
+<t tx="aum.20060506222005">def _rxMsg(self):
     """
     Receives and returns a message as a dict

@@ -486,13 +518,13 @@


 </t>
-<t tx="aum.20060506224238.1">def hello(self):
+<t tx="aum.20060506224238.1">def _hello(self):

-    self._sendMessage("ClientHello", 
+    self._txMsg("ClientHello", 
                      Name=self.name,
                      ExpectedVersion=expectedVersion)

-    resp = self._receiveMessage()
+    resp = self._rxMsg()
     return resp

 </t>
@@ -506,6 +538,8 @@
     Does a direct get of a key

     Keywords:
+        - async - whether to return immediately with a job ticket object, 
default
+          False (wait for completion)
         - dsnly - whether to only check local datastore
         - ignoreds - don't check local datastore
         - file - if given, this is a pathname to which to store the retrieved 
key
@@ -519,12 +553,18 @@
         - if 'dontReturnData' is true, returns (mimetype, 1) if key is returned
     If key is not found, raises an exception
     """
+    # format the request
     opts = {}

+    id = self._getUniqueId()
+
+    opts['async'] = kw.pop('async', False)
+
     file = kw.pop("file", None)
     if file:
         opts['ReturnType'] = "disk"
-        opts['File'] = file
+        #opts['File'] = file
+        opts['Filename'] = file

     elif opts.get('nodata', False):
         nodata = True
@@ -533,7 +573,7 @@
         nodata = False
         opts['ReturnType'] = "direct"

-    opts['Identifier'] = self._getUniqueId()
+    opts['Identifier'] = id

     if kw.get("ignoreds", False):
         opts["IgnoreDS"] = "true"
@@ -553,28 +593,19 @@
     opts['PriorityClass'] = int(kw.get("priority", 1))
     opts['Global'] = "false"

-    self._sendMessage("ClientGet", **opts)
+    # now enqueue the request
+    return self._submitCmd(id, "ClientGet", **opts)
+
+
+
+    # ------------------------------------------------
+    # DEPRECATED CODE!!
+
+    self._txMsg("ClientGet", **opts)

-#ClientGet
-#IgnoreDS=false // true = ignore the datastore (in old FCP this was 
RemoveLocalKey)
-#DSOnly=false // true = only check the datastore, don't route (~= htl 0)
-#URI=KSK@gpl.txt // key to fetch
-#Identifier=Request Number One
-#Verbosity=0 // no status, just tell us when it's done
-#ReturnType=direct // return all at once over the FCP connection
-#MaxSize=100 // maximum size of returned data (all numbers in DECIMAL)
-#MaxTempSize=1000 // maximum size of intermediary data
-#MaxRetries=100 // automatic retry supported as an option; -1 means retry 
forever
-#PriorityClass=1 // priority class 1 = interactive
-#Persistence=reboot // continue until node is restarted; report progress while 
client is
-#       connected, including if it reconnects after losing connection
-#ClientToken=hello // returned in PersistentGet, a hint to the client, so the 
client
-#       doesn't need to maintain its own state
-#Global=false // see Persistence section below
-#EndMessage

     # get a response
-    resp = self._receiveMessage()
+    resp = self._rxMsg()
     hdr = resp['header']
     if hdr == 'DataFound':
         mimetype = resp['Metadata.ContentType']
@@ -585,7 +616,7 @@
         elif nodata:
             return (mimetype, 1)
         else:
-            resp = self._receiveMessage()
+            resp = self._rxMsg()
             if resp['header'] == 'AllData':
                 return (mimetype, resp['Data'])
             else:
@@ -598,47 +629,76 @@
         raise FCPException(resp)

 </t>
-<t tx="aum.20060506231352.1">def genkey(self, id=None):
+<t tx="aum.20060506231352.1">def genkey(self, **kw):
     """
     Generates and returns an SSK keypair
+    
+    Keywords:
+        - async - whether to do this call asynchronously, and
+          return a JobTicket object
     """
-    if not id:
-        id = self._getUniqueId()
+    id = self._getUniqueId()

-    self._sendMessage("GenerateSSK",
-                     Identifier=id)
-    
-    while True:
-        resp = self._receiveMessage()
-        #print resp
-        if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == id:
-            break
+    return self._submitCmd(id, "GenerateSSK", Identifier=id, **kw)

-    return resp['RequestURI'], resp['InsertURI']
+    #self._txMsg("GenerateSSK",
+    #                 Identifier=id)
+    #while True:
+    #    resp = self._rxMsg()
+    #    #print resp
+    #    if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == id:
+    #        break
+    #return resp['RequestURI'], resp['InsertURI']

 </t>
 <t tx="aum.20060506231352.2">def _getUniqueId(self):
     return "id" + str(int(time.time() * 1000000))

 </t>
-<t tx="aum.20060506232639"># methods for receiver thread
+<t tx="aum.20060506232639"># methods for manager thread

 @others

 </t>
-<t tx="aum.20060506232639.1">def _rxThread(self):
+<t tx="aum.20060506232639.1">def _mgrThread(self):
     """
     Receives all incoming messages
     """
-    while self.running:
-        self.socketLock.acquire()
-        self.socket.settimeout(0.1)
-        try:
-            msg = self._receiveMessage()
-        except socket.timeout:
-            self.socketLock.release()
-            continue
-        
+    log = self._log
+    try:
+        while self.running:
+
+            log(DEBUG, "Top of manager thread")
+
+            # try for incoming messages from node
+            log(DEBUG, "Testing for incoming message")
+            if self._msgIncoming():
+                log(DEBUG, "Retrieving incoming message")
+                msg = self._rxMsg()
+                log(DEBUG, "Got incoming message, dispatching")
+                self._on_rxMsg(msg)
+                log(DEBUG, "back from on_rxMsg")
+            else:
+                log(DEBUG, "No incoming message from node")
+    
+            # try for incoming requests from clients
+            log(DEBUG, "Testing for client req")
+            try:
+                req = self.clientReqQueue.get(True, pollTimeout)
+                log(DEBUG, "Got client req, dispatching")
+                self._on_clientReq(req)
+                log(DEBUG, "Back from on_clientReq")
+            except Queue.Empty:
+                log(DEBUG, "No incoming client req")
+                pass
+
+        self._log(INFO, "Manager thread terminated normally")
+        return
+
+    except:
+        traceback.print_exc()
+        self._log(CRITICAL, "manager thread crashed")
+
 </t>
 <t tx="aum.20060507003931">def put(self, uri="CHK@", **kw):
     """
@@ -668,42 +728,21 @@
     Keywords valid for all modes:
         - maxretries - maximum number of retries, default 3
         - priority - default 1
+        - async - whether to do the job asynchronously, returning a job ticket
+          object (default False)

     Notes:
         - exactly one of 'file', 'data' or 'dir' keyword arguments must be 
present
     """
-#ClientPut
-#URI=CHK@ // could as easily be an insertable SSK or KSK URI
-#Metadata.ContentType=text/html // MIME type; for text, if charset is not 
specified, node #should auto-detect it and force the auto-detected version
-#Identifier=Insert-1 // identifier, as always
-#Verbosity=0 // just report when complete
-#MaxRetries=999999 // lots of retries; -1 = retry forever
-#PriorityClass=1 // fproxy priority level
-#GetCHKOnly=false // true = don't insert the data, just return the key it 
would generate
-#Global=false // see Persistence section below
-#DontCompress=true // hint to node: don't try to compress the data, it's 
already compressed
-#ClientToken=Hello!!! // sent back to client on the PersistentPut if this is a 
persistent #request

-# the following fields decide where the data is to come from:
-
-#UploadFrom=direct // attached directly to this message
-#DataLength=100 // 100 bytes decimal
-#Data
-#&lt;data&gt;
-# or
-#UploadFrom=disk // upload a file from disk
-#Filename=/home/toad/something.html
-#End
-# or
-#UploadFrom=redirect // create a redirect to another key
-#TargetURI=KSK@gpl.txt // some other freenet URI
-#End
-
     # divert to putdir if dir keyword present
     if kw.has_key('dir'):
         return self.putdir(uri, **kw)

     opts = {}
+
+    opts['async'] = kw.get('async', False)
+
     opts['URI'] = uri
     opts['Metadata.ContentType'] = kw.get("mimetype", "text/plain")
     id = self._getUniqueId()
@@ -714,60 +753,36 @@
     opts['GetCHKOnly'] = toBool(kw.get("chkonly", "false"))
     opts['DontCompress'] = toBool(kw.get("nocompress", "false"))

-    # if inserting a freesite, scan the directory and insert each bit piecemeal
-    if kw.has_key("dir"):
-        if kw.get('usk', False):
-            uri = uri.replace("SSK@", "USK@")
-        if not uri.endswith("/"):
-            uri = uri + "/"
-
-        # form a base privkey-based URI
-        siteuri = uri + "%s/%s/" % (kw['sitename'], kw.get('version', 1))
-
+    if kw.has_key("file"):
         opts['UploadFrom'] = "disk"
-
-        # upload all files in turn - rework this later when queueing is 
implemented
-        files = readdir(kw['dir'])
-        for f in files:
-            thisuri = siteuri + f['relpath']
-            opts['file'] = f['fullpath']
-            opts['mimetype'] = f['mimetype']
-            self.put(thisuri, **opts)
-
-        # last bit - insert index.html
-        opts['file'] = os.path.join(kw['dir'], "index.html")
-        thisuri = siteuri + "index.html"
-        opts['mimetype'] = "text/html"
-        self.put(thisuri, **opts)
-        
-        return uri
-
-    elif kw.has_key("file"):
-        opts['UploadFrom'] = "disk"
         opts['Filename'] = kw['file']
         if not kw.has_key("mimetype"):
             opts['Metadata.ContentType'] = mimetypes.guess_type(kw['file'])[0] 
or "text/plain"
-        sendEnd = True

     elif kw.has_key("data"):
         opts["UploadFrom"] = "direct"
         opts["Data"] = kw['data']
-        sendEnd = False

     elif kw.has_key("redirect"):
         opts["UploadFrom"] = "redirect"
         opts["TargetURI"] = kw['redirect']
-        sendEnd = True
     else:
         raise Exception("Must specify file, data or redirect keywords")

     #print "sendEnd=%s" % sendEnd

+    # now dispatch the job
+    return self._submitCmd(id, "ClientPut", **opts)
+
+
+    # ------------------------------------------------------------
+    # DEPRECATED CODE
+
     # issue the command
-    self._sendMessage("ClientPut", sendEnd, **opts)
+    self._txMsg("ClientPut", **opts)

     # expect URIGenerated
-    resp1 = self._receiveMessage()
+    resp1 = self._rxMsg()
     hdr = resp1['header']
     if hdr != 'URIGenerated':
         raise FCPException(resp1)
@@ -780,7 +795,7 @@
             return newUri

     # expect outcome
-    resp2 = self._receiveMessage()
+    resp2 = self._rxMsg()
     hdr = resp2['header']
     if hdr == 'PutSuccessful':
         return resp2['URI']
@@ -791,6 +806,7 @@
     else:
         raise FCPException(resp2)

+
 </t>
 <t tx="aum.20060507124316">def _log(self, level, msg):
     """
@@ -6158,6 +6174,8 @@
         - maxretries - maximum number of retries, default 3
         - priority - default 1

+        - async - default False - if True, return a job ticket
+
     Returns:
         - the URI under which the freesite can be retrieved
     """
@@ -6179,7 +6197,7 @@
         uriFull = uriFull.replace("SSK@", "USK@")

     # issue the command
-    #self._sendMessage("ClientPutComplexDir", True, **opts)
+    #self._txMsg("ClientPutComplexDir", True, **opts)
     msgLines = ["ClientPutComplexDir",
                 "Identifier=%s" % id,
                 "Verbosity=0",
@@ -6217,10 +6235,21 @@
     for line in msgLines:
         self._log(DETAIL, line)
     fullbuf = "\n".join(msgLines) + "\n"
+
+    # now dispatch the job
+    return self._submitCmd(id, "ClientPutComplexDir",
+                           rawcmd=fullbuf,
+                           async=kw.get('async', False),
+                           )
+
+    # ------------------------------------------------------------
+    # DEPRECATED CODE
+
+
     self.socket.send(fullbuf)

     # expect URIGenerated
-    resp1 = self._receiveMessage()
+    resp1 = self._rxMsg()
     hdr = resp1['header']
     if hdr != 'URIGenerated':
         raise FCPException(resp1)
@@ -6228,7 +6257,7 @@
     newUri = resp1['URI']

     # expect outcome
-    resp2 = self._receiveMessage()
+    resp2 = self._rxMsg()
     hdr = resp2['header']
     if hdr == 'PutSuccessful':
         return resp2['URI']
@@ -6239,6 +6268,7 @@
     else:
         raise FCPException(resp2)

+
 </t>
 <t tx="aum.20060511003500">from fcp import *

@@ -6269,24 +6299,37 @@
     @others

 </t>
-<t tx="aum.20060511103841.1">def __init__(self, id):
+<t tx="aum.20060511103841.1">def __init__(self, id, cmd, kw):
     """
     You should never instantiate a JobTicket object yourself
     """
     self.id = id
-    self.queue = Queue.Queue()
+    self.cmd = cmd
+    self.kw = kw

+    self.lock = threading.Lock()
+    self.lock.acquire()
+    self.result = None
+
 </t>
 <t tx="aum.20060511103952">def isComplete(self):
     """
     Returns True if the job has been completed
     """
+    return self.result != None

 </t>
 <t tx="aum.20060511103952.1">def wait(self, timeout=None):
     """
     Waits forever (or for a given timeout) for a job to complete
     """
+    self.lock.acquire()
+    self.lock.release()
+    if isinstance(self.result, Exception):
+        raise self.result
+    else:
+        return self.result
+
 </t>
 <t tx="aum.20060511113333">import fcp, sys, os, sha

@@ -6323,7 +6366,11 @@
             print "Updating site %s" % sitename
             print "privatekey=%s" % privatekey
             noSites = False
-            self.node.put(privatekey, dir=dir, name=sitename, version=version, 
usk=True)
+            res = self.node.put(privatekey,
+                                dir=dir,
+                                name=sitename,
+                                version=version,
+                                usk=True)
             conf.set(sitename, "hash", hashNew)

     self.saveConfig()
@@ -6331,6 +6378,8 @@
     if noSites:
         print "No sites needed updating"

+    return res
+
 </t>
 <t tx="aum.20060511114439">class SiteMgr:
     """
@@ -6452,11 +6501,12 @@

     s = SiteMgr()
     s.update()
+    s.shutdown()

 </t>
 <t tx="aum.20060511120059">def createNode(self, **kw):

-    kw = {}
+    #kw = {}

     if fcpHost and not kw.has_key("fcpHost"):
         kw['host'] = fcpHost
@@ -6466,6 +6516,8 @@
         kw['verbosity'] = verbosity
     if logfile and not kw.has_key("logfile"):
         kw['logfile'] = logfile
+
+    #print kw

     self.node = fcp.FCPNodeConnection(**kw)

@@ -6489,5 +6541,307 @@
     sys.exit(0)

 </t>
+<t tx="aum.20060511205201">def shutdown(self):
+    """
+    Terminates the manager thread
+    """
+    self.running = False
+
+    # give the manager thread a chance to bail out
+    time.sleep(pollTimeout * 3)
+
+    # shut down FCP connection
+    if hasattr(self, 'socket'):
+        self.socket.close()
+        del self.socket
+
+    # and close the logfile
+    if self.logfile not in [sys.stdout, sys.stderr]:
+        self.logfile.close()
+
+
+</t>
+<t tx="aum.20060511205201.1">def _on_rxMsg(self, msg):
+    """
+    Handler for incoming messages from node
+    """
+    log = self._log
+
+    # find the job this relates to
+    id = msg['Identifier']
+    hdr = msg['header']
+
+    job = self.jobs.get(id, None)
+    
+    # bail if job not known
+    if not job:
+        log(ERROR, "Received %s for unknown job %s" % (hdr, id))
+        return
+
+    # action from here depends on what kind of message we got
+
+    # -----------------------------
+    # handle GenerateSSK responses
+
+    if hdr == 'SSKKeypair':
+        # got requested keys back
+        job._putResult((msg['RequestURI'], msg['InsertURI']))
+
+        # and remove job from queue
+        self.jobs.pop(id, None)
+        return
+
+    # -----------------------------
+    # handle ClientGet responses
+
+    if hdr == 'DataFound':
+        mimetype = msg['Metadata.ContentType']
+        if job.kw.has_key('Filename'):
+            # already stored to disk, done
+            #resp['file'] = file
+            job._putResult((mimetype, job.kw['Filename']))
+            del self.jobs[id]
+            return
+
+        elif job.kw['ReturnType'] == 'none':
+            job._putResult((mimetype, 1))
+            del self.jobs[id]
+            return
+
+        # otherwise, we're expecting an AllData and will react to it then
+        else:
+            job.mimetype = mimetype
+            return
+
+    if hdr == 'AllData':
+        job._putResult((job.mimetype, msg['Data']))
+        del self.jobs[id]
+        return
+
+    if hdr == 'GetFailed':
+        # return an exception
+        job._putResult(FCPGetFailed(msg))
+        del self.jobs[id]
+        return
+
+    # -----------------------------
+    # handle ClientPut responses
+
+    if hdr == 'URIGenerated':
+
+        job.uri = msg['URI']
+        newUri = msg['URI']
+
+        return
+
+        # bail here if no data coming back
+        if job.kw.get('GetCHKOnly', False) == 'true':
+            # done - only wanted a CHK
+            job._putResult(newUri)
+            del self.jobs[id]
+            return
+
+    if hdr == 'PutSuccessful':
+        job._putResult(msg['URI'])
+        del self.jobs[id]
+        return
+
+    if hdr == 'PutFailed':
+        job._putResult(FCPPutFailed(msg))
+        del self.jobs[id]
+        return
+
+    # -----------------------------
+    # handle progress messages
+
+    if hdr == 'StartedCompression':
+        job.notify(msg)
+        return
+
+    if hdr == 'FinishedCompression':
+        job.notify(msg)
+        return
+
+    if hdr == 'SimpleProgress':
+        return
+
+    # -----------------------------
+    # handle persistent job messages
+
+    if hdr == 'PersistentGet':
+        return
+
+    if hdr == 'PersistentPut':
+        return
+
+    if hdr == 'EndListPersistentRequests':
+        return
+
+    if hdr == 'PersistentPutDir':
+        return
+
+    # -----------------------------
+    # handle various errors
+
+    if hdr == 'ProtocolError':
+        job._putResult(FCPProtocolError(msg))
+        del self.jobs[id]
+        return
+
+    if hdr == 'IdentifierCollision':
+        log(ERROR, "IdentifierCollision on id %s ???" % id)
+        job._putResult(Exception("Duplicate job identifier %s" % id))
+        del self.jobs[id]
+        return
+
+    # -----------------------------
+    # wtf is happening here?!?
+
+    log(ERROR, "Unknown message type from node: %s" % hdr)
+    job._putResult(FCPException(msg))
+    del self.jobs[id]
+    return
+
+</t>
+<t tx="aum.20060511205201.2">def _on_clientReq(self, req):
+    """
+    handler for incoming requests from clients
+    """
+    id = req.id
+    cmd = req.cmd
+    kw = req.kw
+
+    # register the req
+    self.jobs[id] = req
+    
+    # now can send, since we're the only one who will
+    self._txMsg(cmd, **kw)
+
+</t>
+<t tx="aum.20060511222538">def _msgIncoming(self):
+    """
+    Returns True if a message is coming in from the node
+    """
+    return len(select.select([self.socket], [], [], pollTimeout)[0]) &gt; 0
+
+</t>
+<t tx="aum.20060512101715">def _submitCmd(self, id, cmd, **kw):
+    """
+    Submits a command for execution
+    
+    Arguments:
+        - id - the command identifier
+        - cmd - the command name, such as 'ClientPut'
+    
+    Keywords:
+        - async - whether to return a JobTicket object, rather than
+          the command result
+        - rawcmd - a raw command buffer to send directly
+        - options specific to command
+    
+    Returns:
+        - if command is sent in sync mode, returns the result
+        - if command is sent in async mode, returns a JobTicket
+          object which the client can poll or block on later
+    """
+    async = kw.pop('async', False)
+    job = JobTicket(id, cmd, kw)
+    self.clientReqQueue.put(job)
+
+    self._log(DEBUG, "_submitCmd: id=%s cmd=%s kw=%s" % (id, cmd, 
str(kw)[:256]))
+
+    if async:
+        return job
+    else:
+        return job.wait()
+
+</t>
+<t tx="aum.20060512102840">def _putResult(self, result):
+    """
+    Called by manager thread to indicate job is complete,
+    and submit a result to be picked up by client
+    """
+    self.result = result
+    self.lock.release()
+
+</t>
+<t tx="aum.20060512140230">@first #!/usr/bin/env python
+
+import sys, os, time
+import sitemgr
+
+startupTime = 60
+freenetDir = "/home/david/freenet"
+pidFile = os.path.join(freenetDir, "Freenet.pid")
+
+def main(verbose=None):
+
+    if verbose == None:
+        verbose = ('-v' in sys.argv)
+
+    print "--------------------------------------------"
+    print "Start of site updating run"
+    
+    # start freenet and let it warm up, if it's not already running
+    if not os.path.isfile(pidFile):
+        startingFreenet = True
+        os.chdir(freenetDir)
+        print "Starting freenet..."
+        os.system("./run.sh start")
+        print "Letting node settle for %s seconds..." % startupTime
+        time.sleep(startupTime)
+    else:
+        print "Freenet node is already running!"
+        startingFreenet = False
+    
+    if verbose:
+        kw = {"verbosity" : sitemgr.fcp.DETAIL}
+    else:
+        kw = {}
+    #print "updatesites: kw=%s" % str(kw)
+    s = sitemgr.SiteMgr(**kw)
+    uri = s.update()
+    del s
+    print "Site updated: uri=%s" % uri
+    
+    # kill freenet if it was dynamically started
+    if startingFreenet:
+        print "Waiting %s for inserts to finish..." % startupTime
+        time.sleep(startupTime)
+        print "Stopping node..."
+        os.system("./run.sh stop")
+        print "Node stopped"
+
+if __name__ == '__main__':
+    main()
+
+</t>
+<t tx="aum.20060512150118">def __del__(self):
+
+    try:
+        del self.node
+        self.node = None
+    except:
+        pass
+
+</t>
+<t tx="aum.20060512152233">@first #! /usr/bin/env python
+
+import time
+
+import fcp
+
+n = fcp.FCPNodeConnection(host="thoth", verbosity=fcp.ERROR)
+pub, priv = n.genkey()
+print pub
+print priv
+
+n.shutdown()
+
+#del n
+#time.sleep(3)
+#print "done"
+
+</t>
 </tnodes>
 </leo_file>

Modified: trunk/apps/pyFreenet/fcp.py
===================================================================
--- trunk/apps/pyFreenet/fcp.py 2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/fcp.py 2006-05-12 03:47:00 UTC (rev 8673)
@@ -18,7 +18,9 @@

 """

-import sys, os, socket, time, thread, threading, mimetypes, sha
+import sys, os, socket, time, thread
+import threading, mimetypes, sha, Queue
+import select, traceback

 from SimpleXMLRPCServer import SimpleXMLRPCServer

@@ -53,20 +55,28 @@
 class FCPProtocolError(FCPException):
     pass

+# where we can find the freenet node FCP port
 defaultFCPHost = "127.0.0.1"
 defaultFCPPort = 9481

+# where to listen, for the xml-rpc server
 xmlrpcHost = "127.0.0.1"
 xmlrpcPort = 19481

+# poll timeout period for manager thread
+pollTimeout = 0.1
+#pollTimeout = 3
+
 # list of keywords sent from node to client, which have
 # int values
 intKeys = [
     'DataLength', 'Code',
     ]

+# for the FCP 'ClientHello' handshake
 expectedVersion="2.0"

+# logger verbosity levels
 SILENT = 0
 FATAL = 1
 CRITICAL = 2
@@ -93,10 +103,12 @@
             - verbosity - how detailed the log messages should be, defaults to 0
               (silence)
         """
+        # grab and save parms
         self.name = kw.get('clientName', self._getUniqueId())
         self.host = kw.get('host', defaultFCPHost)
         self.port = kw.get('port', defaultFCPPort)
-        
+    
+        # set up the logger
         logfile = kw.get('logfile', sys.stderr)
         if not hasattr(logfile, 'write'):
             # might be a pathname
@@ -104,52 +116,75 @@
                 raise Exception("Bad logfile, must be pathname or file object")
             logfile = file(logfile, "a")
         self.logfile = logfile
-    
         self.verbosity = kw.get('verbosity', 0)

-        # try to connect
+        # try to connect to node
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.socket.connect((self.host, self.port))

         # now do the hello
-        self.hello()
+        self._hello()

-        # the incoming response queues
-        self.pendingResponses = {} # keyed by request ID
+        # the pending job tickets
+        self.jobs = {} # keyed by request ID

-        # lock for socket operations
-        self.socketLock = threading.Lock()
+        # queue for incoming client requests
+        self.clientReqQueue = Queue.Queue()

         # launch receiver thread
-        #thread.start_new_thread(self.rxThread, ())
+        self.running = True
+        thread.start_new_thread(self._mgrThread, ())

     def __del__(self):
         """
         object is getting cleaned up, so disconnect
         """
-        if self.socket:
-            self.socket.close()
-            del self.socket
+        # shut down the connection to the node
+        try:
+            self.shutdown()
+        except:
+            traceback.print_exc()
+            pass

-        if self.logfile not in [sys.stdout, sys.stderr]:
-            self.logfile.close()
-    
     # high level client methods

-    def hello(self):
+    def _hello(self):

-        self._sendMessage("ClientHello", 
+        self._txMsg("ClientHello", 
                          Name=self.name,
                          ExpectedVersion=expectedVersion)

-        resp = self._receiveMessage()
+        resp = self._rxMsg()
         return resp

+    def genkey(self, **kw):
+        """
+        Generates and returns an SSK keypair
+        
+        Keywords:
+            - async - whether to do this call asynchronously, and
+              return a JobTicket object
+        """
+        id = self._getUniqueId()
+        
+        return self._submitCmd(id, "GenerateSSK", Identifier=id, **kw)
+    
+        #self._txMsg("GenerateSSK",
+        #                 Identifier=id)
+        #while True:
+        #    resp = self._rxMsg()
+        #    #print resp
+        #    if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == 
id:
+        #        break
+        #return resp['RequestURI'], resp['InsertURI']
+    
     def get(self, uri, **kw):
         """
         Does a direct get of a key

         Keywords:
+            - async - whether to return immediately with a job ticket object, 
default
+              False (wait for completion)
             - dsnly - whether to only check local datastore
             - ignoreds - don't check local datastore
             - file - if given, this is a pathname to which to store the 
retrieved key
@@ -163,12 +198,18 @@
             - if 'dontReturnData' is true, returns (mimetype, 1) if key is 
returned
         If key is not found, raises an exception
         """
+        # format the request
         opts = {}

+        id = self._getUniqueId()
+    
+        opts['async'] = kw.pop('async', False)
+    
         file = kw.pop("file", None)
         if file:
             opts['ReturnType'] = "disk"
-            opts['File'] = file
+            #opts['File'] = file
+            opts['Filename'] = file

         elif opts.get('nodata', False):
             nodata = True
@@ -177,7 +218,7 @@
             nodata = False
             opts['ReturnType'] = "direct"

-        opts['Identifier'] = self._getUniqueId()
+        opts['Identifier'] = id

         if kw.get("ignoreds", False):
             opts["IgnoreDS"] = "true"
@@ -197,28 +238,19 @@
         opts['PriorityClass'] = int(kw.get("priority", 1))
         opts['Global'] = "false"

-        self._sendMessage("ClientGet", **opts)
+        # now enqueue the request
+        return self._submitCmd(id, "ClientGet", **opts)
+    
+    
+    
+        # ------------------------------------------------
+        # DEPRECATED CODE!!
+    
+        self._txMsg("ClientGet", **opts)

-    #ClientGet
-    #IgnoreDS=false // true = ignore the datastore (in old FCP this was 
RemoveLocalKey)
-    #DSOnly=false // true = only check the datastore, don't route (~= htl 0)
-    #URI=KSK@gpl.txt // key to fetch
-    #Identifier=Request Number One
-    #Verbosity=0 // no status, just tell us when it's done
-    #ReturnType=direct // return all at once over the FCP connection
-    #MaxSize=100 // maximum size of returned data (all numbers in DECIMAL)
-    #MaxTempSize=1000 // maximum size of intermediary data
-    #MaxRetries=100 // automatic retry supported as an option; -1 means retry 
forever
-    #PriorityClass=1 // priority class 1 = interactive
-    #Persistence=reboot // continue until node is restarted; report progress 
while client is
-    #   connected, including if it reconnects after losing connection
-    #ClientToken=hello // returned in PersistentGet, a hint to the client, so 
the client
-    #   doesn't need to maintain its own state
-    #Global=false // see Persistence section below
-    #EndMessage

         # get a response
-        resp = self._receiveMessage()
+        resp = self._rxMsg()
         hdr = resp['header']
         if hdr == 'DataFound':
             mimetype = resp['Metadata.ContentType']
@@ -229,7 +261,7 @@
             elif nodata:
                 return (mimetype, 1)
             else:
-                resp = self._receiveMessage()
+                resp = self._rxMsg()
                 if resp['header'] == 'AllData':
                     return (mimetype, resp['Data'])
                 else:
@@ -269,42 +301,21 @@
         Keywords valid for all modes:
             - maxretries - maximum number of retries, default 3
             - priority - default 1
+            - async - whether to do the job asynchronously, returning a job 
ticket
+              object (default False)

         Notes:
             - exactly one of 'file', 'data' or 'dir' keyword arguments must be 
present
         """
-    #ClientPut
-    #URI=CHK@ // could as easily be an insertable SSK or KSK URI
-    #Metadata.ContentType=text/html // MIME type; for text, if charset is not 
specified, node #should auto-detect it and force the auto-detected version
-    #Identifier=Insert-1 // identifier, as always
-    #Verbosity=0 // just report when complete
-    #MaxRetries=999999 // lots of retries; -1 = retry forever
-    #PriorityClass=1 // fproxy priority level
-    #GetCHKOnly=false // true = don't insert the data, just return the key it 
would generate
-    #Global=false // see Persistence section below
-    #DontCompress=true // hint to node: don't try to compress the data, it's 
already compressed
-    #ClientToken=Hello!!! // sent back to client on the PersistentPut if this 
is a persistent #request

-    # the following fields decide where the data is to come from:
-    
-    #UploadFrom=direct // attached directly to this message
-    #DataLength=100 // 100 bytes decimal
-    #Data
-    #<data>
-    # or
-    #UploadFrom=disk // upload a file from disk
-    #Filename=/home/toad/something.html
-    #End
-    # or
-    #UploadFrom=redirect // create a redirect to another key
-    #TargetURI=KSK@gpl.txt // some other freenet URI
-    #End
-    
         # divert to putdir if dir keyword present
         if kw.has_key('dir'):
             return self.putdir(uri, **kw)

         opts = {}
+    
+        opts['async'] = kw.get('async', False)
+    
         opts['URI'] = uri
         opts['Metadata.ContentType'] = kw.get("mimetype", "text/plain")
         id = self._getUniqueId()
@@ -315,60 +326,36 @@
         opts['GetCHKOnly'] = toBool(kw.get("chkonly", "false"))
         opts['DontCompress'] = toBool(kw.get("nocompress", "false"))

-        # if inserting a freesite, scan the directory and insert each bit 
piecemeal
-        if kw.has_key("dir"):
-            if kw.get('usk', False):
-                uri = uri.replace("SSK@", "USK@")
-            if not uri.endswith("/"):
-                uri = uri + "/"
-    
-            # form a base privkey-based URI
-            siteuri = uri + "%s/%s/" % (kw['sitename'], kw.get('version', 1))
-    
+        if kw.has_key("file"):
             opts['UploadFrom'] = "disk"
-    
-            # upload all files in turn - rework this later when queueing is 
implemented
-            files = readdir(kw['dir'])
-            for f in files:
-                thisuri = siteuri + f['relpath']
-                opts['file'] = f['fullpath']
-                opts['mimetype'] = f['mimetype']
-                self.put(thisuri, **opts)
-    
-            # last bit - insert index.html
-            opts['file'] = os.path.join(kw['dir'], "index.html")
-            thisuri = siteuri + "index.html"
-            opts['mimetype'] = "text/html"
-            self.put(thisuri, **opts)
-            
-            return uri
-    
-        elif kw.has_key("file"):
-            opts['UploadFrom'] = "disk"
             opts['Filename'] = kw['file']
             if not kw.has_key("mimetype"):
                 opts['Metadata.ContentType'] = 
mimetypes.guess_type(kw['file'])[0] or "text/plain"
-            sendEnd = True

         elif kw.has_key("data"):
             opts["UploadFrom"] = "direct"
             opts["Data"] = kw['data']
-            sendEnd = False

         elif kw.has_key("redirect"):
             opts["UploadFrom"] = "redirect"
             opts["TargetURI"] = kw['redirect']
-            sendEnd = True
         else:
             raise Exception("Must specify file, data or redirect keywords")

         #print "sendEnd=%s" % sendEnd

+        # now dispatch the job
+        return self._submitCmd(id, "ClientPut", **opts)
+    
+    
+        # ------------------------------------------------------------
+        # DEPRECATED CODE
+    
         # issue the command
-        self._sendMessage("ClientPut", sendEnd, **opts)
+        self._txMsg("ClientPut", **opts)

         # expect URIGenerated
-        resp1 = self._receiveMessage()
+        resp1 = self._rxMsg()
         hdr = resp1['header']
         if hdr != 'URIGenerated':
             raise FCPException(resp1)
@@ -381,7 +368,7 @@
                 return newUri

         # expect outcome
-        resp2 = self._receiveMessage()
+        resp2 = self._rxMsg()
         hdr = resp2['header']
         if hdr == 'PutSuccessful':
             return resp2['URI']
@@ -392,6 +379,7 @@
         else:
             raise FCPException(resp2)

+    
     def putdir(self, uri, **kw):
         """
         Inserts a freesite
@@ -409,6 +397,8 @@
             - maxretries - maximum number of retries, default 3
             - priority - default 1

+            - async - default False - if True, return a job ticket
+    
         Returns:
             - the URI under which the freesite can be retrieved
         """
@@ -430,7 +420,7 @@
             uriFull = uriFull.replace("SSK@", "USK@")

         # issue the command
-        #self._sendMessage("ClientPutComplexDir", True, **opts)
+        #self._txMsg("ClientPutComplexDir", True, **opts)
         msgLines = ["ClientPutComplexDir",
                     "Identifier=%s" % id,
                     "Verbosity=0",
@@ -468,10 +458,21 @@
         for line in msgLines:
             self._log(DETAIL, line)
         fullbuf = "\n".join(msgLines) + "\n"
+    
+        # now dispatch the job
+        return self._submitCmd(id, "ClientPutComplexDir",
+                               rawcmd=fullbuf,
+                               async=kw.get('async', False),
+                               )
+    
+        # ------------------------------------------------------------
+        # DEPRECATED CODE
+    
+    
         self.socket.send(fullbuf)

         # expect URIGenerated
-        resp1 = self._receiveMessage()
+        resp1 = self._rxMsg()
         hdr = resp1['header']
         if hdr != 'URIGenerated':
             raise FCPException(resp1)
@@ -479,7 +480,7 @@
         newUri = resp1['URI']

         # expect outcome
-        resp2 = self._receiveMessage()
+        resp2 = self._rxMsg()
         hdr = resp2['header']
         if hdr == 'PutSuccessful':
             return resp2['URI']
@@ -490,63 +491,293 @@
         else:
             raise FCPException(resp2)

-    def genkey(self, id=None):
+    
+    def shutdown(self):
         """
-        Generates and returns an SSK keypair
+        Terminates the manager thread
         """
-        if not id:
-            id = self._getUniqueId()
+        self.running = False
+    
+        # give the manager thread a chance to bail out
+        time.sleep(pollTimeout * 3)
+    
+        # shut down FCP connection
+        if hasattr(self, 'socket'):
+            self.socket.close()
+            del self.socket
+    
+        # and close the logfile
+        if self.logfile not in [sys.stdout, sys.stderr]:
+            self.logfile.close()
+    
+    
+    
+    
+    
+    # methods for manager thread
+    
+    def _mgrThread(self):
+        """
+        Receives all incoming messages
+        """
+        log = self._log
+        try:
+            while self.running:
+    
+                log(DEBUG, "Top of manager thread")
+    
+                # try for incoming messages from node
+                log(DEBUG, "Testing for incoming message")
+                if self._msgIncoming():
+                    log(DEBUG, "Retrieving incoming message")
+                    msg = self._rxMsg()
+                    log(DEBUG, "Got incoming message, dispatching")
+                    self._on_rxMsg(msg)
+                    log(DEBUG, "back from on_rxMsg")
+                else:
+                    log(DEBUG, "No incoming message from node")

-        self._sendMessage("GenerateSSK",
-                         Identifier=id)
+                # try for incoming requests from clients
+                log(DEBUG, "Testing for client req")
+                try:
+                    req = self.clientReqQueue.get(True, pollTimeout)
+                    log(DEBUG, "Got client req, dispatching")
+                    self._on_clientReq(req)
+                    log(DEBUG, "Back from on_clientReq")
+                except Queue.Empty:
+                    log(DEBUG, "No incoming client req")
+                    pass
+    
+            self._log(INFO, "Manager thread terminated normally")
+            return
+    
+        except:
+            traceback.print_exc()
+            self._log(CRITICAL, "manager thread crashed")
+    
+    def _msgIncoming(self):
+        """
+        Returns True if a message is coming in from the node
+        """
+        return len(select.select([self.socket], [], [], pollTimeout)[0]) > 0
+    
+    def _submitCmd(self, id, cmd, **kw):
+        """
+        Submits a command for execution

-        while True:
-            resp = self._receiveMessage()
-            #print resp
-            if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == 
id:
-                break
+        Arguments:
+            - id - the command identifier
+            - cmd - the command name, such as 'ClientPut'
+        
+        Keywords:
+            - async - whether to return a JobTicket object, rather than
+              the command result
+            - rawcmd - a raw command buffer to send directly
+            - options specific to command
+        
+        Returns:
+            - if command is sent in sync mode, returns the result
+            - if command is sent in async mode, returns a JobTicket
+              object which the client can poll or block on later
+        """
+        async = kw.pop('async', False)
+        job = JobTicket(id, cmd, kw)
+        self.clientReqQueue.put(job)

-        return resp['RequestURI'], resp['InsertURI']
+        self._log(DEBUG, "_submitCmd: id=%s cmd=%s kw=%s" % (id, cmd, 
str(kw)[:256]))

+        if async:
+            return job
+        else:
+            return job.wait()

+    def _on_rxMsg(self, msg):
+        """
+        Handler for incoming messages from node
+        """
+        log = self._log

+        # find the job this relates to
+        id = msg['Identifier']
+        hdr = msg['header']

-    # methods for receiver thread
+        job = self.jobs.get(id, None)
+        
+        # bail if job not known
+        if not job:
+            log(ERROR, "Received %s for unknown job %s" % (hdr, id))
+            return

-    def _rxThread(self):
+        # action from here depends on what kind of message we got
+    
+        # -----------------------------
+        # handle GenerateSSK responses
+    
+        if hdr == 'SSKKeypair':
+            # got requested keys back
+            job._putResult((msg['RequestURI'], msg['InsertURI']))
+    
+            # and remove job from queue
+            self.jobs.pop(id, None)
+            return
+    
+        # -----------------------------
+        # handle ClientGet responses
+    
+        if hdr == 'DataFound':
+            mimetype = msg['Metadata.ContentType']
+            if job.kw.has_key('Filename'):
+                # already stored to disk, done
+                #resp['file'] = file
+                job._putResult((mimetype, job.kw['Filename']))
+                del self.jobs[id]
+                return
+    
+            elif job.kw['ReturnType'] == 'none':
+                job._putResult((mimetype, 1))
+                del self.jobs[id]
+                return
+    
+            # otherwise, we're expecting an AllData and will react to it then
+            else:
+                job.mimetype = mimetype
+                return
+    
+        if hdr == 'AllData':
+            job._putResult((job.mimetype, msg['Data']))
+            del self.jobs[id]
+            return
+    
+        if hdr == 'GetFailed':
+            # return an exception
+            job._putResult(FCPGetFailed(msg))
+            del self.jobs[id]
+            return
+    
+        # -----------------------------
+        # handle ClientPut responses
+    
+        if hdr == 'URIGenerated':
+    
+            job.uri = msg['URI']
+            newUri = msg['URI']
+    
+            return
+    
+            # bail here if no data coming back
+            if job.kw.get('GetCHKOnly', False) == 'true':
+                # done - only wanted a CHK
+                job._putResult(newUri)
+                del self.jobs[id]
+                return
+    
+        if hdr == 'PutSuccessful':
+            job._putResult(msg['URI'])
+            del self.jobs[id]
+            return
+    
+        if hdr == 'PutFailed':
+            job._putResult(FCPPutFailed(msg))
+            del self.jobs[id]
+            return
+    
+        # -----------------------------
+        # handle progress messages
+    
+        if hdr == 'StartedCompression':
+            job.notify(msg)
+            return
+    
+        if hdr == 'FinishedCompression':
+            job.notify(msg)
+            return
+    
+        if hdr == 'SimpleProgress':
+            return
+    
+        # -----------------------------
+        # handle persistent job messages
+    
+        if hdr == 'PersistentGet':
+            return
+    
+        if hdr == 'PersistentPut':
+            return
+    
+        if hdr == 'EndListPersistentRequests':
+            return
+    
+        if hdr == 'PersistentPutDir':
+            return
+    
+        # -----------------------------
+        # handle various errors
+    
+        if hdr == 'ProtocolError':
+            job._putResult(FCPProtocolError(msg))
+            del self.jobs[id]
+            return
+    
+        if hdr == 'IdentifierCollision':
+            log(ERROR, "IdentifierCollision on id %s ???" % id)
+            job._putResult(Exception("Duplicate job identifier %s" % id))
+            del self.jobs[id]
+            return
+    
+        # -----------------------------
+        # wtf is happening here?!?
+    
+        log(ERROR, "Unknown message type from node: %s" % hdr)
+        job._putResult(FCPException(msg))
+        del self.jobs[id]
+        return
+    
+    def _on_clientReq(self, req):
         """
-        Receives all incoming messages
+        handler for incoming requests from clients
         """
-        while self.running:
-            self.socketLock.acquire()
-            self.socket.settimeout(0.1)
-            try:
-                msg = self._receiveMessage()
-            except socket.timeout:
-                self.socketLock.release()
-                continue
-            
+        id = req.id
+        cmd = req.cmd
+        kw = req.kw

+        # register the req
+        self.jobs[id] = req
+        
+        # now can send, since we're the only one who will
+        self._txMsg(cmd, **kw)
+    
+    
     # low level node comms methods

     def _getUniqueId(self):
         return "id" + str(int(time.time() * 1000000))

-    def _sendMessage(self, msgType, sendEndMessage=True, **kw):
+    def _txMsg(self, msgType, **kw):
         """
         low level message send

         Arguments:
             - msgType - one of the FCP message headers, such as 'ClientHello'
             - args - zero or more (keyword, value) tuples
+        Keywords:
+            - rawcmd - if given, this is the raw buffer to send
         """
+        log = self._log
+    
+        # just send the raw command, if given    
+        rawcmd = kw.get('rawcmd', None)
+        if rawcmd:
+            self.socket.send(rawcmd)
+            log(DETAIL, "CLIENT: %s" % rawcmd)
+            return
+    
         if kw.has_key("Data"):
             data = kw.pop("Data")
+            sendEndMessage = False
         else:
             data = None
+            sendEndMessage = True

-        log = self._log
-        
         items = [msgType + "\n"]
         log(DETAIL, "CLIENT: %s" % msgType)

@@ -573,7 +804,7 @@

         self.socket.send(raw)

-    def _receiveMessage(self):
+    def _rxMsg(self):
         """
         Receives and returns a message as a dict

@@ -665,34 +896,55 @@
         self.logfile.write(msg)
         self.logfile.flush()

-    class JobTicket:
+
+class JobTicket:
+    """
+    A JobTicket is an object returned to clients making
+    asynchronous requests. It puts them in control of how
+    they manage n concurrent requests.
+    
+    When you as a client receive a JobTicket, you can choose to:
+        - block, awaiting completion of the job
+        - poll the job for completion status
+        - receive a callback upon completion
+    """
+    def __init__(self, id, cmd, kw):
         """
-        A JobTicket is an object returned to clients making
-        asynchronous requests. It puts them in control of how
-        they manage n concurrent requests.
-        
-        When you as a client receive a JobTicket, you can choose to:
-            - block, awaiting completion of the job
-            - poll the job for completion status
-            - receive a callback upon completion
+        You should never instantiate a JobTicket object yourself
         """
-        def __init__(self, id):
-            """
-            You should never instantiate a JobTicket object yourself
-            """
-            self.id = id
-            self.queue = Queue.Queue()
-        
-        def isComplete(self):
-            """
-            Returns True if the job has been completed
-            """
-        
-        def wait(self, timeout=None):
-            """
-            Waits forever (or for a given timeout) for a job to complete
-            """
+        self.id = id
+        self.cmd = cmd
+        self.kw = kw

+        self.lock = threading.Lock()
+        self.lock.acquire()
+        self.result = None
+    
+    def isComplete(self):
+        """
+        Returns True if the job has been completed
+        """
+        return self.result != None
+    
+    def wait(self, timeout=None):
+        """
+        Waits forever (or for a given timeout) for a job to complete
+        """
+        self.lock.acquire()
+        self.lock.release()
+        if isinstance(self.result, Exception):
+            raise self.result
+        else:
+            return self.result
+    
+    def _putResult(self, result):
+        """
+        Called by manager thread to indicate job is complete,
+        and submit a result to be picked up by client
+        """
+        self.result = result
+        self.lock.release()
+    

 class FreenetXMLRPCRequest:
     """

Modified: trunk/apps/pyFreenet/sitemgr.py
===================================================================
--- trunk/apps/pyFreenet/sitemgr.py     2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/sitemgr.py     2006-05-12 03:47:00 UTC (rev 8673)
@@ -46,6 +46,14 @@

         self.loadConfig()

+    def __del__(self):
+    
+        try:
+            del self.node
+            self.node = None
+        except:
+            pass
+    
     def createConfig(self):
         """
         Creates a whole new config
@@ -70,7 +78,7 @@

     def createNode(self, **kw):

-        kw = {}
+        #kw = {}

         if fcpHost and not kw.has_key("fcpHost"):
             kw['host'] = fcpHost
@@ -80,6 +88,8 @@
             kw['verbosity'] = verbosity
         if logfile and not kw.has_key("logfile"):
             kw['logfile'] = logfile
+    
+        #print kw

         self.node = fcp.FCPNodeConnection(**kw)

@@ -153,7 +163,11 @@
                 print "Updating site %s" % sitename
                 print "privatekey=%s" % privatekey
                 noSites = False
-                self.node.put(privatekey, dir=dir, name=sitename, 
version=version, usk=True)
+                res = self.node.put(privatekey,
+                                    dir=dir,
+                                    name=sitename,
+                                    version=version,
+                                    usk=True)
                 conf.set(sitename, "hash", hashNew)

         self.saveConfig()
@@ -161,6 +175,8 @@
         if noSites:
             print "No sites needed updating"

+        return res
+    

 def help():
     print "%s: A console-based, cron-able freesite inserter" % sys.argv[0]
@@ -190,4 +206,5 @@

     s = SiteMgr()
     s.update()
+    s.shutdown()


