Author: toad
Date: 2008-06-13 20:42:26 +0000 (Fri, 13 Jun 2008)
New Revision: 20329

Modified:
   branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java
   branches/db4o/freenet/src/freenet/client/ArchiveHandler.java
   branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java
   branches/db4o/freenet/src/freenet/client/ArchiveManager.java
   branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
Unpack into the archive cache off-thread.
Create a tag at the beginning and delete it at the end so that it's restarted 
on a crash.
Call the callbacks on the database thread.

Modified: branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java        
2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java        
2008-06-13 20:42:26 UTC (rev 20329)
@@ -1,5 +1,8 @@
 package freenet.client;

+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
 import freenet.support.api.Bucket;

 /** Called when we have extracted an archive, and a specified file either is
@@ -7,9 +10,15 @@
 public interface ArchiveExtractCallback {

        /** Got the data */
-       public void gotBucket(Bucket data);
+       public void gotBucket(Bucket data, ObjectContainer container, 
ClientContext context);

        /** Not in the archive */
-       public void notInArchive();
+       public void notInArchive(ObjectContainer container, ClientContext 
context);

+       /** Failed: restart */
+       public void onFailed(ArchiveRestartException e, ObjectContainer 
container, ClientContext context);
+       
+       /** Failed for some other reason */
+       public void onFailed(ArchiveFailureException e, ObjectContainer 
container, ClientContext context);
+       
 }

Modified: branches/db4o/freenet/src/freenet/client/ArchiveHandler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveHandler.java        
2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveHandler.java        
2008-06-13 20:42:26 UTC (rev 20329)
@@ -3,6 +3,9 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client;

+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
 import freenet.keys.FreenetURI;
 import freenet.support.api.Bucket;

@@ -66,6 +69,24 @@
         * @throws ArchiveFailureException
         * @throws ArchiveRestartException
         */
-       public abstract void extractToCache(Bucket bucket, ArchiveContext actx, 
String element, ArchiveExtractCallback callback, ArchiveManager manager) throws 
ArchiveFailureException, ArchiveRestartException;
+       public abstract void extractToCache(Bucket bucket, ArchiveContext actx, 
String element, ArchiveExtractCallback callback, ArchiveManager manager, 
+                       ObjectContainer container, ClientContext context) 
throws ArchiveFailureException, ArchiveRestartException;
+
+       /**
+        * Unpack a fetched archive on a separate thread for a persistent 
caller.
+        * This involves:
+        * - Add a tag to the database so that it will be restarted on a crash.
+        * - Run the actual unpack on a separate thread.
+        * - Copy the data to a persistent bucket.
+        * - Schedule a database job.
+        * - Call the callback.
+        * @param bucket
+        * @param actx
+        * @param element
+        * @param callback
+        * @param container
+        * @param context
+        */
+       public abstract void extractPersistentOffThread(Bucket bucket, 
ArchiveContext actx, String element, ArchiveExtractCallback callback, 
ObjectContainer container, ClientContext context);

 }
\ No newline at end of file

Modified: branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java    
2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java    
2008-06-13 20:42:26 UTC (rev 20329)
@@ -1,10 +1,17 @@
 package freenet.client;

+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Predicate;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
 import freenet.keys.FreenetURI;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
+import freenet.support.io.NativeThread;

-public class ArchiveHandlerImpl implements ArchiveHandler {
+class ArchiveHandlerImpl implements ArchiveHandler {

        private final FreenetURI key;
        private final short archiveType;
@@ -48,11 +55,11 @@

        public void extractToCache(Bucket bucket, ArchiveContext actx,
                        String element, ArchiveExtractCallback callback,
-                       ArchiveManager manager) throws ArchiveFailureException,
+                       ArchiveManager manager, ObjectContainer container, 
ClientContext context) throws ArchiveFailureException,
                        ArchiveRestartException {
                forceRefetchArchive = false; // now we don't need to force 
refetch any more
                ArchiveStoreContext ctx = manager.makeContext(key, archiveType, 
false);
-               manager.extractToCache(key, archiveType, bucket, actx, ctx, 
element, callback);
+               manager.extractToCache(key, archiveType, bucket, actx, ctx, 
element, callback, container, context);
        }

        public short getArchiveType() {
@@ -63,4 +70,132 @@
                return key;
        }

+       /**
+        * Unpack a fetched archive on a separate thread for a persistent 
caller.
+        * This involves:
+        * - Add a tag to the database so that it will be restarted on a crash.
+        * - Run the actual unpack on a separate thread.
+        * - Copy the data to a persistent bucket.
+        * - Schedule a database job.
+        * - Call the callback.
+        * @param bucket
+        * @param actx
+        * @param element
+        * @param callback
+        * @param container
+        * @param context
+        */
+       public void extractPersistentOffThread(Bucket bucket, ArchiveContext 
actx, String element, ArchiveExtractCallback callback, ObjectContainer 
container, final ClientContext context) {
+               assert(element != null); // no callback would be called...
+               final ArchiveManager manager = context.archiveManager;
+               final ArchiveExtractTag tag = new ArchiveExtractTag(this, 
bucket, actx, element, callback, context.nodeDBHandle);
+               container.set(tag);
+               runPersistentOffThread(tag, context, manager);
+       }
+
+       private static void runPersistentOffThread(final ArchiveExtractTag tag, 
final ClientContext context, final ArchiveManager manager) {
+               final ProxyCallback proxyCallback = new ProxyCallback();
+               
+               context.mainExecutor.execute(new Runnable() {
+
+                       public void run() {
+                               try {
+                                       tag.handler.extractToCache(tag.data, 
tag.actx, tag.element, proxyCallback, manager, null, context);
+                                       context.jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       container.delete(tag);
+                                                       if(proxyCallback.data 
== null)
+                                                               
tag.callback.notInArchive(container, context);
+                                                       else
+                                                               
tag.callback.gotBucket(proxyCallback.data, container, context);
+                                               }
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
+                                       
+                               } catch (final ArchiveFailureException e) {
+                                       
+                                       context.jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       container.delete(tag);
+                                                       
tag.callback.onFailed(e, container, context);
+                                               }
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
+                                       
+                               } catch (final ArchiveRestartException e) {
+                                       
+                                       context.jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       container.delete(tag);
+                                                       
tag.callback.onFailed(e, container, context);
+                                               }
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
+                                       
+                               }
+                       }
+                       
+               }, "Off-thread extract");
+       }
+
+       /** Called from ArchiveManager.init() */
+       static void init(ObjectContainer container, ClientContext context, 
final long nodeDBHandle) {
+               ObjectSet set = container.query(new Predicate() {
+                       public boolean match(ArchiveExtractTag tag) {
+                               return tag.nodeDBHandle == nodeDBHandle;
+                       }
+               });
+               while(set.hasNext()) {
+                       ArchiveExtractTag tag = (ArchiveExtractTag) set.next();
+                       runPersistentOffThread(tag, context, 
context.archiveManager);
+               }
+       }
+       
+       private static class ProxyCallback implements ArchiveExtractCallback {
+
+               Bucket data;
+               
+               public void gotBucket(Bucket data, ObjectContainer container, 
ClientContext context) {
+                       this.data = data;
+               }
+
+               public void notInArchive(ObjectContainer container, 
ClientContext context) {
+                       this.data = null;
+               }
+
+               public void onFailed(ArchiveRestartException e, ObjectContainer 
container, ClientContext context) {
+                       // Must not be called.
+                       throw new UnsupportedOperationException();
+               }
+
+               public void onFailed(ArchiveFailureException e, ObjectContainer 
container, ClientContext context) {
+                       // Must not be called.
+                       throw new UnsupportedOperationException();
+               }
+               
+       }
+       
 }
+
+class ArchiveExtractTag {
+       
+       final ArchiveHandlerImpl handler;
+       final Bucket data;
+       final ArchiveContext actx;
+       final String element;
+       final ArchiveExtractCallback callback;
+       final long nodeDBHandle;
+       
+       ArchiveExtractTag(ArchiveHandlerImpl handler, Bucket data, 
ArchiveContext actx, String element, ArchiveExtractCallback callback, long 
nodeDBHandle) {
+               this.handler = handler;
+               this.data = data;
+               this.actx = actx;
+               this.element = element;
+               this.callback = callback;
+               this.nodeDBHandle = nodeDBHandle;
+       }
+       
+}
\ No newline at end of file

Modified: branches/db4o/freenet/src/freenet/client/ArchiveManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveManager.java        
2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveManager.java        
2008-06-13 20:42:26 UTC (rev 20329)
@@ -13,6 +13,7 @@
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;

+import freenet.client.async.ClientContext;
 import freenet.crypt.RandomSource;
 import freenet.keys.FreenetURI;
 import freenet.support.LRUHashtable;
@@ -25,6 +26,8 @@
 import freenet.support.io.TempFileBucket;
 import java.util.Random;

+import com.db4o.ObjectContainer;
+
 /**
  * Cache of recently decoded archives:
  * - Keep up to N ArchiveHandler's in RAM (this can be large; we don't keep the
@@ -189,7 +192,7 @@
         * @throws ArchiveRestartException If the request needs to be restarted 
because the archive
         * changed.
         */
-       public void extractToCache(FreenetURI key, short archiveType, Bucket 
data, ArchiveContext archiveContext, ArchiveStoreContext ctx, String element, 
ArchiveExtractCallback callback) throws ArchiveFailureException, 
ArchiveRestartException {
+       public void extractToCache(FreenetURI key, short archiveType, Bucket 
data, ArchiveContext archiveContext, ArchiveStoreContext ctx, String element, 
ArchiveExtractCallback callback, ObjectContainer container, ClientContext 
context) throws ArchiveFailureException, ArchiveRestartException {

                logMINOR = Logger.shouldLog(Logger.MINOR, this);

@@ -269,7 +272,7 @@
                                        out.close();
                                        if(name.equals(".metadata"))
                                                gotMetadata = true;
-                                       addStoreElement(ctx, key, name, temp, 
gotElement, element, callback);
+                                       addStoreElement(ctx, key, name, temp, 
gotElement, element, callback, container, context);
                                        names.add(name);
                                        trimStoredData();
                                }
@@ -277,13 +280,13 @@

                        // If no metadata, generate some
                        if(!gotMetadata) {
-                               generateMetadata(ctx, key, names, gotElement, 
element, callback);
+                               generateMetadata(ctx, key, names, gotElement, 
element, callback, container, context);
                                trimStoredData();
                        }
                        if(throwAtExit) throw new 
ArchiveRestartException("Archive changed on re-fetch");

                        if((!gotElement.value) && element != null)
-                               callback.notInArchive();
+                               callback.notInArchive(container, context);

                } catch (IOException e) {
                        throw new ArchiveFailureException("Error reading 
archive: "+e.getMessage(), e);
@@ -308,7 +311,7 @@
         * @param callbackName If we generate a 
         * @throws ArchiveFailureException 
         */
-       private ArchiveStoreItem generateMetadata(ArchiveStoreContext ctx, 
FreenetURI key, HashSet names, MutableBoolean gotElement, String element2, 
ArchiveExtractCallback callback) throws ArchiveFailureException {
+       private ArchiveStoreItem generateMetadata(ArchiveStoreContext ctx, 
FreenetURI key, HashSet names, MutableBoolean gotElement, String element2, 
ArchiveExtractCallback callback, ObjectContainer container, ClientContext 
context) throws ArchiveFailureException {
                /* What we have to do is to:
                 * - Construct a filesystem tree of the names.
                 * - Turn each level of the tree into a Metadata object, 
including those below it, with
@@ -333,10 +336,10 @@
                                OutputStream os = 
element.bucket.getOutputStream();
                                os.write(buf);
                                os.close();
-                               return addStoreElement(ctx, key, ".metadata", 
element, gotElement, element2, callback);
+                               return addStoreElement(ctx, key, ".metadata", 
element, gotElement, element2, callback, container, context);
                        } catch (MetadataUnresolvedException e) {
                                try {
-                                       x = resolve(e, x, element, ctx, key, 
gotElement, element2, callback);
+                                       x = resolve(e, x, element, ctx, key, 
gotElement, element2, callback, container, context);
                                } catch (IOException e1) {
                                        throw new 
ArchiveFailureException("Failed to create metadata: "+e1, e1);
                                }
@@ -347,7 +350,7 @@
                }
        }

-       private int resolve(MetadataUnresolvedException e, int x, 
TempStoreElement element, ArchiveStoreContext ctx, FreenetURI key, 
MutableBoolean gotElement, String element2, ArchiveExtractCallback callback) 
throws IOException, ArchiveFailureException {
+       private int resolve(MetadataUnresolvedException e, int x, 
TempStoreElement element, ArchiveStoreContext ctx, FreenetURI key, 
MutableBoolean gotElement, String element2, ArchiveExtractCallback callback, 
ObjectContainer container, ClientContext context) throws IOException, 
ArchiveFailureException {
                Metadata[] m = e.mustResolve;
                for(int i=0;i<m.length;i++) {
                        try {
@@ -355,9 +358,9 @@
                                OutputStream os = 
element.bucket.getOutputStream();
                                os.write(buf);
                                os.close();
-                               addStoreElement(ctx, key, ".metadata-"+(x++), 
element, gotElement, element2, callback);
+                               addStoreElement(ctx, key, ".metadata-"+(x++), 
element, gotElement, element2, callback, container, context);
                        } catch (MetadataUnresolvedException e1) {
-                               x = resolve(e, x, element, ctx, key, 
gotElement, element2, callback);
+                               x = resolve(e, x, element, ctx, key, 
gotElement, element2, callback, container, context);
                        }
                }
                return x;
@@ -425,7 +428,7 @@
         * @throws ArchiveFailureException If a failure occurred resulting in 
the data not being readable. Only happens if
         * callback != null.
         */
-       private ArchiveStoreItem addStoreElement(ArchiveStoreContext ctx, 
FreenetURI key, String name, TempStoreElement temp, MutableBoolean gotElement, 
String callbackName, ArchiveExtractCallback callback) throws 
ArchiveFailureException {
+       private ArchiveStoreItem addStoreElement(ArchiveStoreContext ctx, 
FreenetURI key, String name, TempStoreElement temp, MutableBoolean gotElement, 
String callbackName, ArchiveExtractCallback callback, ObjectContainer 
container, ClientContext context) throws ArchiveFailureException {
                RealArchiveStoreItem element = new RealArchiveStoreItem(ctx, 
key, name, temp);
                if(logMINOR) Logger.minor(this, "Adding store element: 
"+element+" ( "+key+ ' ' +name+" size "+element.spaceUsed()+" )");
                ArchiveStoreItem oldItem;
@@ -444,7 +447,7 @@
                        }
                }
                if(matchBucket != null) {
-                       callback.gotBucket(matchBucket);
+                       callback.gotBucket(matchBucket, container, context);
                        gotElement.value = true;
                }
                return element;
@@ -509,4 +512,8 @@
                        return Metadata.ARCHIVE_ZIP;
                else throw new IllegalArgumentException(); 
        }
+       
+       public static void init(ObjectContainer container, ClientContext 
context, final long nodeDBHandle) {
+               ArchiveHandlerImpl.init(container, context, nodeDBHandle);
+       }
 }

Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java       
2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java       
2008-06-13 20:42:26 UTC (rev 20329)
@@ -285,7 +285,7 @@
                                        }
                                } else {
                                        fetchArchive(false, archiveMetadata, 
ArchiveManager.METADATA_NAME, new ArchiveExtractCallback() {
-                                               public void gotBucket(Bucket 
data) {
+                                               public void gotBucket(Bucket 
data, ObjectContainer container, ClientContext context) {
                                                        try {
                                                                metadata = 
Metadata.construct(data);
                                                                
wrapHandleMetadata(true, container, context);
@@ -299,9 +299,15 @@
                                                                return;
                                                        }
                                                }
-                                               public void notInArchive() {
+                                               public void 
notInArchive(ObjectContainer container, ClientContext context) {
                                                        onFailure(new 
FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot 
happen as ArchiveManager should synthesise some!"), false, sched, container, 
context);
                                                }
+                                               public void 
onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext 
context) {
+                                                       
SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, 
container, context);
+                                               }
+                                               public void 
onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext 
context) {
+                                                       
SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, 
container, context);
+                                               }
                                        }, container, context); // will result 
in this function being called again
                                        return;
                                }
@@ -346,7 +352,7 @@
                                        // We enforce this in ArchiveHandler.
                                        // Therefore, the archive needs to be 
fetched.
                                        fetchArchive(true, archiveMetadata, 
filename, new ArchiveExtractCallback() {
-                                               public void gotBucket(Bucket 
data) {
+                                               public void gotBucket(Bucket 
data, ObjectContainer container, ClientContext context) {
                                                        if(logMINOR) 
Logger.minor(this, "Returning data");
                                                        Bucket out;
                                                        try {
@@ -365,9 +371,15 @@
                                                        // Return the data
                                                        onSuccess(new 
FetchResult(clientMetadata, out), sched, container, context);
                                                }
-                                               public void notInArchive() {
+                                               public void 
notInArchive(ObjectContainer container, ClientContext context) {
                                                        onFailure(new 
FetchException(FetchException.NOT_IN_ARCHIVE), false, sched, container, 
context);
                                                }
+                                               public void 
onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext 
context) {
+                                                       
SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, 
container, context);
+                                               }
+                                               public void 
onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext 
context) {
+                                                       
SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, 
container, context);
+                                               }
                                        }, container, context);
                                        // Will call back into this function 
when it has been fetched.
                                        return;
@@ -597,8 +609,20 @@
                }

                public void onSuccess(FetchResult result, ClientGetState state, 
ObjectContainer container, ClientContext context) {
+                       if(!parent.persistent()) {
+                               // Run directly - we are running on some thread 
somewhere, don't worry about it.
+                               innerSuccess(result, container, context);
+                       } else {
+                               // We are running on the database thread.
+                               // Add a tag, unpack on a separate thread, copy 
the data to a persistent bucket, then schedule on the database thread,
+                               // remove the tag, and call the callback.
+                               
ah.extractPersistentOffThread(result.asBucket(), actx, element, callback, 
container, context);
+                       }
+               }
+
+               private void innerSuccess(FetchResult result, ObjectContainer 
container, ClientContext context) {
                        try {
-                               ah.extractToCache(result.asBucket(), actx, 
element, callback, context.archiveManager);
+                               ah.extractToCache(result.asBucket(), actx, 
element, callback, context.archiveManager, container, context);
                        } catch (ArchiveFailureException e) {
                                SingleFileFetcher.this.onFailure(new 
FetchException(e), false, sched, container, context);
                                return;

Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-13 
19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-13 
20:42:26 UTC (rev 20329)
@@ -4,6 +4,8 @@
 import java.io.IOException;
 import java.net.URI;

+import com.db4o.ObjectContainer;
+
 import freenet.client.ArchiveManager;
 import freenet.client.FECQueue;
 import freenet.client.HighLevelSimpleClient;
@@ -57,8 +59,8 @@
 import freenet.support.SerialExecutor;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.BooleanCallback;
+import freenet.support.api.BucketFactory;
 import freenet.support.api.IntCallback;
-import freenet.support.api.BucketFactory;
 import freenet.support.api.StringArrCallback;
 import freenet.support.api.StringCallback;
 import freenet.support.io.FileUtil;
@@ -422,6 +424,13 @@
        public void start(Config config) throws NodeInitException {
                backgroundBlockEncoder.setContext(clientContext);
                node.executor.execute(backgroundBlockEncoder, "Background block 
encoder");
+               clientContext.jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
+                               ArchiveManager.init(container, context, 
context.nodeDBHandle);
+                       }
+                       
+               }, NativeThread.NORM_PRIORITY, false);
                persister.start();
                if(fcpServer != null)
                        fcpServer.maybeStart();


Reply via email to