Author: aum
Date: 2006-06-08 01:53:47 +0000 (Thu, 08 Jun 2006)
New Revision: 9079
Modified:
trunk/apps/pyFreenet/code.leo
trunk/apps/pyFreenet/fcp/node.py
Log:
Implemented progress messages for freesitemgr
(requires -v and one of -m, -a, -s)
Modified: trunk/apps/pyFreenet/code.leo
===================================================================
--- trunk/apps/pyFreenet/code.leo 2006-06-08 01:34:42 UTC (rev 9078)
+++ trunk/apps/pyFreenet/code.leo 2006-06-08 01:53:47 UTC (rev 9079)
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<leo_file>
<leo_header file_format="2" tnodes="0" max_tnode_index="64" clone_windows="0"/>
-<globals body_outline_ratio="0.287165281625">
+<globals body_outline_ratio="0.256694367498">
<global_window_position top="70" left="78" height="748" width="1083"/>
<global_log_window_position top="0" left="0" height="0" width="0"/>
</globals>
@@ -41,7 +41,7 @@
<v t="aum.20060506231352.1"><vh>genkey</vh></v>
<v t="aum.20060506231352"><vh>get</vh></v>
<v t="aum.20060507003931"><vh>put</vh></v>
-<v t="aum.20060511001853" a="V"><vh>putdir</vh></v>
+<v t="aum.20060511001853"><vh>putdir</vh></v>
<v t="aum.20060521180804"><vh>invertprivate</vh></v>
</v>
<v t="aum.20060506224238" a="E"><vh>Other High Level Methods</vh>
@@ -516,6 +516,7 @@
</v>
</v>
<v t="aum.20060513073239.5" a="E"><vh>Test files</vh>
+<v t="aum.20060608131556" a="V" tnodeList="aum.20060608131556"><vh>@file
putdirtest.py</vh></v>
<v t="aum.20060526123909" tnodeList="aum.20060526123909"><vh>@file
fstest.c</vh></v>
<v t="aum.20060511003500" tnodeList="aum.20060511003500"><vh>@file
test.py</vh></v>
<v t="aum.20060512152233" tnodeList="aum.20060512152233"><vh>@file
genkey.py</vh></v>
@@ -6749,11 +6750,11 @@
allAtOnce = False
if kw.has_key('maxconcurrent'):
- maxConcurrentInserts = kw['maxconcurrent']
+ maxConcurrent = kw['maxconcurrent']
filebyfile = True
allAtOnce = True
else:
- maxConcurrentInserts = 10
+ maxConcurrent = 10
id = kw.pop("id", None)
if not id:
@@ -6773,24 +6774,66 @@
manifestDict = {}
jobs = []
#allAtOnce = False
+
if filebyfile:
+
+ lastProgressMsgTime = time.time()
+
# insert each file, one at a time
- for filerec in manifest:
+ nTotal = len(manifest)
- # wait if too many concurrent inserts are in progress
- while len([j for j in jobs if not j.isComplete()]) \
- >= maxConcurrentInserts:
+ # output status messages, and manage concurrent inserts
+ while True:
+ # get progress counts
+ nQueued = len(jobs)
+ nComplete = len(
+ filter(
+ lambda j: j.isComplete(),
+ jobs
+ )
+ )
+ nWaiting = nTotal - nQueued
+ nInserting = nQueued - nComplete
+
+ # spit a progress message every 10 seconds
+ now = time.time()
+ if now - lastProgressMsgTime >= 10:
+ lastProgressMsgTime = time.time()
+ log(INFO,
+ "putdir: waiting=%s inserting=%s done=%s total=%s" % (
+ nWaiting, nInserting, nComplete, nTotal)
+ )
+
+ # can bail if all done
+ if nComplete == nTotal:
+ log(INFO, "putdir: all inserts completed (or failed)")
+ break
+
+ # wait and go round again if concurrent inserts are maxed
+ if nInserting >= maxConcurrent:
time.sleep(1)
+ continue
+            # just go round again if manifest is empty (all remaining are in progress)
+ if len(manifest) == 0:
+ time.sleep(1)
+ continue
+
+            # got >0 waiting jobs and >0 spare slots, so we can submit a new one
+ filerec = manifest.pop(0)
relpath = filerec['relpath']
fullpath = filerec['fullpath']
mimetype = filerec['mimetype']
-
+
manifestDict[relpath] = filerec
log(INFO, "Launching insert of %s" % relpath)
+ # gotta suck raw data, since we might be inserting to a remote FCP
+            # service (which means we can't use 'file=' (UploadFrom=pathname) keyword)
raw = file(fullpath, "rb").read()
+
+ # fire up the insert job asynchronously
job = self.put("CHK@",
data=raw,
mimetype=mimetype,
@@ -6799,17 +6842,13 @@
)
jobs.append(job)
filerec['job'] = job
+ job.filerec = filerec
+            # wait for that job to finish if we are in the slow 'one at a time' mode
if not allAtOnce:
job.wait()
log(INFO, "Insert finished for %s" % relpath)
- # wait for jobs to complete
- if allAtOnce:
- log(INFO, "Waiting for raw file inserts to finish")
- while len([j for j in jobs if not j.isComplete()]) > 0:
- time.sleep(1)
-
# all done
log(INFO, "All raw files now inserted (or failed)")
@@ -6824,27 +6863,28 @@
"DefaultName=index.html",
]
+ # support global queue option
if kw.get('Global', False):
msgLines.append("Global=true")
else:
msgLines.append("Global=false")
- # add the files
+ # add each file's entry to the command buffer
n = 0
default = None
- for filerec in manifest:
+ for job in jobs:
+ filerec = job.filerec
relpath = filerec['relpath']
fullpath = filerec['fullpath']
mimetype = filerec['mimetype']
+ # don't add if the file failed to insert
if filebyfile:
if isinstance(filerec['job'].result, Exception):
log(ERROR, "File %s failed to insert" % relpath)
continue
- if relpath == 'index.html':
- default = filerec
- self._log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
+ log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
])
@@ -6858,34 +6898,27 @@
])
n += 1
- # now, add the default file
- if 0:
- if filebyfile:
- msgLines.extend(["Files.%d.Name=" % n,
- "Files.%d.UploadFrom=disk" % n,
- "Files.%d.Filename=%s" % (n, default['fullpath']),
- ])
- else:
- msgLines.extend(["Files.%d.Name=" % n,
- "Files.%d.UploadFrom=redirect" % n,
- "Files.%d.TargetURI=%s" % filerec['job'].result,
- ])
-
+ # finish the command buffer
msgLines.append("EndMessage")
-
- for line in msgLines:
- self._log(DETAIL, line)
fullbuf = "\n".join(msgLines) + "\n"
+ # gotta log the command buffer here, since it's not sent via .put()
+ for line in msgLines:
+ log(DETAIL, line)
+
# --------------------------------------
# now dispatch the job
- return self._submitCmd(id, "ClientPutComplexDir",
- rawcmd=fullbuf,
- async=kw.get('async', False),
- callback=kw.get('callback', False),
- Persistence=kw.get('Persistence', 'connection'),
- )
+ finalResult = self._submitCmd(
+ id, "ClientPutComplexDir",
+ rawcmd=fullbuf,
+ async=kw.get('async', False),
+ callback=kw.get('callback', False),
+ Persistence=kw.get('Persistence', 'connection'),
+ )
+ # finally all done, return result or job ticket
+ return finalResult
+
</t>
<t tx="aum.20060511003500">from fcp import *
@@ -13479,5 +13512,49 @@
main()
</t>
+<t tx="aum.20060608131556">@first #! /usr/bin/env python
+"""
+Test harness for freesitemgr
+
+Generates n random files into a temporary directory
+"""
+
+nfiles = 2
+maxConcurrent = 10
+tmpDir = "/tmp/freesitetest"
+
+import sys, os, time
+import fcp
+
+sh = os.system
+
+node = fcp.FCPNode()
+
+# grab a keypair
+pub, priv = node.genkey()
+
+# create index.html header
+indexlines = ["<html><head><title>aum's test
freesite</title></head>",
+ "<body><h1>aum's test freesite</h1>",
+ "<ul>",
+ ]
+
+# create fresh new directory full of n files
+sh("rm -rf %s" % tmpDir)
+sh("mkdir %s" % tmpDir)
+for i in xrange(nfiles):
+ name = "file_%s.txt" % i
+ data = "This is file %s, generated on %s\n" % (i, time.asctime())
+ file("%s/%s" % (tmpDir, name), "w").write(data)
+ indexlines.append('<li><a href="%s">%s</a></li>' %
(name, name))
+
+indexlines.append("</ul></body></html>\n")
+indexhtml = "\n".join(indexlines)
+file("%s/index.html" % tmpDir, "w").write(indexhtml)
+
+sh("sb python setup.py install")
+sh("freesitemgr -v -m %s update" % maxConcurrent)
+
+</t>
</tnodes>
</leo_file>
Modified: trunk/apps/pyFreenet/fcp/node.py
===================================================================
--- trunk/apps/pyFreenet/fcp/node.py 2006-06-08 01:34:42 UTC (rev 9078)
+++ trunk/apps/pyFreenet/fcp/node.py 2006-06-08 01:53:47 UTC (rev 9079)
@@ -539,11 +539,11 @@
allAtOnce = False
if kw.has_key('maxconcurrent'):
- maxConcurrentInserts = kw['maxconcurrent']
+ maxConcurrent = kw['maxconcurrent']
filebyfile = True
allAtOnce = True
else:
- maxConcurrentInserts = 10
+ maxConcurrent = 10
id = kw.pop("id", None)
if not id:
@@ -563,24 +563,66 @@
manifestDict = {}
jobs = []
#allAtOnce = False
+
if filebyfile:
+
+ lastProgressMsgTime = time.time()
+
# insert each file, one at a time
- for filerec in manifest:
+ nTotal = len(manifest)
- # wait if too many concurrent inserts are in progress
- while len([j for j in jobs if not j.isComplete()]) \
- >= maxConcurrentInserts:
+ # output status messages, and manage concurrent inserts
+ while True:
+ # get progress counts
+ nQueued = len(jobs)
+ nComplete = len(
+ filter(
+ lambda j: j.isComplete(),
+ jobs
+ )
+ )
+ nWaiting = nTotal - nQueued
+ nInserting = nQueued - nComplete
+
+ # spit a progress message every 10 seconds
+ now = time.time()
+ if now - lastProgressMsgTime >= 10:
+ lastProgressMsgTime = time.time()
+ log(INFO,
+ "putdir: waiting=%s inserting=%s done=%s total=%s" % (
+ nWaiting, nInserting, nComplete, nTotal)
+ )
+
+ # can bail if all done
+ if nComplete == nTotal:
+ log(INFO, "putdir: all inserts completed (or failed)")
+ break
+
+ # wait and go round again if concurrent inserts are maxed
+ if nInserting >= maxConcurrent:
time.sleep(1)
+ continue
+            # just go round again if manifest is empty (all remaining are in progress)
+ if len(manifest) == 0:
+ time.sleep(1)
+ continue
+
+            # got >0 waiting jobs and >0 spare slots, so we can submit a new one
+ filerec = manifest.pop(0)
relpath = filerec['relpath']
fullpath = filerec['fullpath']
mimetype = filerec['mimetype']
-
+
manifestDict[relpath] = filerec
log(INFO, "Launching insert of %s" % relpath)
+            # gotta suck raw data, since we might be inserting to a remote FCP
+            # service (which means we can't use 'file=' (UploadFrom=pathname) keyword)
raw = file(fullpath, "rb").read()
+
+ # fire up the insert job asynchronously
job = self.put("CHK@",
data=raw,
mimetype=mimetype,
@@ -589,17 +631,13 @@
)
jobs.append(job)
filerec['job'] = job
+ job.filerec = filerec
+            # wait for that job to finish if we are in the slow 'one at a time' mode
if not allAtOnce:
job.wait()
log(INFO, "Insert finished for %s" % relpath)
- # wait for jobs to complete
- if allAtOnce:
- log(INFO, "Waiting for raw file inserts to finish")
- while len([j for j in jobs if not j.isComplete()]) > 0:
- time.sleep(1)
-
# all done
log(INFO, "All raw files now inserted (or failed)")
@@ -614,27 +652,28 @@
"DefaultName=index.html",
]
+ # support global queue option
if kw.get('Global', False):
msgLines.append("Global=true")
else:
msgLines.append("Global=false")
- # add the files
+ # add each file's entry to the command buffer
n = 0
default = None
- for filerec in manifest:
+ for job in jobs:
+ filerec = job.filerec
relpath = filerec['relpath']
fullpath = filerec['fullpath']
mimetype = filerec['mimetype']
+ # don't add if the file failed to insert
if filebyfile:
if isinstance(filerec['job'].result, Exception):
log(ERROR, "File %s failed to insert" % relpath)
continue
- if relpath == 'index.html':
- default = filerec
- self._log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
+ log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
])
@@ -648,34 +687,27 @@
])
n += 1
- # now, add the default file
- if 0:
- if filebyfile:
- msgLines.extend(["Files.%d.Name=" % n,
- "Files.%d.UploadFrom=disk" % n,
- "Files.%d.Filename=%s" % (n,
default['fullpath']),
- ])
- else:
- msgLines.extend(["Files.%d.Name=" % n,
- "Files.%d.UploadFrom=redirect" % n,
- "Files.%d.TargetURI=%s" %
filerec['job'].result,
- ])
-
+ # finish the command buffer
msgLines.append("EndMessage")
-
- for line in msgLines:
- self._log(DETAIL, line)
fullbuf = "\n".join(msgLines) + "\n"
+ # gotta log the command buffer here, since it's not sent via .put()
+ for line in msgLines:
+ log(DETAIL, line)
+
# --------------------------------------
# now dispatch the job
- return self._submitCmd(id, "ClientPutComplexDir",
- rawcmd=fullbuf,
- async=kw.get('async', False),
- callback=kw.get('callback', False),
- Persistence=kw.get('Persistence', 'connection'),
- )
+ finalResult = self._submitCmd(
+ id, "ClientPutComplexDir",
+ rawcmd=fullbuf,
+ async=kw.get('async', False),
+ callback=kw.get('callback', False),
+ Persistence=kw.get('Persistence', 'connection'),
+ )
+ # finally all done, return result or job ticket
+ return finalResult
+
def invertprivate(self, privatekey):
"""
Converts an SSK or USK private key to a public equivalent