Author: aum
Date: 2006-05-12 03:47:00 +0000 (Fri, 12 May 2006)
New Revision: 8673
Modified:
trunk/apps/pyFreenet/code.leo
trunk/apps/pyFreenet/fcp.py
trunk/apps/pyFreenet/sitemgr.py
Log:
Implemented asynchronous mode requests.
PyFcp is now thread-safe for the basic primitives, whether
they are executed synchronously or asynchronously.
All primitives can now be executed with 'async=True', which
causes them to return a 'job ticket' (JobTicket) object that can
then be polled for completion or waited on in a blocking fashion.
By default, primitives still execute synchronously.
TODO:
- implement callbacks
- implement persistent requests
Modified: trunk/apps/pyFreenet/code.leo
===================================================================
--- trunk/apps/pyFreenet/code.leo 2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/code.leo 2006-05-12 03:47:00 UTC (rev 8673)
@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<leo_file>
<leo_header file_format="2" tnodes="0" max_tnode_index="6" clone_windows="0"/>
-<globals body_outline_ratio="0.290740740741">
- <global_window_position top="21" left="104" height="668" width="1080"/>
+<globals body_outline_ratio="0.265866209262">
+ <global_window_position top="147" left="74" height="694" width="1166"/>
<global_log_window_position top="0" left="0" height="0" width="0"/>
</globals>
<preferences/>
@@ -132,21 +132,24 @@
<v t="aum.20060509223528.119"><vh>exceptions</vh></v>
</v>
<v t="aum.20060511003500" tnodeList="aum.20060511003500"><vh>@file
test.py</vh></v>
-<v t="aum.20060511101147" a="E"
tnodeList="aum.20060511101147,aum.20060511113333,aum.20060511113333.1,aum.20060511114439,aum.20060511114439.1,aum.20060511114439.2,aum.20060511120059,aum.20060511114604,aum.20060511114604.1,aum.20060511113333.3,aum.20060511130507,aum.20060511120024"><vh>@nosent
sitemgr.py</vh>
+<v t="aum.20060512152233" a="V" tnodeList="aum.20060512152233"><vh>@file
genkey.py</vh></v>
+<v t="aum.20060512140230" tnodeList="aum.20060512140230"><vh>@file
updatesites.py</vh></v>
+<v t="aum.20060511101147" a="E"
tnodeList="aum.20060511101147,aum.20060511113333,aum.20060511113333.1,aum.20060511114439,aum.20060511114439.1,aum.20060512150118,aum.20060511114439.2,aum.20060511120059,aum.20060511114604,aum.20060511114604.1,aum.20060511113333.3,aum.20060511130507,aum.20060511120024"><vh>@nosent
sitemgr.py</vh>
<v t="aum.20060511113333"><vh>imports</vh></v>
<v t="aum.20060511113333.1"><vh>config</vh></v>
<v t="aum.20060511114439" a="E"><vh>class SiteMgr</vh>
<v t="aum.20060511114439.1"><vh>__init__</vh></v>
+<v t="aum.20060512150118"><vh>__del__</vh></v>
<v t="aum.20060511114439.2"><vh>createConfig</vh></v>
<v t="aum.20060511120059"><vh>createNode</vh></v>
<v t="aum.20060511114604"><vh>loadConfig</vh></v>
<v t="aum.20060511114604.1"><vh>saveConfig</vh></v>
<v t="aum.20060511113333.3"><vh>update</vh></v>
</v>
-<v t="aum.20060511130507" a="V"><vh>help</vh></v>
+<v t="aum.20060511130507"><vh>help</vh></v>
<v t="aum.20060511120024"><vh>mainline</vh></v>
</v>
-<v t="aum.20060506215707" a="E"
tnodeList="aum.20060506215707,aum.20060506215707.1,aum.20060506220237,aum.20060506215707.2,aum.20060506215707.3,aum.20060506220237.1,aum.20060506220237.2,aum.20060506224238,aum.20060506224238.1,aum.20060506231352,aum.20060507003931,aum.20060511001853,aum.20060506231352.1,aum.20060506232639,aum.20060506232639.1,aum.20060506223545,aum.20060506231352.2,aum.20060506220856,aum.20060506222005,aum.20060507124316,aum.20060511103841,aum.20060511103841.1,aum.20060511103952,aum.20060511103952.1,aum.20060509184020,aum.20060507155016,aum.20060507162314,aum.20060507162543,aum.20060507162314.1,aum.20060509194923,aum.20060507162314.2,aum.20060507162314.3,aum.20060507162543.1,aum.20060507154638,aum.20060507163143,aum.20060509184020.1,aum.20060509184020.2,aum.20060509224119,aum.20060509224221,aum.20060507195029,aum.20060507195029.1,aum.20060506224545"><vh>@nosent
fcp.py</vh>
+<v t="aum.20060506215707" a="E"
tnodeList="aum.20060506215707,aum.20060506215707.1,aum.20060506220237,aum.20060506215707.2,aum.20060506215707.3,aum.20060506220237.1,aum.20060506220237.2,aum.20060506224238,aum.20060506224238.1,aum.20060506231352.1,aum.20060506231352,aum.20060507003931,aum.20060511001853,aum.20060511205201,aum.20060506232639,aum.20060506232639.1,aum.20060511222538,aum.20060512101715,aum.20060511205201.1,aum.20060511205201.2,aum.20060506223545,aum.20060506231352.2,aum.20060506220856,aum.20060506222005,aum.20060507124316,aum.20060511103841,aum.20060511103841.1,aum.20060511103952,aum.20060511103952.1,aum.20060512102840,aum.20060509184020,aum.20060507155016,aum.20060507162314,aum.20060507162543,aum.20060507162314.1,aum.20060509194923,aum.20060507162314.2,aum.20060507162314.3,aum.20060507162543.1,aum.20060507154638,aum.20060507163143,aum.20060509184020.1,aum.20060509184020.2,aum.20060509224119,aum.20060509224221,aum.20060507195029,aum.20060507195029.1,aum.20060506224545"><vh>@nosent
fcp.py</vh>
<v t="aum.20060506215707.1"><vh>imports</vh></v>
<v t="aum.20060506220237"><vh>exceptions</vh></v>
<v t="aum.20060506215707.2"><vh>globals</vh></v>
@@ -154,27 +157,33 @@
<v t="aum.20060506220237.1"><vh>__init__</vh></v>
<v t="aum.20060506220237.2"><vh>__del__</vh></v>
<v t="aum.20060506224238" a="E"><vh>High Level Methods</vh>
-<v t="aum.20060506224238.1"><vh>hello</vh></v>
+<v t="aum.20060506224238.1"><vh>_hello</vh></v>
+<v t="aum.20060506231352.1"><vh>genkey</vh></v>
<v t="aum.20060506231352"><vh>get</vh></v>
<v t="aum.20060507003931"><vh>put</vh></v>
<v t="aum.20060511001853"><vh>putdir</vh></v>
-<v t="aum.20060506231352.1"><vh>genkey</vh></v>
+<v t="aum.20060511205201"><vh>shutdown</vh></v>
</v>
-<v t="aum.20060506232639"><vh>Receiver Thread</vh>
-<v t="aum.20060506232639.1"><vh>_rxThread</vh></v>
+<v t="aum.20060506232639" a="E"><vh>Manager Thread</vh>
+<v t="aum.20060506232639.1"><vh>_mgrThread</vh></v>
+<v t="aum.20060511222538"><vh>_msgIncoming</vh></v>
+<v t="aum.20060512101715"><vh>_submitCmd</vh></v>
+<v t="aum.20060511205201.1"><vh>_on_rxMsg</vh></v>
+<v t="aum.20060511205201.2"><vh>_on_clientReq</vh></v>
</v>
<v t="aum.20060506223545" a="E"><vh>Low Level Methods</vh>
<v t="aum.20060506231352.2"><vh>_getUniqueId</vh></v>
-<v t="aum.20060506220856"><vh>_sendMessage</vh></v>
-<v t="aum.20060506222005"><vh>_receiveMessage</vh></v>
+<v t="aum.20060506220856"><vh>_txMsg</vh></v>
+<v t="aum.20060506222005"><vh>_rxMsg</vh></v>
<v t="aum.20060507124316"><vh>_log</vh></v>
</v>
+</v>
<v t="aum.20060511103841" a="E"><vh>class JobTicket</vh>
<v t="aum.20060511103841.1"><vh>__init__</vh></v>
<v t="aum.20060511103952"><vh>isDone</vh></v>
<v t="aum.20060511103952.1"><vh>wait</vh></v>
+<v t="aum.20060512102840"><vh>_putResult</vh></v>
</v>
-</v>
<v t="aum.20060509184020" a="E"><vh>XML-RPC Server</vh>
<v t="aum.20060507155016" a="E"><vh>class FreenetXMLRPCRequest</vh>
<v t="aum.20060507162314"><vh>__init__</vh></v>
@@ -227,25 +236,35 @@
@others
</t>
-<t tx="aum.20060506215707.1">import sys, os, socket, time, thread, threading,
mimetypes, sha
+<t tx="aum.20060506215707.1">import sys, os, socket, time, thread
+import threading, mimetypes, sha, Queue
+import select, traceback
from SimpleXMLRPCServer import SimpleXMLRPCServer
</t>
-<t tx="aum.20060506215707.2">defaultFCPHost = "127.0.0.1"
+<t tx="aum.20060506215707.2"># where we can find the freenet node FCP port
+defaultFCPHost = "127.0.0.1"
defaultFCPPort = 9481
+# where to listen, for the xml-rpc server
xmlrpcHost = "127.0.0.1"
xmlrpcPort = 19481
+# poll timeout period for manager thread
+pollTimeout = 0.1
+#pollTimeout = 3
+
# list of keywords sent from node to client, which have
# int values
intKeys = [
'DataLength', 'Code',
]
+# for the FCP 'ClientHello' handshake
expectedVersion="2.0"
+# logger verbosity levels
SILENT = 0
FATAL = 1
CRITICAL = 2
@@ -308,10 +327,12 @@
- verbosity - how detailed the log messages should be, defaults to 0
(silence)
"""
+ # grab and save parms
self.name = kw.get('clientName', self._getUniqueId())
self.host = kw.get('host', defaultFCPHost)
self.port = kw.get('port', defaultFCPPort)
-
+
+ # set up the logger
logfile = kw.get('logfile', sys.stderr)
if not hasattr(logfile, 'write'):
# might be a pathname
@@ -319,53 +340,64 @@
raise Exception("Bad logfile, must be pathname or file object")
logfile = file(logfile, "a")
self.logfile = logfile
-
self.verbosity = kw.get('verbosity', 0)
- # try to connect
+ # try to connect to node
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
# now do the hello
- self.hello()
+ self._hello()
- # the incoming response queues
- self.pendingResponses = {} # keyed by request ID
+ # the pending job tickets
+ self.jobs = {} # keyed by request ID
- # lock for socket operations
- self.socketLock = threading.Lock()
+ # queue for incoming client requests
+ self.clientReqQueue = Queue.Queue()
# launch receiver thread
- #thread.start_new_thread(self.rxThread, ())
+ self.running = True
+ thread.start_new_thread(self._mgrThread, ())
</t>
<t tx="aum.20060506220237.2">def __del__(self):
"""
object is getting cleaned up, so disconnect
"""
- if self.socket:
- self.socket.close()
- del self.socket
+ # terminate the node
+ try:
+ self.shutdown()
+ except:
+ traceback.print_exc()
+ pass
- if self.logfile not in [sys.stdout, sys.stderr]:
- self.logfile.close()
-
</t>
-<t tx="aum.20060506220856">def _sendMessage(self, msgType,
sendEndMessage=True, **kw):
+<t tx="aum.20060506220856">def _txMsg(self, msgType, **kw):
"""
low level message send
Arguments:
- msgType - one of the FCP message headers, such as 'ClientHello'
- args - zero or more (keyword, value) tuples
+ Keywords:
+ - rawcmd - if given, this is the raw buffer to send
"""
+ log = self._log
+
+ # just send the raw command, if given
+ rawcmd = kw.get('rawcmd', None)
+ if rawcmd:
+ self.socket.send(rawcmd)
+ log(DETAIL, "CLIENT: %s" % rawcmd)
+ return
+
if kw.has_key("Data"):
data = kw.pop("Data")
+ sendEndMessage = False
else:
data = None
+ sendEndMessage = True
- log = self._log
-
items = [msgType + "\n"]
log(DETAIL, "CLIENT: %s" % msgType)
@@ -393,7 +425,7 @@
self.socket.send(raw)
</t>
-<t tx="aum.20060506222005">def _receiveMessage(self):
+<t tx="aum.20060506222005">def _rxMsg(self):
"""
Receives and returns a message as a dict
@@ -486,13 +518,13 @@
</t>
-<t tx="aum.20060506224238.1">def hello(self):
+<t tx="aum.20060506224238.1">def _hello(self):
- self._sendMessage("ClientHello",
+ self._txMsg("ClientHello",
Name=self.name,
ExpectedVersion=expectedVersion)
- resp = self._receiveMessage()
+ resp = self._rxMsg()
return resp
</t>
@@ -506,6 +538,8 @@
Does a direct get of a key
Keywords:
+ - async - whether to return immediately with a job ticket object,
default
+ False (wait for completion)
- dsnly - whether to only check local datastore
- ignoreds - don't check local datastore
- file - if given, this is a pathname to which to store the retrieved
key
@@ -519,12 +553,18 @@
- if 'dontReturnData' is true, returns (mimetype, 1) if key is returned
If key is not found, raises an exception
"""
+ # format the request
opts = {}
+ id = self._getUniqueId()
+
+ opts['async'] = kw.pop('async', False)
+
file = kw.pop("file", None)
if file:
opts['ReturnType'] = "disk"
- opts['File'] = file
+ #opts['File'] = file
+ opts['Filename'] = file
elif opts.get('nodata', False):
nodata = True
@@ -533,7 +573,7 @@
nodata = False
opts['ReturnType'] = "direct"
- opts['Identifier'] = self._getUniqueId()
+ opts['Identifier'] = id
if kw.get("ignoreds", False):
opts["IgnoreDS"] = "true"
@@ -553,28 +593,19 @@
opts['PriorityClass'] = int(kw.get("priority", 1))
opts['Global'] = "false"
- self._sendMessage("ClientGet", **opts)
+ # now enqueue the request
+ return self._submitCmd(id, "ClientGet", **opts)
+
+
+
+ # ------------------------------------------------
+ # DEPRECATED CODE!!
+
+ self._txMsg("ClientGet", **opts)
-#ClientGet
-#IgnoreDS=false // true = ignore the datastore (in old FCP this was
RemoveLocalKey)
-#DSOnly=false // true = only check the datastore, don't route (~= htl 0)
-#URI=KSK@gpl.txt // key to fetch
-#Identifier=Request Number One
-#Verbosity=0 // no status, just tell us when it's done
-#ReturnType=direct // return all at once over the FCP connection
-#MaxSize=100 // maximum size of returned data (all numbers in DECIMAL)
-#MaxTempSize=1000 // maximum size of intermediary data
-#MaxRetries=100 // automatic retry supported as an option; -1 means retry
forever
-#PriorityClass=1 // priority class 1 = interactive
-#Persistence=reboot // continue until node is restarted; report progress while
client is
-# connected, including if it reconnects after losing connection
-#ClientToken=hello // returned in PersistentGet, a hint to the client, so the
client
-# doesn't need to maintain its own state
-#Global=false // see Persistence section below
-#EndMessage
# get a response
- resp = self._receiveMessage()
+ resp = self._rxMsg()
hdr = resp['header']
if hdr == 'DataFound':
mimetype = resp['Metadata.ContentType']
@@ -585,7 +616,7 @@
elif nodata:
return (mimetype, 1)
else:
- resp = self._receiveMessage()
+ resp = self._rxMsg()
if resp['header'] == 'AllData':
return (mimetype, resp['Data'])
else:
@@ -598,47 +629,76 @@
raise FCPException(resp)
</t>
-<t tx="aum.20060506231352.1">def genkey(self, id=None):
+<t tx="aum.20060506231352.1">def genkey(self, **kw):
"""
Generates and returns an SSK keypair
+
+ Keywords:
+ - async - whether to do this call asynchronously, and
+ return a JobTicket object
"""
- if not id:
- id = self._getUniqueId()
+ id = self._getUniqueId()
- self._sendMessage("GenerateSSK",
- Identifier=id)
-
- while True:
- resp = self._receiveMessage()
- #print resp
- if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == id:
- break
+ return self._submitCmd(id, "GenerateSSK", Identifier=id, **kw)
- return resp['RequestURI'], resp['InsertURI']
+ #self._txMsg("GenerateSSK",
+ # Identifier=id)
+ #while True:
+ # resp = self._rxMsg()
+ # #print resp
+ # if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) == id:
+ # break
+ #return resp['RequestURI'], resp['InsertURI']
</t>
<t tx="aum.20060506231352.2">def _getUniqueId(self):
return "id" + str(int(time.time() * 1000000))
</t>
-<t tx="aum.20060506232639"># methods for receiver thread
+<t tx="aum.20060506232639"># methods for manager thread
@others
</t>
-<t tx="aum.20060506232639.1">def _rxThread(self):
+<t tx="aum.20060506232639.1">def _mgrThread(self):
"""
Receives all incoming messages
"""
- while self.running:
- self.socketLock.acquire()
- self.socket.settimeout(0.1)
- try:
- msg = self._receiveMessage()
- except socket.timeout:
- self.socketLock.release()
- continue
-
+ log = self._log
+ try:
+ while self.running:
+
+ log(DEBUG, "Top of manager thread")
+
+ # try for incoming messages from node
+ log(DEBUG, "Testing for incoming message")
+ if self._msgIncoming():
+ log(DEBUG, "Retrieving incoming message")
+ msg = self._rxMsg()
+ log(DEBUG, "Got incoming message, dispatching")
+ self._on_rxMsg(msg)
+ log(DEBUG, "back from on_rxMsg")
+ else:
+ log(DEBUG, "No incoming message from node")
+
+ # try for incoming requests from clients
+ log(DEBUG, "Testing for client req")
+ try:
+ req = self.clientReqQueue.get(True, pollTimeout)
+ log(DEBUG, "Got client req, dispatching")
+ self._on_clientReq(req)
+ log(DEBUG, "Back from on_clientReq")
+ except Queue.Empty:
+ log(DEBUG, "No incoming client req")
+ pass
+
+ self._log(INFO, "Manager thread terminated normally")
+ return
+
+ except:
+ traceback.print_exc()
+ self._log(CRITICAL, "manager thread crashed")
+
</t>
<t tx="aum.20060507003931">def put(self, uri="CHK@", **kw):
"""
@@ -668,42 +728,21 @@
Keywords valid for all modes:
- maxretries - maximum number of retries, default 3
- priority - default 1
+ - async - whether to do the job asynchronously, returning a job ticket
+ object (default False)
Notes:
- exactly one of 'file', 'data' or 'dir' keyword arguments must be
present
"""
-#ClientPut
-#URI=CHK@ // could as easily be an insertable SSK or KSK URI
-#Metadata.ContentType=text/html // MIME type; for text, if charset is not
specified, node #should auto-detect it and force the auto-detected version
-#Identifier=Insert-1 // identifier, as always
-#Verbosity=0 // just report when complete
-#MaxRetries=999999 // lots of retries; -1 = retry forever
-#PriorityClass=1 // fproxy priority level
-#GetCHKOnly=false // true = don't insert the data, just return the key it
would generate
-#Global=false // see Persistence section below
-#DontCompress=true // hint to node: don't try to compress the data, it's
already compressed
-#ClientToken=Hello!!! // sent back to client on the PersistentPut if this is a
persistent #request
-# the following fields decide where the data is to come from:
-
-#UploadFrom=direct // attached directly to this message
-#DataLength=100 // 100 bytes decimal
-#Data
-#<data>
-# or
-#UploadFrom=disk // upload a file from disk
-#Filename=/home/toad/something.html
-#End
-# or
-#UploadFrom=redirect // create a redirect to another key
-#TargetURI=KSK@gpl.txt // some other freenet URI
-#End
-
# divert to putdir if dir keyword present
if kw.has_key('dir'):
return self.putdir(uri, **kw)
opts = {}
+
+ opts['async'] = kw.get('async', False)
+
opts['URI'] = uri
opts['Metadata.ContentType'] = kw.get("mimetype", "text/plain")
id = self._getUniqueId()
@@ -714,60 +753,36 @@
opts['GetCHKOnly'] = toBool(kw.get("chkonly", "false"))
opts['DontCompress'] = toBool(kw.get("nocompress", "false"))
- # if inserting a freesite, scan the directory and insert each bit piecemeal
- if kw.has_key("dir"):
- if kw.get('usk', False):
- uri = uri.replace("SSK@", "USK@")
- if not uri.endswith("/"):
- uri = uri + "/"
-
- # form a base privkey-based URI
- siteuri = uri + "%s/%s/" % (kw['sitename'], kw.get('version', 1))
-
+ if kw.has_key("file"):
opts['UploadFrom'] = "disk"
-
- # upload all files in turn - rework this later when queueing is
implemented
- files = readdir(kw['dir'])
- for f in files:
- thisuri = siteuri + f['relpath']
- opts['file'] = f['fullpath']
- opts['mimetype'] = f['mimetype']
- self.put(thisuri, **opts)
-
- # last bit - insert index.html
- opts['file'] = os.path.join(kw['dir'], "index.html")
- thisuri = siteuri + "index.html"
- opts['mimetype'] = "text/html"
- self.put(thisuri, **opts)
-
- return uri
-
- elif kw.has_key("file"):
- opts['UploadFrom'] = "disk"
opts['Filename'] = kw['file']
if not kw.has_key("mimetype"):
opts['Metadata.ContentType'] = mimetypes.guess_type(kw['file'])[0]
or "text/plain"
- sendEnd = True
elif kw.has_key("data"):
opts["UploadFrom"] = "direct"
opts["Data"] = kw['data']
- sendEnd = False
elif kw.has_key("redirect"):
opts["UploadFrom"] = "redirect"
opts["TargetURI"] = kw['redirect']
- sendEnd = True
else:
raise Exception("Must specify file, data or redirect keywords")
#print "sendEnd=%s" % sendEnd
+ # now dispatch the job
+ return self._submitCmd(id, "ClientPut", **opts)
+
+
+ # ------------------------------------------------------------
+ # DEPRECATED CODE
+
# issue the command
- self._sendMessage("ClientPut", sendEnd, **opts)
+ self._txMsg("ClientPut", **opts)
# expect URIGenerated
- resp1 = self._receiveMessage()
+ resp1 = self._rxMsg()
hdr = resp1['header']
if hdr != 'URIGenerated':
raise FCPException(resp1)
@@ -780,7 +795,7 @@
return newUri
# expect outcome
- resp2 = self._receiveMessage()
+ resp2 = self._rxMsg()
hdr = resp2['header']
if hdr == 'PutSuccessful':
return resp2['URI']
@@ -791,6 +806,7 @@
else:
raise FCPException(resp2)
+
</t>
<t tx="aum.20060507124316">def _log(self, level, msg):
"""
@@ -6158,6 +6174,8 @@
- maxretries - maximum number of retries, default 3
- priority - default 1
+ - async - default False - if True, return a job ticket
+
Returns:
- the URI under which the freesite can be retrieved
"""
@@ -6179,7 +6197,7 @@
uriFull = uriFull.replace("SSK@", "USK@")
# issue the command
- #self._sendMessage("ClientPutComplexDir", True, **opts)
+ #self._txMsg("ClientPutComplexDir", True, **opts)
msgLines = ["ClientPutComplexDir",
"Identifier=%s" % id,
"Verbosity=0",
@@ -6217,10 +6235,21 @@
for line in msgLines:
self._log(DETAIL, line)
fullbuf = "\n".join(msgLines) + "\n"
+
+ # now dispatch the job
+ return self._submitCmd(id, "ClientPutComplexDir",
+ rawcmd=fullbuf,
+ async=kw.get('async', False),
+ )
+
+ # ------------------------------------------------------------
+ # DEPRECATED CODE
+
+
self.socket.send(fullbuf)
# expect URIGenerated
- resp1 = self._receiveMessage()
+ resp1 = self._rxMsg()
hdr = resp1['header']
if hdr != 'URIGenerated':
raise FCPException(resp1)
@@ -6228,7 +6257,7 @@
newUri = resp1['URI']
# expect outcome
- resp2 = self._receiveMessage()
+ resp2 = self._rxMsg()
hdr = resp2['header']
if hdr == 'PutSuccessful':
return resp2['URI']
@@ -6239,6 +6268,7 @@
else:
raise FCPException(resp2)
+
</t>
<t tx="aum.20060511003500">from fcp import *
@@ -6269,24 +6299,37 @@
@others
</t>
-<t tx="aum.20060511103841.1">def __init__(self, id):
+<t tx="aum.20060511103841.1">def __init__(self, id, cmd, kw):
"""
You should never instantiate a JobTicket object yourself
"""
self.id = id
- self.queue = Queue.Queue()
+ self.cmd = cmd
+ self.kw = kw
+ self.lock = threading.Lock()
+ self.lock.acquire()
+ self.result = None
+
</t>
<t tx="aum.20060511103952">def isComplete(self):
"""
Returns True if the job has been completed
"""
+ return self.result != None
</t>
<t tx="aum.20060511103952.1">def wait(self, timeout=None):
"""
Waits forever (or for a given timeout) for a job to complete
"""
+ self.lock.acquire()
+ self.lock.release()
+ if isinstance(self.result, Exception):
+ raise self.result
+ else:
+ return self.result
+
</t>
<t tx="aum.20060511113333">import fcp, sys, os, sha
@@ -6323,7 +6366,11 @@
print "Updating site %s" % sitename
print "privatekey=%s" % privatekey
noSites = False
- self.node.put(privatekey, dir=dir, name=sitename, version=version,
usk=True)
+ res = self.node.put(privatekey,
+ dir=dir,
+ name=sitename,
+ version=version,
+ usk=True)
conf.set(sitename, "hash", hashNew)
self.saveConfig()
@@ -6331,6 +6378,8 @@
if noSites:
print "No sites needed updating"
+ return res
+
</t>
<t tx="aum.20060511114439">class SiteMgr:
"""
@@ -6452,11 +6501,12 @@
s = SiteMgr()
s.update()
+ s.shutdown()
</t>
<t tx="aum.20060511120059">def createNode(self, **kw):
- kw = {}
+ #kw = {}
if fcpHost and not kw.has_key("fcpHost"):
kw['host'] = fcpHost
@@ -6466,6 +6516,8 @@
kw['verbosity'] = verbosity
if logfile and not kw.has_key("logfile"):
kw['logfile'] = logfile
+
+ #print kw
self.node = fcp.FCPNodeConnection(**kw)
@@ -6489,5 +6541,307 @@
sys.exit(0)
</t>
+<t tx="aum.20060511205201">def shutdown(self):
+ """
+ Terminates the manager thread
+ """
+ self.running = False
+
+ # give the manager thread a chance to bail out
+ time.sleep(pollTimeout * 3)
+
+ # shut down FCP connection
+ if hasattr(self, 'socket'):
+ self.socket.close()
+ del self.socket
+
+ # and close the logfile
+ if self.logfile not in [sys.stdout, sys.stderr]:
+ self.logfile.close()
+
+
+</t>
+<t tx="aum.20060511205201.1">def _on_rxMsg(self, msg):
+ """
+ Handler for incoming messages from node
+ """
+ log = self._log
+
+ # find the job this relates to
+ id = msg['Identifier']
+ hdr = msg['header']
+
+ job = self.jobs.get(id, None)
+
+ # bail if job not known
+ if not job:
+ log(ERROR, "Received %s for unknown job %s" % (hdr, id))
+ return
+
+ # action from here depends on what kind of message we got
+
+ # -----------------------------
+ # handle GenerateSSK responses
+
+ if hdr == 'SSKKeypair':
+ # got requested keys back
+ job._putResult((msg['RequestURI'], msg['InsertURI']))
+
+ # and remove job from queue
+ self.jobs.pop(id, None)
+ return
+
+ # -----------------------------
+ # handle ClientGet responses
+
+ if hdr == 'DataFound':
+ mimetype = msg['Metadata.ContentType']
+ if job.kw.has_key('Filename'):
+ # already stored to disk, done
+ #resp['file'] = file
+ job._putResult((mimetype, job.kw['Filename']))
+ del self.jobs[id]
+ return
+
+ elif job.kw['ReturnType'] == 'none':
+ job._putResult((mimetype, 1))
+ del self.jobs[id]
+ return
+
+ # otherwise, we're expecting an AllData and will react to it then
+ else:
+ job.mimetype = mimetype
+ return
+
+ if hdr == 'AllData':
+ job._putResult((job.mimetype, msg['Data']))
+ del self.jobs[id]
+ return
+
+ if hdr == 'GetFailed':
+ # return an exception
+ job._putResult(FCPGetFailed(msg))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # handle ClientPut responses
+
+ if hdr == 'URIGenerated':
+
+ job.uri = msg['URI']
+ newUri = msg['URI']
+
+ return
+
+ # bail here if no data coming back
+ if job.kw.get('GetCHKOnly', False) == 'true':
+ # done - only wanted a CHK
+ job._putResult(newUri)
+ del self.jobs[id]
+ return
+
+ if hdr == 'PutSuccessful':
+ job._putResult(msg['URI'])
+ del self.jobs[id]
+ return
+
+ if hdr == 'PutFailed':
+ job._putResult(FCPPutFailed(msg))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # handle progress messages
+
+ if hdr == 'StartedCompression':
+ job.notify(msg)
+ return
+
+ if hdr == 'FinishedCompression':
+ job.notify(msg)
+ return
+
+ if hdr == 'SimpleProgress':
+ return
+
+ # -----------------------------
+ # handle persistent job messages
+
+ if hdr == 'PersistentGet':
+ return
+
+ if hdr == 'PersistentPut':
+ return
+
+ if hdr == 'EndListPersistentRequests':
+ return
+
+ if hdr == 'PersistentPutDir':
+ return
+
+ # -----------------------------
+ # handle various errors
+
+ if hdr == 'ProtocolError':
+ job._putResult(FCPProtocolError(msg))
+ del self.jobs[id]
+ return
+
+ if hdr == 'IdentifierCollision':
+ log(ERROR, "IdentifierCollision on id %s ???" % id)
+ job._putResult(Exception("Duplicate job identifier %s" % id))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # wtf is happening here?!?
+
+ log(ERROR, "Unknown message type from node: %s" % hdr)
+ job._putResult(FCPException(msg))
+ del self.jobs[id]
+ return
+
+</t>
+<t tx="aum.20060511205201.2">def _on_clientReq(self, req):
+ """
+ handler for incoming requests from clients
+ """
+ id = req.id
+ cmd = req.cmd
+ kw = req.kw
+
+ # register the req
+ self.jobs[id] = req
+
+ # now can send, since we're the only one who will
+ self._txMsg(cmd, **kw)
+
+</t>
+<t tx="aum.20060511222538">def _msgIncoming(self):
+ """
+ Returns True if a message is coming in from the node
+ """
+ return len(select.select([self.socket], [], [], pollTimeout)[0]) > 0
+
+</t>
+<t tx="aum.20060512101715">def _submitCmd(self, id, cmd, **kw):
+ """
+ Submits a command for execution
+
+ Arguments:
+ - id - the command identifier
+ - cmd - the command name, such as 'ClientPut'
+
+ Keywords:
+ - async - whether to return a JobTicket object, rather than
+ the command result
+ - rawcmd - a raw command buffer to send directly
+ - options specific to command
+
+ Returns:
+ - if command is sent in sync mode, returns the result
+ - if command is sent in async mode, returns a JobTicket
+ object which the client can poll or block on later
+ """
+ async = kw.pop('async', False)
+ job = JobTicket(id, cmd, kw)
+ self.clientReqQueue.put(job)
+
+ self._log(DEBUG, "_submitCmd: id=%s cmd=%s kw=%s" % (id, cmd,
str(kw)[:256]))
+
+ if async:
+ return job
+ else:
+ return job.wait()
+
+</t>
+<t tx="aum.20060512102840">def _putResult(self, result):
+ """
+ Called by manager thread to indicate job is complete,
+ and submit a result to be picked up by client
+ """
+ self.result = result
+ self.lock.release()
+
+</t>
+<t tx="aum.20060512140230">@first #!/usr/bin/env python
+
+import sys, os, time
+import sitemgr
+
+startupTime = 60
+freenetDir = "/home/david/freenet"
+pidFile = os.path.join(freenetDir, "Freenet.pid")
+
+def main(verbose=None):
+
+ if verbose == None:
+ verbose = ('-v' in sys.argv)
+
+ print "--------------------------------------------"
+ print "Start of site updating run"
+
+ # start freenet and let it warm up, if it's not already running
+ if not os.path.isfile(pidFile):
+ startingFreenet = True
+ os.chdir(freenetDir)
+ print "Starting freenet..."
+ os.system("./run.sh start")
+ print "Letting node settle for %s seconds..." % startupTime
+ time.sleep(startupTime)
+ else:
+ print "Freenet node is already running!"
+ startingFreenet = False
+
+ if verbose:
+ kw = {"verbosity" : sitemgr.fcp.DETAIL}
+ else:
+ kw = {}
+ #print "updatesites: kw=%s" % str(kw)
+ s = sitemgr.SiteMgr(**kw)
+ uri = s.update()
+ del s
+ print "Site updated: uri=%s" % uri
+
+ # kill freenet if it was dynamically started
+ if startingFreenet:
+ print "Waiting %s for inserts to finish..." % startupTime
+ time.sleep(startupTime)
+ print "Stopping node..."
+ os.system("./run.sh stop")
+ print "Node stopped"
+
+if __name__ == '__main__':
+ main()
+
+</t>
+<t tx="aum.20060512150118">def __del__(self):
+
+ try:
+ del self.node
+ self.node = None
+ except:
+ pass
+
+</t>
+<t tx="aum.20060512152233">@first #! /usr/bin/env python
+
+import time
+
+import fcp
+
+n = fcp.FCPNodeConnection(host="thoth", verbosity=fcp.ERROR)
+pub, priv = n.genkey()
+print pub
+print priv
+
+n.shutdown()
+
+#del n
+#time.sleep(3)
+#print "done"
+
+</t>
</tnodes>
</leo_file>
Modified: trunk/apps/pyFreenet/fcp.py
===================================================================
--- trunk/apps/pyFreenet/fcp.py 2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/fcp.py 2006-05-12 03:47:00 UTC (rev 8673)
@@ -18,7 +18,9 @@
"""
-import sys, os, socket, time, thread, threading, mimetypes, sha
+import sys, os, socket, time, thread
+import threading, mimetypes, sha, Queue
+import select, traceback
from SimpleXMLRPCServer import SimpleXMLRPCServer
@@ -53,20 +55,28 @@
class FCPProtocolError(FCPException):
pass
+# where we can find the freenet node FCP port
defaultFCPHost = "127.0.0.1"
defaultFCPPort = 9481
+# where to listen, for the xml-rpc server
xmlrpcHost = "127.0.0.1"
xmlrpcPort = 19481
+# poll timeout period for manager thread
+pollTimeout = 0.1
+#pollTimeout = 3
+
# list of keywords sent from node to client, which have
# int values
intKeys = [
'DataLength', 'Code',
]
+# for the FCP 'ClientHello' handshake
expectedVersion="2.0"
+# logger verbosity levels
SILENT = 0
FATAL = 1
CRITICAL = 2
@@ -93,10 +103,12 @@
- verbosity - how detailed the log messages should be, defaults to 0
(silence)
"""
+ # grab and save parms
self.name = kw.get('clientName', self._getUniqueId())
self.host = kw.get('host', defaultFCPHost)
self.port = kw.get('port', defaultFCPPort)
-
+
+ # set up the logger
logfile = kw.get('logfile', sys.stderr)
if not hasattr(logfile, 'write'):
# might be a pathname
@@ -104,52 +116,75 @@
raise Exception("Bad logfile, must be pathname or file object")
logfile = file(logfile, "a")
self.logfile = logfile
-
self.verbosity = kw.get('verbosity', 0)
- # try to connect
+ # try to connect to node
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
# now do the hello
- self.hello()
+ self._hello()
- # the incoming response queues
- self.pendingResponses = {} # keyed by request ID
+ # the pending job tickets
+ self.jobs = {} # keyed by request ID
- # lock for socket operations
- self.socketLock = threading.Lock()
+ # queue for incoming client requests
+ self.clientReqQueue = Queue.Queue()
# launch receiver thread
- #thread.start_new_thread(self.rxThread, ())
+ self.running = True
+ thread.start_new_thread(self._mgrThread, ())
def __del__(self):
"""
object is getting cleaned up, so disconnect
"""
- if self.socket:
- self.socket.close()
- del self.socket
+ # terminate the node
+ try:
+ self.shutdown()
+ except:
+ traceback.print_exc()
+ pass
- if self.logfile not in [sys.stdout, sys.stderr]:
- self.logfile.close()
-
# high level client methods
- def hello(self):
+ def _hello(self):
- self._sendMessage("ClientHello",
+ self._txMsg("ClientHello",
Name=self.name,
ExpectedVersion=expectedVersion)
- resp = self._receiveMessage()
+ resp = self._rxMsg()
return resp
+ def genkey(self, **kw):
+ """
+ Generates and returns an SSK keypair
+
+ Keywords:
+ - async - whether to do this call asynchronously, and
+ return a JobTicket object
+ """
+ id = self._getUniqueId()
+
+ return self._submitCmd(id, "GenerateSSK", Identifier=id, **kw)
+
+ #self._txMsg("GenerateSSK",
+ # Identifier=id)
+ #while True:
+ # resp = self._rxMsg()
+ # #print resp
+ # if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) ==
id:
+ # break
+ #return resp['RequestURI'], resp['InsertURI']
+
def get(self, uri, **kw):
"""
Does a direct get of a key
Keywords:
+ - async - whether to return immediately with a job ticket object, default
+ False (wait for completion)
- dsnly - whether to only check local datastore
- ignoreds - don't check local datastore
- file - if given, this is a pathname to which to store the
retrieved key
@@ -163,12 +198,18 @@
- if 'dontReturnData' is true, returns (mimetype, 1) if key is
returned
If key is not found, raises an exception
"""
+ # format the request
opts = {}
+ id = self._getUniqueId()
+
+ opts['async'] = kw.pop('async', False)
+
file = kw.pop("file", None)
if file:
opts['ReturnType'] = "disk"
- opts['File'] = file
+ #opts['File'] = file
+ opts['Filename'] = file
elif opts.get('nodata', False):
nodata = True
@@ -177,7 +218,7 @@
nodata = False
opts['ReturnType'] = "direct"
- opts['Identifier'] = self._getUniqueId()
+ opts['Identifier'] = id
if kw.get("ignoreds", False):
opts["IgnoreDS"] = "true"
@@ -197,28 +238,19 @@
opts['PriorityClass'] = int(kw.get("priority", 1))
opts['Global'] = "false"
- self._sendMessage("ClientGet", **opts)
+ # now enqueue the request
+ return self._submitCmd(id, "ClientGet", **opts)
+
+
+
+ # ------------------------------------------------
+ # DEPRECATED CODE!!
+
+ self._txMsg("ClientGet", **opts)
- #ClientGet
- #IgnoreDS=false // true = ignore the datastore (in old FCP this was
RemoveLocalKey)
- #DSOnly=false // true = only check the datastore, don't route (~= htl 0)
- #URI=KSK@gpl.txt // key to fetch
- #Identifier=Request Number One
- #Verbosity=0 // no status, just tell us when it's done
- #ReturnType=direct // return all at once over the FCP connection
- #MaxSize=100 // maximum size of returned data (all numbers in DECIMAL)
- #MaxTempSize=1000 // maximum size of intermediary data
- #MaxRetries=100 // automatic retry supported as an option; -1 means retry
forever
- #PriorityClass=1 // priority class 1 = interactive
- #Persistence=reboot // continue until node is restarted; report progress
while client is
- # connected, including if it reconnects after losing connection
- #ClientToken=hello // returned in PersistentGet, a hint to the client, so
the client
- # doesn't need to maintain its own state
- #Global=false // see Persistence section below
- #EndMessage
# get a response
- resp = self._receiveMessage()
+ resp = self._rxMsg()
hdr = resp['header']
if hdr == 'DataFound':
mimetype = resp['Metadata.ContentType']
@@ -229,7 +261,7 @@
elif nodata:
return (mimetype, 1)
else:
- resp = self._receiveMessage()
+ resp = self._rxMsg()
if resp['header'] == 'AllData':
return (mimetype, resp['Data'])
else:
@@ -269,42 +301,21 @@
Keywords valid for all modes:
- maxretries - maximum number of retries, default 3
- priority - default 1
+ - async - whether to do the job asynchronously, returning a job ticket
+ object (default False)
Notes:
- exactly one of 'file', 'data' or 'dir' keyword arguments must be
present
"""
- #ClientPut
- #URI=CHK@ // could as easily be an insertable SSK or KSK URI
- #Metadata.ContentType=text/html // MIME type; for text, if charset is not
specified, node #should auto-detect it and force the auto-detected version
- #Identifier=Insert-1 // identifier, as always
- #Verbosity=0 // just report when complete
- #MaxRetries=999999 // lots of retries; -1 = retry forever
- #PriorityClass=1 // fproxy priority level
- #GetCHKOnly=false // true = don't insert the data, just return the key it
would generate
- #Global=false // see Persistence section below
- #DontCompress=true // hint to node: don't try to compress the data, it's
already compressed
- #ClientToken=Hello!!! // sent back to client on the PersistentPut if this
is a persistent #request
- # the following fields decide where the data is to come from:
-
- #UploadFrom=direct // attached directly to this message
- #DataLength=100 // 100 bytes decimal
- #Data
- #<data>
- # or
- #UploadFrom=disk // upload a file from disk
- #Filename=/home/toad/something.html
- #End
- # or
- #UploadFrom=redirect // create a redirect to another key
- #TargetURI=KSK@gpl.txt // some other freenet URI
- #End
-
# divert to putdir if dir keyword present
if kw.has_key('dir'):
return self.putdir(uri, **kw)
opts = {}
+
+ opts['async'] = kw.get('async', False)
+
opts['URI'] = uri
opts['Metadata.ContentType'] = kw.get("mimetype", "text/plain")
id = self._getUniqueId()
@@ -315,60 +326,36 @@
opts['GetCHKOnly'] = toBool(kw.get("chkonly", "false"))
opts['DontCompress'] = toBool(kw.get("nocompress", "false"))
- # if inserting a freesite, scan the directory and insert each bit
piecemeal
- if kw.has_key("dir"):
- if kw.get('usk', False):
- uri = uri.replace("SSK@", "USK@")
- if not uri.endswith("/"):
- uri = uri + "/"
-
- # form a base privkey-based URI
- siteuri = uri + "%s/%s/" % (kw['sitename'], kw.get('version', 1))
-
+ if kw.has_key("file"):
opts['UploadFrom'] = "disk"
-
- # upload all files in turn - rework this later when queueing is
implemented
- files = readdir(kw['dir'])
- for f in files:
- thisuri = siteuri + f['relpath']
- opts['file'] = f['fullpath']
- opts['mimetype'] = f['mimetype']
- self.put(thisuri, **opts)
-
- # last bit - insert index.html
- opts['file'] = os.path.join(kw['dir'], "index.html")
- thisuri = siteuri + "index.html"
- opts['mimetype'] = "text/html"
- self.put(thisuri, **opts)
-
- return uri
-
- elif kw.has_key("file"):
- opts['UploadFrom'] = "disk"
opts['Filename'] = kw['file']
if not kw.has_key("mimetype"):
opts['Metadata.ContentType'] =
mimetypes.guess_type(kw['file'])[0] or "text/plain"
- sendEnd = True
elif kw.has_key("data"):
opts["UploadFrom"] = "direct"
opts["Data"] = kw['data']
- sendEnd = False
elif kw.has_key("redirect"):
opts["UploadFrom"] = "redirect"
opts["TargetURI"] = kw['redirect']
- sendEnd = True
else:
raise Exception("Must specify file, data or redirect keywords")
#print "sendEnd=%s" % sendEnd
+ # now dispatch the job
+ return self._submitCmd(id, "ClientPut", **opts)
+
+
+ # ------------------------------------------------------------
+ # DEPRECATED CODE
+
# issue the command
- self._sendMessage("ClientPut", sendEnd, **opts)
+ self._txMsg("ClientPut", **opts)
# expect URIGenerated
- resp1 = self._receiveMessage()
+ resp1 = self._rxMsg()
hdr = resp1['header']
if hdr != 'URIGenerated':
raise FCPException(resp1)
@@ -381,7 +368,7 @@
return newUri
# expect outcome
- resp2 = self._receiveMessage()
+ resp2 = self._rxMsg()
hdr = resp2['header']
if hdr == 'PutSuccessful':
return resp2['URI']
@@ -392,6 +379,7 @@
else:
raise FCPException(resp2)
+
def putdir(self, uri, **kw):
"""
Inserts a freesite
@@ -409,6 +397,8 @@
- maxretries - maximum number of retries, default 3
- priority - default 1
+ - async - default False - if True, return a job ticket
+
Returns:
- the URI under which the freesite can be retrieved
"""
@@ -430,7 +420,7 @@
uriFull = uriFull.replace("SSK@", "USK@")
# issue the command
- #self._sendMessage("ClientPutComplexDir", True, **opts)
+ #self._txMsg("ClientPutComplexDir", True, **opts)
msgLines = ["ClientPutComplexDir",
"Identifier=%s" % id,
"Verbosity=0",
@@ -468,10 +458,21 @@
for line in msgLines:
self._log(DETAIL, line)
fullbuf = "\n".join(msgLines) + "\n"
+
+ # now dispatch the job
+ return self._submitCmd(id, "ClientPutComplexDir",
+ rawcmd=fullbuf,
+ async=kw.get('async', False),
+ )
+
+ # ------------------------------------------------------------
+ # DEPRECATED CODE
+
+
self.socket.send(fullbuf)
# expect URIGenerated
- resp1 = self._receiveMessage()
+ resp1 = self._rxMsg()
hdr = resp1['header']
if hdr != 'URIGenerated':
raise FCPException(resp1)
@@ -479,7 +480,7 @@
newUri = resp1['URI']
# expect outcome
- resp2 = self._receiveMessage()
+ resp2 = self._rxMsg()
hdr = resp2['header']
if hdr == 'PutSuccessful':
return resp2['URI']
@@ -490,63 +491,293 @@
else:
raise FCPException(resp2)
- def genkey(self, id=None):
+
+ def shutdown(self):
"""
- Generates and returns an SSK keypair
+ Terminates the manager thread
"""
- if not id:
- id = self._getUniqueId()
+ self.running = False
+
+ # give the manager thread a chance to bail out
+ time.sleep(pollTimeout * 3)
+
+ # shut down FCP connection
+ if hasattr(self, 'socket'):
+ self.socket.close()
+ del self.socket
+
+ # and close the logfile
+ if self.logfile not in [sys.stdout, sys.stderr]:
+ self.logfile.close()
+
+
+
+
+
+ # methods for manager thread
+
+ def _mgrThread(self):
+ """
+ Manager loop: dispatches incoming node messages and queued client requests
+ """
+ log = self._log
+ try:
+ while self.running:
+
+ log(DEBUG, "Top of manager thread")
+
+ # try for incoming messages from node
+ log(DEBUG, "Testing for incoming message")
+ if self._msgIncoming():
+ log(DEBUG, "Retrieving incoming message")
+ msg = self._rxMsg()
+ log(DEBUG, "Got incoming message, dispatching")
+ self._on_rxMsg(msg)
+ log(DEBUG, "back from on_rxMsg")
+ else:
+ log(DEBUG, "No incoming message from node")
- self._sendMessage("GenerateSSK",
- Identifier=id)
+ # try for incoming requests from clients
+ log(DEBUG, "Testing for client req")
+ try:
+ req = self.clientReqQueue.get(True, pollTimeout)
+ log(DEBUG, "Got client req, dispatching")
+ self._on_clientReq(req)
+ log(DEBUG, "Back from on_clientReq")
+ except Queue.Empty:
+ log(DEBUG, "No incoming client req")
+ pass
+
+ self._log(INFO, "Manager thread terminated normally")
+ return
+
+ except:
+ traceback.print_exc()
+ self._log(CRITICAL, "manager thread crashed")
+
+ def _msgIncoming(self):
+ """
+ Returns True if a message is coming in from the node
+ """
+ return len(select.select([self.socket], [], [], pollTimeout)[0]) > 0
+
+ def _submitCmd(self, id, cmd, **kw):
+ """
+ Submits a command for execution
- while True:
- resp = self._receiveMessage()
- #print resp
- if resp['header'] == 'SSKKeypair' and str(resp['Identifier']) ==
id:
- break
+ Arguments:
+ - id - the command identifier
+ - cmd - the command name, such as 'ClientPut'
+
+ Keywords:
+ - async - whether to return a JobTicket object, rather than
+ the command result
+ - rawcmd - a raw command buffer to send directly
+ - options specific to command
+
+ Returns:
+ - if command is sent in sync mode, returns the result
+ - if command is sent in async mode, returns a JobTicket
+ object which the client can poll or block on later
+ """
+ async = kw.pop('async', False)
+ job = JobTicket(id, cmd, kw)
+ self.clientReqQueue.put(job)
- return resp['RequestURI'], resp['InsertURI']
+ self._log(DEBUG, "_submitCmd: id=%s cmd=%s kw=%s" % (id, cmd, str(kw)[:256]))
+ if async:
+ return job
+ else:
+ return job.wait()
+ def _on_rxMsg(self, msg):
+ """
+ Handler for incoming messages from node
+ """
+ log = self._log
+ # find the job this relates to
+ id = msg['Identifier']
+ hdr = msg['header']
- # methods for receiver thread
+ job = self.jobs.get(id, None)
+
+ # bail if job not known
+ if not job:
+ log(ERROR, "Received %s for unknown job %s" % (hdr, id))
+ return
- def _rxThread(self):
+ # action from here depends on what kind of message we got
+
+ # -----------------------------
+ # handle GenerateSSK responses
+
+ if hdr == 'SSKKeypair':
+ # got requested keys back
+ job._putResult((msg['RequestURI'], msg['InsertURI']))
+
+ # and remove job from queue
+ self.jobs.pop(id, None)
+ return
+
+ # -----------------------------
+ # handle ClientGet responses
+
+ if hdr == 'DataFound':
+ mimetype = msg['Metadata.ContentType']
+ if job.kw.has_key('Filename'):
+ # already stored to disk, done
+ #resp['file'] = file
+ job._putResult((mimetype, job.kw['Filename']))
+ del self.jobs[id]
+ return
+
+ elif job.kw['ReturnType'] == 'none':
+ job._putResult((mimetype, 1))
+ del self.jobs[id]
+ return
+
+ # otherwise, we're expecting an AllData and will react to it then
+ else:
+ job.mimetype = mimetype
+ return
+
+ if hdr == 'AllData':
+ job._putResult((job.mimetype, msg['Data']))
+ del self.jobs[id]
+ return
+
+ if hdr == 'GetFailed':
+ # return an exception
+ job._putResult(FCPGetFailed(msg))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # handle ClientPut responses
+
+ if hdr == 'URIGenerated':
+
+ job.uri = msg['URI']
+ newUri = msg['URI']
+
+ return
+
+ # bail here if no data coming back
+ if job.kw.get('GetCHKOnly', False) == 'true':
+ # done - only wanted a CHK
+ job._putResult(newUri)
+ del self.jobs[id]
+ return
+
+ if hdr == 'PutSuccessful':
+ job._putResult(msg['URI'])
+ del self.jobs[id]
+ return
+
+ if hdr == 'PutFailed':
+ job._putResult(FCPPutFailed(msg))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # handle progress messages
+
+ if hdr == 'StartedCompression':
+ job.notify(msg)
+ return
+
+ if hdr == 'FinishedCompression':
+ job.notify(msg)
+ return
+
+ if hdr == 'SimpleProgress':
+ return
+
+ # -----------------------------
+ # handle persistent job messages
+
+ if hdr == 'PersistentGet':
+ return
+
+ if hdr == 'PersistentPut':
+ return
+
+ if hdr == 'EndListPersistentRequests':
+ return
+
+ if hdr == 'PersistentPutDir':
+ return
+
+ # -----------------------------
+ # handle various errors
+
+ if hdr == 'ProtocolError':
+ job._putResult(FCPProtocolError(msg))
+ del self.jobs[id]
+ return
+
+ if hdr == 'IdentifierCollision':
+ log(ERROR, "IdentifierCollision on id %s ???" % id)
+ job._putResult(Exception("Duplicate job identifier %s" % id))
+ del self.jobs[id]
+ return
+
+ # -----------------------------
+ # fall-through: message type not recognised
+
+ log(ERROR, "Unknown message type from node: %s" % hdr)
+ job._putResult(FCPException(msg))
+ del self.jobs[id]
+ return
+
+ def _on_clientReq(self, req):
"""
- Receives all incoming messages
+ handler for incoming requests from clients
"""
- while self.running:
- self.socketLock.acquire()
- self.socket.settimeout(0.1)
- try:
- msg = self._receiveMessage()
- except socket.timeout:
- self.socketLock.release()
- continue
-
+ id = req.id
+ cmd = req.cmd
+ kw = req.kw
+ # register the req
+ self.jobs[id] = req
+
+ # safe to send now - the manager thread is the only writer to the socket
+ self._txMsg(cmd, **kw)
+
+
# low level noce comms methods
def _getUniqueId(self):
return "id" + str(int(time.time() * 1000000))
- def _sendMessage(self, msgType, sendEndMessage=True, **kw):
+ def _txMsg(self, msgType, **kw):
"""
low level message send
Arguments:
- msgType - one of the FCP message headers, such as 'ClientHello'
- args - zero or more (keyword, value) tuples
+ Keywords:
+ - rawcmd - if given, this is the raw buffer to send
"""
+ log = self._log
+
+ # just send the raw command, if given
+ rawcmd = kw.get('rawcmd', None)
+ if rawcmd:
+ self.socket.send(rawcmd)
+ log(DETAIL, "CLIENT: %s" % rawcmd)
+ return
+
if kw.has_key("Data"):
data = kw.pop("Data")
+ sendEndMessage = False
else:
data = None
+ sendEndMessage = True
- log = self._log
-
items = [msgType + "\n"]
log(DETAIL, "CLIENT: %s" % msgType)
@@ -573,7 +804,7 @@
self.socket.send(raw)
- def _receiveMessage(self):
+ def _rxMsg(self):
"""
Receives and returns a message as a dict
@@ -665,34 +896,55 @@
self.logfile.write(msg)
self.logfile.flush()
- class JobTicket:
+
+class JobTicket:
+ """
+ A JobTicket is an object returned to clients making
+ asynchronous requests. It puts them in control of how
+ they manage n concurrent requests.
+
+ When you as a client receive a JobTicket, you can choose to:
+ - block, awaiting completion of the job
+ - poll the job for completion status
+ - receive a callback upon completion
+ """
+ def __init__(self, id, cmd, kw):
"""
- A JobTicket is an object returned to clients making
- asynchronous requests. It puts them in control of how
- they manage n concurrent requests.
-
- When you as a client receive a JobTicket, you can choose to:
- - block, awaiting completion of the job
- - poll the job for completion status
- - receive a callback upon completion
+ You should never instantiate a JobTicket object yourself
"""
- def __init__(self, id):
- """
- You should never instantiate a JobTicket object yourself
- """
- self.id = id
- self.queue = Queue.Queue()
-
- def isComplete(self):
- """
- Returns True if the job has been completed
- """
-
- def wait(self, timeout=None):
- """
- Waits forever (or for a given timeout) for a job to complete
- """
+ self.id = id
+ self.cmd = cmd
+ self.kw = kw
+ self.lock = threading.Lock()
+ self.lock.acquire()
+ self.result = None
+
+ def isComplete(self):
+ """
+ Returns True if the job has been completed
+ """
+ return self.result != None
+
+ def wait(self, timeout=None):
+ """
+ Waits forever for a job to complete (the 'timeout' argument is currently ignored)
+ """
+ self.lock.acquire()
+ self.lock.release()
+ if isinstance(self.result, Exception):
+ raise self.result
+ else:
+ return self.result
+
+ def _putResult(self, result):
+ """
+ Called by manager thread to indicate job is complete,
+ and submit a result to be picked up by client
+ """
+ self.result = result
+ self.lock.release()
+
class FreenetXMLRPCRequest:
"""
Modified: trunk/apps/pyFreenet/sitemgr.py
===================================================================
--- trunk/apps/pyFreenet/sitemgr.py 2006-05-12 02:43:18 UTC (rev 8672)
+++ trunk/apps/pyFreenet/sitemgr.py 2006-05-12 03:47:00 UTC (rev 8673)
@@ -46,6 +46,14 @@
self.loadConfig()
+ def __del__(self):
+
+ try:
+ del self.node
+ self.node = None
+ except:
+ pass
+
def createConfig(self):
"""
Creates a whole new config
@@ -70,7 +78,7 @@
def createNode(self, **kw):
- kw = {}
+ #kw = {}
if fcpHost and not kw.has_key("fcpHost"):
kw['host'] = fcpHost
@@ -80,6 +88,8 @@
kw['verbosity'] = verbosity
if logfile and not kw.has_key("logfile"):
kw['logfile'] = logfile
+
+ #print kw
self.node = fcp.FCPNodeConnection(**kw)
@@ -153,7 +163,11 @@
print "Updating site %s" % sitename
print "privatekey=%s" % privatekey
noSites = False
- self.node.put(privatekey, dir=dir, name=sitename,
version=version, usk=True)
+ res = self.node.put(privatekey,
+ dir=dir,
+ name=sitename,
+ version=version,
+ usk=True)
conf.set(sitename, "hash", hashNew)
self.saveConfig()
@@ -161,6 +175,8 @@
if noSites:
print "No sites needed updating"
+ return res
+
def help():
print "%s: A console-based, cron-able freesite inserter" % sys.argv[0]
@@ -190,4 +206,5 @@
s = SiteMgr()
s.update()
+ s.shutdown()