Author: aum
Date: 2006-06-08 01:53:47 +0000 (Thu, 08 Jun 2006)
New Revision: 9079

Modified:
   trunk/apps/pyFreenet/code.leo
   trunk/apps/pyFreenet/fcp/node.py
Log:
Implemented progress messages for freesitemgr
(requires -v and one of -m, -a, -s)


Modified: trunk/apps/pyFreenet/code.leo
===================================================================
--- trunk/apps/pyFreenet/code.leo       2006-06-08 01:34:42 UTC (rev 9078)
+++ trunk/apps/pyFreenet/code.leo       2006-06-08 01:53:47 UTC (rev 9079)
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <leo_file>
 <leo_header file_format="2" tnodes="0" max_tnode_index="64" clone_windows="0"/>
-<globals body_outline_ratio="0.287165281625">
+<globals body_outline_ratio="0.256694367498">
        <global_window_position top="70" left="78" height="748" width="1083"/>
        <global_log_window_position top="0" left="0" height="0" width="0"/>
 </globals>
@@ -41,7 +41,7 @@
 <v t="aum.20060506231352.1"><vh>genkey</vh></v>
 <v t="aum.20060506231352"><vh>get</vh></v>
 <v t="aum.20060507003931"><vh>put</vh></v>
-<v t="aum.20060511001853" a="V"><vh>putdir</vh></v>
+<v t="aum.20060511001853"><vh>putdir</vh></v>
 <v t="aum.20060521180804"><vh>invertprivate</vh></v>
 </v>
 <v t="aum.20060506224238" a="E"><vh>Other High Level Methods</vh>
@@ -516,6 +516,7 @@
 </v>
 </v>
 <v t="aum.20060513073239.5" a="E"><vh>Test files</vh>
+<v t="aum.20060608131556" a="V" tnodeList="aum.20060608131556"><vh>@file 
putdirtest.py</vh></v>
 <v t="aum.20060526123909" tnodeList="aum.20060526123909"><vh>@file 
fstest.c</vh></v>
 <v t="aum.20060511003500" tnodeList="aum.20060511003500"><vh>@file 
test.py</vh></v>
 <v t="aum.20060512152233" tnodeList="aum.20060512152233"><vh>@file 
genkey.py</vh></v>
@@ -6749,11 +6750,11 @@
         allAtOnce = False

     if kw.has_key('maxconcurrent'):
-        maxConcurrentInserts = kw['maxconcurrent']
+        maxConcurrent = kw['maxconcurrent']
         filebyfile = True
         allAtOnce = True
     else:
-        maxConcurrentInserts = 10
+        maxConcurrent = 10

     id = kw.pop("id", None)
     if not id:
@@ -6773,24 +6774,66 @@
     manifestDict = {}
     jobs = []
     #allAtOnce = False
+
     if filebyfile:
+
+        lastProgressMsgTime = time.time()
+
         # insert each file, one at a time
-        for filerec in manifest:
+        nTotal = len(manifest)

-            # wait if too many concurrent inserts are in progress
-            while len([j for j in jobs if not j.isComplete()]) \
-            &gt;= maxConcurrentInserts:
+        # output status messages, and manage concurrent inserts
+        while True:
+            # get progress counts
+            nQueued = len(jobs)
+            nComplete = len(
+                            filter(
+                                lambda j: j.isComplete(),
+                                jobs
+                                )
+                            )
+            nWaiting = nTotal - nQueued
+            nInserting = nQueued - nComplete
+
+            # spit a progress message every 10 seconds
+            now = time.time()
+            if now - lastProgressMsgTime &gt;= 10:
+                lastProgressMsgTime = time.time()
+                log(INFO,
+                    "putdir: waiting=%s inserting=%s done=%s total=%s" % (
+                        nWaiting, nInserting, nComplete, nTotal)
+                    )
+
+            # can bail if all done
+            if nComplete == nTotal:
+                log(INFO, "putdir: all inserts completed (or failed)")
+                break
+
+            # wait and go round again if concurrent inserts are maxed
+            if nInserting &gt;= maxConcurrent:
                 time.sleep(1)
+                continue

+            # just go round again if manifest is empty (all remaining are in 
progress)
+            if len(manifest) == 0:
+                time.sleep(1)
+                continue
+
+            # got &gt;0 waiting jobs and &gt;0 spare slots, so we can submit a 
new one
+            filerec = manifest.pop(0)
             relpath = filerec['relpath']
             fullpath = filerec['fullpath']
             mimetype = filerec['mimetype']
-            
+
             manifestDict[relpath] = filerec

             log(INFO, "Launching insert of %s" % relpath)

+            # gotta suck raw data, since we might be inserting to a remote FCP
+            # service (which means we can't use 'file=' (UploadFrom=pathmae) 
keyword)
             raw = file(fullpath, "rb").read()
+
+            # fire up the insert job asynchronously
             job = self.put("CHK@",
                            data=raw,
                            mimetype=mimetype,
@@ -6799,17 +6842,13 @@
                            )
             jobs.append(job)
             filerec['job'] = job
+            job.filerec = filerec

+            # wait for that job to finish if we are in the slow 'one at a 
time' mode
             if not allAtOnce:
                 job.wait()
                 log(INFO, "Insert finished for %s" % relpath)

-        # wait for jobs to complete
-        if allAtOnce:
-            log(INFO, "Waiting for raw file inserts to finish")
-            while len([j for j in jobs if not j.isComplete()]) &gt; 0:
-                time.sleep(1)
-        
         # all done
         log(INFO, "All raw files now inserted (or failed)")

@@ -6824,27 +6863,28 @@
                 "DefaultName=index.html",
                 ]

+    # support global queue option
     if kw.get('Global', False):
         msgLines.append("Global=true")
     else:
         msgLines.append("Global=false")

-    # add the files
+    # add each file's entry to the command buffer
     n = 0
     default = None
-    for filerec in manifest:
+    for job in jobs:
+        filerec = job.filerec
         relpath = filerec['relpath']
         fullpath = filerec['fullpath']
         mimetype = filerec['mimetype']

+        # don't add if the file failed to insert
         if filebyfile:
             if isinstance(filerec['job'].result, Exception):
                 log(ERROR, "File %s failed to insert" % relpath)
                 continue

-        if relpath == 'index.html':
-            default = filerec
-        self._log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
+        log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))

         msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
                          ])
@@ -6858,34 +6898,27 @@
                             ])
         n += 1

-    # now, add the default file
-    if 0:
-        if filebyfile:
-            msgLines.extend(["Files.%d.Name=" % n,
-                             "Files.%d.UploadFrom=disk" % n,
-                             "Files.%d.Filename=%s" % (n, default['fullpath']),
-                             ])
-        else:
-            msgLines.extend(["Files.%d.Name=" % n,
-                             "Files.%d.UploadFrom=redirect" % n,
-                             "Files.%d.TargetURI=%s" % filerec['job'].result,
-                             ])
-
+    # finish the command buffer
     msgLines.append("EndMessage")
-    
-    for line in msgLines:
-        self._log(DETAIL, line)
     fullbuf = "\n".join(msgLines) + "\n"

+    # gotta log the command buffer here, since it's not sent via .put()
+    for line in msgLines:
+        log(DETAIL, line)
+
     # --------------------------------------
     # now dispatch the job
-    return self._submitCmd(id, "ClientPutComplexDir",
-                           rawcmd=fullbuf,
-                           async=kw.get('async', False),
-                           callback=kw.get('callback', False),
-                           Persistence=kw.get('Persistence', 'connection'),
-                           )
+    finalResult = self._submitCmd(
+                    id, "ClientPutComplexDir",
+                    rawcmd=fullbuf,
+                    async=kw.get('async', False),
+                    callback=kw.get('callback', False),
+                    Persistence=kw.get('Persistence', 'connection'),
+                    )

+    # finally all done, return result or job ticket
+    return finalResult
+
 </t>
 <t tx="aum.20060511003500">from fcp import *

@@ -13479,5 +13512,49 @@
     main()

 </t>
+<t tx="aum.20060608131556">@first #! /usr/bin/env python
+"""
+Test harness for freesitemgr
+
+Generates n random files into a temporary directory
+"""
+
+nfiles = 2
+maxConcurrent = 10
+tmpDir = "/tmp/freesitetest"
+
+import sys, os, time
+import fcp
+
+sh = os.system
+
+node = fcp.FCPNode()
+
+# grab a keypair
+pub, priv = node.genkey()
+
+# create index.html header
+indexlines = ["&lt;html&gt;&lt;head&gt;&lt;title&gt;aum's test 
freesite&lt;/title&gt;&lt;/head&gt;",
+              "&lt;body&gt;&lt;h1&gt;aum's test freesite&lt;/h1&gt;",
+              "&lt;ul&gt;",
+              ]
+
+# create fresh new directory full of n files
+sh("rm -rf %s" % tmpDir)
+sh("mkdir %s" % tmpDir)
+for i in xrange(nfiles):
+    name = "file_%s.txt" % i
+    data = "This is file %s, generated on %s\n" % (i, time.asctime())
+    file("%s/%s" % (tmpDir, name), "w").write(data)
+    indexlines.append('&lt;li&gt;&lt;a href="%s"&gt;%s&lt;/a&gt;&lt;/li&gt;' % 
(name, name))
+
+indexlines.append("&lt;/ul&gt;&lt;/body&gt;&lt;/html&gt;\n")
+indexhtml = "\n".join(indexlines)
+file("%s/index.html" % tmpDir, "w").write(indexhtml)
+
+sh("sb python setup.py install")
+sh("freesitemgr -v -m %s update" % maxConcurrent)
+
+</t>
 </tnodes>
 </leo_file>

Modified: trunk/apps/pyFreenet/fcp/node.py
===================================================================
--- trunk/apps/pyFreenet/fcp/node.py    2006-06-08 01:34:42 UTC (rev 9078)
+++ trunk/apps/pyFreenet/fcp/node.py    2006-06-08 01:53:47 UTC (rev 9079)
@@ -539,11 +539,11 @@
             allAtOnce = False

         if kw.has_key('maxconcurrent'):
-            maxConcurrentInserts = kw['maxconcurrent']
+            maxConcurrent = kw['maxconcurrent']
             filebyfile = True
             allAtOnce = True
         else:
-            maxConcurrentInserts = 10
+            maxConcurrent = 10

         id = kw.pop("id", None)
         if not id:
@@ -563,24 +563,66 @@
         manifestDict = {}
         jobs = []
         #allAtOnce = False
+    
         if filebyfile:
+    
+            lastProgressMsgTime = time.time()
+    
             # insert each file, one at a time
-            for filerec in manifest:
+            nTotal = len(manifest)

-                # wait if too many concurrent inserts are in progress
-                while len([j for j in jobs if not j.isComplete()]) \
-                >= maxConcurrentInserts:
+            # output status messages, and manage concurrent inserts
+            while True:
+                # get progress counts
+                nQueued = len(jobs)
+                nComplete = len(
+                                filter(
+                                    lambda j: j.isComplete(),
+                                    jobs
+                                    )
+                                )
+                nWaiting = nTotal - nQueued
+                nInserting = nQueued - nComplete
+    
+                # spit a progress message every 10 seconds
+                now = time.time()
+                if now - lastProgressMsgTime >= 10:
+                    lastProgressMsgTime = time.time()
+                    log(INFO,
+                        "putdir: waiting=%s inserting=%s done=%s total=%s" % (
+                            nWaiting, nInserting, nComplete, nTotal)
+                        )
+    
+                # can bail if all done
+                if nComplete == nTotal:
+                    log(INFO, "putdir: all inserts completed (or failed)")
+                    break
+    
+                # wait and go round again if concurrent inserts are maxed
+                if nInserting >= maxConcurrent:
                     time.sleep(1)
+                    continue

+                # just go round again if manifest is empty (all remaining are 
in progress)
+                if len(manifest) == 0:
+                    time.sleep(1)
+                    continue
+    
+                # got >0 waiting jobs and >0 spare slots, so we can submit a 
new one
+                filerec = manifest.pop(0)
                 relpath = filerec['relpath']
                 fullpath = filerec['fullpath']
                 mimetype = filerec['mimetype']
-                
+    
                 manifestDict[relpath] = filerec

                 log(INFO, "Launching insert of %s" % relpath)

+                # gotta suck raw data, since we might be inserting to a remote 
FCP
+                # service (which means we can't use 'file=' 
(UploadFrom=pathmae) keyword)
                 raw = file(fullpath, "rb").read()
+    
+                # fire up the insert job asynchronously
                 job = self.put("CHK@",
                                data=raw,
                                mimetype=mimetype,
@@ -589,17 +631,13 @@
                                )
                 jobs.append(job)
                 filerec['job'] = job
+                job.filerec = filerec

+                # wait for that job to finish if we are in the slow 'one at a 
time' mode
                 if not allAtOnce:
                     job.wait()
                     log(INFO, "Insert finished for %s" % relpath)

-            # wait for jobs to complete
-            if allAtOnce:
-                log(INFO, "Waiting for raw file inserts to finish")
-                while len([j for j in jobs if not j.isComplete()]) > 0:
-                    time.sleep(1)
-            
             # all done
             log(INFO, "All raw files now inserted (or failed)")

@@ -614,27 +652,28 @@
                     "DefaultName=index.html",
                     ]

+        # support global queue option
         if kw.get('Global', False):
             msgLines.append("Global=true")
         else:
             msgLines.append("Global=false")

-        # add the files
+        # add each file's entry to the command buffer
         n = 0
         default = None
-        for filerec in manifest:
+        for job in jobs:
+            filerec = job.filerec
             relpath = filerec['relpath']
             fullpath = filerec['fullpath']
             mimetype = filerec['mimetype']

+            # don't add if the file failed to insert
             if filebyfile:
                 if isinstance(filerec['job'].result, Exception):
                     log(ERROR, "File %s failed to insert" % relpath)
                     continue

-            if relpath == 'index.html':
-                default = filerec
-            self._log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))
+            log(DETAIL, "n=%s relpath=%s" % (repr(n), repr(relpath)))

             msgLines.extend(["Files.%d.Name=%s" % (n, relpath),
                              ])
@@ -648,34 +687,27 @@
                                 ])
             n += 1

-        # now, add the default file
-        if 0:
-            if filebyfile:
-                msgLines.extend(["Files.%d.Name=" % n,
-                                 "Files.%d.UploadFrom=disk" % n,
-                                 "Files.%d.Filename=%s" % (n, 
default['fullpath']),
-                                 ])
-            else:
-                msgLines.extend(["Files.%d.Name=" % n,
-                                 "Files.%d.UploadFrom=redirect" % n,
-                                 "Files.%d.TargetURI=%s" % 
filerec['job'].result,
-                                 ])
-    
+        # finish the command buffer
         msgLines.append("EndMessage")
-        
-        for line in msgLines:
-            self._log(DETAIL, line)
         fullbuf = "\n".join(msgLines) + "\n"

+        # gotta log the command buffer here, since it's not sent via .put()
+        for line in msgLines:
+            log(DETAIL, line)
+    
         # --------------------------------------
         # now dispatch the job
-        return self._submitCmd(id, "ClientPutComplexDir",
-                               rawcmd=fullbuf,
-                               async=kw.get('async', False),
-                               callback=kw.get('callback', False),
-                               Persistence=kw.get('Persistence', 'connection'),
-                               )
+        finalResult = self._submitCmd(
+                        id, "ClientPutComplexDir",
+                        rawcmd=fullbuf,
+                        async=kw.get('async', False),
+                        callback=kw.get('callback', False),
+                        Persistence=kw.get('Persistence', 'connection'),
+                        )

+        # finally all done, return result or job ticket
+        return finalResult
+    
     def invertprivate(self, privatekey):
         """
         Converts an SSK or USK private key to a public equivalent


Reply via email to