Author: toad
Date: 2008-06-13 20:42:26 +0000 (Fri, 13 Jun 2008)
New Revision: 20329
Modified:
branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java
branches/db4o/freenet/src/freenet/client/ArchiveHandler.java
branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java
branches/db4o/freenet/src/freenet/client/ArchiveManager.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
Unpack into the archive cache off-thread.
Create a tag at the beginning and delete it at the end, so that the unpack is
restarted after a crash.
Call the callbacks on the database thread.
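
(For orientation, a minimal self-contained sketch of the pattern this commit introduces:
persist a tag, run the unpack on a worker thread, then finish on the single database
thread where the tag is deleted and the callback is invoked. The real code uses db4o,
DBJob, jobRunner and ClientContext; the classes below are simplified stand-ins, not
Freenet APIs.)

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class OffThreadUnpackSketch {
        // Stand-in for tags persisted in the database; anything still here after a
        // crash would be re-run at startup (compare ArchiveHandlerImpl.init()).
        static final Set<String> persistedTags = ConcurrentHashMap.newKeySet();
        // Stand-in for the single database thread (context.jobRunner in the real code).
        static final ExecutorService dbThread = Executors.newSingleThreadExecutor();
        // Stand-in for the worker pool the unpack runs on (context.mainExecutor).
        static final ExecutorService workers = Executors.newCachedThreadPool();

        static void extractPersistentOffThread(final String archive) {
            // 1. Create the tag before starting, so a crash leaves evidence to restart from.
            persistedTags.add(archive);
            workers.execute(new Runnable() {
                public void run() {
                    // 2. The actual unpack happens off-thread (placeholder work here).
                    final String result = "unpacked " + archive;
                    // 3. Schedule a job on the database thread: delete the tag and
                    //    call the callback there.
                    dbThread.execute(new Runnable() {
                        public void run() {
                            persistedTags.remove(archive);
                            System.out.println("callback on DB thread: " + result);
                        }
                    });
                }
            });
        }

        public static void main(String[] args) throws InterruptedException {
            extractPersistentOffThread("CHK@example/archive.zip");
            Thread.sleep(200);
            workers.shutdown();
            dbThread.shutdown();
        }
    }
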
Modified: branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveExtractCallback.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -1,5 +1,8 @@
package freenet.client;
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
import freenet.support.api.Bucket;
/** Called when we have extracted an archive, and a specified file either is
@@ -7,9 +10,15 @@
public interface ArchiveExtractCallback {
/** Got the data */
-    public void gotBucket(Bucket data);
+    public void gotBucket(Bucket data, ObjectContainer container, ClientContext context);
     /** Not in the archive */
-    public void notInArchive();
+    public void notInArchive(ObjectContainer container, ClientContext context);
+    /** Failed: restart */
+    public void onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext context);
+
+    /** Failed for some other reason */
+    public void onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext context);
+
}
Modified: branches/db4o/freenet/src/freenet/client/ArchiveHandler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveHandler.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveHandler.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -3,6 +3,9 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client;
+import com.db4o.ObjectContainer;
+
+import freenet.client.async.ClientContext;
import freenet.keys.FreenetURI;
import freenet.support.api.Bucket;
@@ -66,6 +69,24 @@
* @throws ArchiveFailureException
* @throws ArchiveRestartException
*/
-    public abstract void extractToCache(Bucket bucket, ArchiveContext actx, String element, ArchiveExtractCallback callback, ArchiveManager manager) throws ArchiveFailureException, ArchiveRestartException;
+    public abstract void extractToCache(Bucket bucket, ArchiveContext actx, String element, ArchiveExtractCallback callback, ArchiveManager manager,
+            ObjectContainer container, ClientContext context) throws ArchiveFailureException, ArchiveRestartException;
+
+    /**
+     * Unpack a fetched archive on a separate thread for a persistent caller.
+     * This involves:
+     * - Add a tag to the database so that it will be restarted on a crash.
+     * - Run the actual unpack on a separate thread.
+     * - Copy the data to a persistent bucket.
+     * - Schedule a database job.
+     * - Call the callback.
+     * @param bucket
+     * @param actx
+     * @param element
+     * @param callback
+     * @param container
+     * @param context
+     */
+    public abstract void extractPersistentOffThread(Bucket bucket, ArchiveContext actx, String element, ArchiveExtractCallback callback, ObjectContainer container, ClientContext context);
}
\ No newline at end of file
Modified: branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveHandlerImpl.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -1,10 +1,17 @@
package freenet.client;
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Predicate;
+
+import freenet.client.async.ClientContext;
+import freenet.client.async.DBJob;
import freenet.keys.FreenetURI;
import freenet.support.Logger;
import freenet.support.api.Bucket;
+import freenet.support.io.NativeThread;
-public class ArchiveHandlerImpl implements ArchiveHandler {
+class ArchiveHandlerImpl implements ArchiveHandler {
private final FreenetURI key;
private final short archiveType;
@@ -48,11 +55,11 @@
     public void extractToCache(Bucket bucket, ArchiveContext actx,
             String element, ArchiveExtractCallback callback,
-            ArchiveManager manager) throws ArchiveFailureException,
+            ArchiveManager manager, ObjectContainer container, ClientContext context) throws ArchiveFailureException,
             ArchiveRestartException {
         forceRefetchArchive = false; // now we don't need to force refetch any more
         ArchiveStoreContext ctx = manager.makeContext(key, archiveType, false);
-        manager.extractToCache(key, archiveType, bucket, actx, ctx, element, callback);
+        manager.extractToCache(key, archiveType, bucket, actx, ctx, element, callback, container, context);
}
public short getArchiveType() {
@@ -63,4 +70,132 @@
return key;
}
+    /**
+     * Unpack a fetched archive on a separate thread for a persistent caller.
+     * This involves:
+     * - Add a tag to the database so that it will be restarted on a crash.
+     * - Run the actual unpack on a separate thread.
+     * - Copy the data to a persistent bucket.
+     * - Schedule a database job.
+     * - Call the callback.
+     * @param bucket
+     * @param actx
+     * @param element
+     * @param callback
+     * @param container
+     * @param context
+     */
+    public void extractPersistentOffThread(Bucket bucket, ArchiveContext actx, String element, ArchiveExtractCallback callback, ObjectContainer container, final ClientContext context) {
+        assert(element != null); // no callback would be called...
+        final ArchiveManager manager = context.archiveManager;
+        final ArchiveExtractTag tag = new ArchiveExtractTag(this, bucket, actx, element, callback, context.nodeDBHandle);
+        container.set(tag);
+        runPersistentOffThread(tag, context, manager);
+    }
+
+    private static void runPersistentOffThread(final ArchiveExtractTag tag, final ClientContext context, final ArchiveManager manager) {
+        final ProxyCallback proxyCallback = new ProxyCallback();
+
+        context.mainExecutor.execute(new Runnable() {
+
+            public void run() {
+                try {
+                    tag.handler.extractToCache(tag.data, tag.actx, tag.element, proxyCallback, manager, null, context);
+                    context.jobRunner.queue(new DBJob() {
+
+                        public void run(ObjectContainer container, ClientContext context) {
+                            container.delete(tag);
+                            if(proxyCallback.data == null)
+                                tag.callback.notInArchive(container, context);
+                            else
+                                tag.callback.gotBucket(proxyCallback.data, container, context);
+                        }
+
+                    }, NativeThread.NORM_PRIORITY, false);
+
+                } catch (final ArchiveFailureException e) {
+
+                    context.jobRunner.queue(new DBJob() {
+
+                        public void run(ObjectContainer container, ClientContext context) {
+                            container.delete(tag);
+                            tag.callback.onFailed(e, container, context);
+                        }
+
+                    }, NativeThread.NORM_PRIORITY, false);
+
+                } catch (final ArchiveRestartException e) {
+
+                    context.jobRunner.queue(new DBJob() {
+
+                        public void run(ObjectContainer container, ClientContext context) {
+                            container.delete(tag);
+                            tag.callback.onFailed(e, container, context);
+                        }
+
+                    }, NativeThread.NORM_PRIORITY, false);
+
+                }
+            }
+
+        }, "Off-thread extract");
+    }
+
+ /** Called from ArchiveManager.init() */
+    static void init(ObjectContainer container, ClientContext context, final long nodeDBHandle) {
+        ObjectSet set = container.query(new Predicate() {
+            public boolean match(ArchiveExtractTag tag) {
+                return tag.nodeDBHandle == nodeDBHandle;
+            }
+        });
+        while(set.hasNext()) {
+            ArchiveExtractTag tag = (ArchiveExtractTag) set.next();
+            runPersistentOffThread(tag, context, context.archiveManager);
+        }
+    }
+
+ private static class ProxyCallback implements ArchiveExtractCallback {
+
+ Bucket data;
+
+        public void gotBucket(Bucket data, ObjectContainer container, ClientContext context) {
+            this.data = data;
+        }
+
+        public void notInArchive(ObjectContainer container, ClientContext context) {
+            this.data = null;
+        }
+
+        public void onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext context) {
+            // Must not be called.
+            throw new UnsupportedOperationException();
+        }
+
+        public void onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext context) {
+            // Must not be called.
+            throw new UnsupportedOperationException();
+        }
+
+ }
+
}
+
+class ArchiveExtractTag {
+
+ final ArchiveHandlerImpl handler;
+ final Bucket data;
+ final ArchiveContext actx;
+ final String element;
+ final ArchiveExtractCallback callback;
+ final long nodeDBHandle;
+
+    ArchiveExtractTag(ArchiveHandlerImpl handler, Bucket data, ArchiveContext actx, String element, ArchiveExtractCallback callback, long nodeDBHandle) {
+ this.handler = handler;
+ this.data = data;
+ this.actx = actx;
+ this.element = element;
+ this.callback = callback;
+ this.nodeDBHandle = nodeDBHandle;
+ }
+
+}
\ No newline at end of file
Modified: branches/db4o/freenet/src/freenet/client/ArchiveManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/ArchiveManager.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/ArchiveManager.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -13,6 +13,7 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
+import freenet.client.async.ClientContext;
import freenet.crypt.RandomSource;
import freenet.keys.FreenetURI;
import freenet.support.LRUHashtable;
@@ -25,6 +26,8 @@
import freenet.support.io.TempFileBucket;
import java.util.Random;
+import com.db4o.ObjectContainer;
+
/**
* Cache of recently decoded archives:
* - Keep up to N ArchiveHandler's in RAM (this can be large; we don't keep the
@@ -189,7 +192,7 @@
     * @throws ArchiveRestartException If the request needs to be restarted because the archive
     * changed.
     */
-    public void extractToCache(FreenetURI key, short archiveType, Bucket data, ArchiveContext archiveContext, ArchiveStoreContext ctx, String element, ArchiveExtractCallback callback) throws ArchiveFailureException, ArchiveRestartException {
+    public void extractToCache(FreenetURI key, short archiveType, Bucket data, ArchiveContext archiveContext, ArchiveStoreContext ctx, String element, ArchiveExtractCallback callback, ObjectContainer container, ClientContext context) throws ArchiveFailureException, ArchiveRestartException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
@@ -269,7 +272,7 @@
out.close();
if(name.equals(".metadata"))
gotMetadata = true;
-                    addStoreElement(ctx, key, name, temp, gotElement, element, callback);
+                    addStoreElement(ctx, key, name, temp, gotElement, element, callback, container, context);
names.add(name);
trimStoredData();
}
@@ -277,13 +280,13 @@
// If no metadata, generate some
if(!gotMetadata) {
-            generateMetadata(ctx, key, names, gotElement, element, callback);
+            generateMetadata(ctx, key, names, gotElement, element, callback, container, context);
trimStoredData();
}
             if(throwAtExit) throw new ArchiveRestartException("Archive changed on re-fetch");
if((!gotElement.value) && element != null)
- callback.notInArchive();
+ callback.notInArchive(container, context);
} catch (IOException e) {
             throw new ArchiveFailureException("Error reading archive: "+e.getMessage(), e);
@@ -308,7 +311,7 @@
* @param callbackName If we generate a
* @throws ArchiveFailureException
*/
-    private ArchiveStoreItem generateMetadata(ArchiveStoreContext ctx, FreenetURI key, HashSet names, MutableBoolean gotElement, String element2, ArchiveExtractCallback callback) throws ArchiveFailureException {
+    private ArchiveStoreItem generateMetadata(ArchiveStoreContext ctx, FreenetURI key, HashSet names, MutableBoolean gotElement, String element2, ArchiveExtractCallback callback, ObjectContainer container, ClientContext context) throws ArchiveFailureException {
/* What we have to do is to:
* - Construct a filesystem tree of the names.
     * - Turn each level of the tree into a Metadata object, including those below it, with
@@ -333,10 +336,10 @@
                 OutputStream os = element.bucket.getOutputStream();
os.write(buf);
os.close();
- return addStoreElement(ctx, key, ".metadata",
element, gotElement, element2, callback);
+ return addStoreElement(ctx, key, ".metadata",
element, gotElement, element2, callback, container, context);
} catch (MetadataUnresolvedException e) {
try {
-                    x = resolve(e, x, element, ctx, key, gotElement, element2, callback);
+                    x = resolve(e, x, element, ctx, key, gotElement, element2, callback, container, context);
} catch (IOException e1) {
                     throw new ArchiveFailureException("Failed to create metadata: "+e1, e1);
}
@@ -347,7 +350,7 @@
}
}
-    private int resolve(MetadataUnresolvedException e, int x, TempStoreElement element, ArchiveStoreContext ctx, FreenetURI key, MutableBoolean gotElement, String element2, ArchiveExtractCallback callback) throws IOException, ArchiveFailureException {
+    private int resolve(MetadataUnresolvedException e, int x, TempStoreElement element, ArchiveStoreContext ctx, FreenetURI key, MutableBoolean gotElement, String element2, ArchiveExtractCallback callback, ObjectContainer container, ClientContext context) throws IOException, ArchiveFailureException {
Metadata[] m = e.mustResolve;
for(int i=0;i<m.length;i++) {
try {
@@ -355,9 +358,9 @@
                 OutputStream os = element.bucket.getOutputStream();
os.write(buf);
os.close();
- addStoreElement(ctx, key, ".metadata-"+(x++),
element, gotElement, element2, callback);
+ addStoreElement(ctx, key, ".metadata-"+(x++),
element, gotElement, element2, callback, container, context);
} catch (MetadataUnresolvedException e1) {
-                x = resolve(e, x, element, ctx, key, gotElement, element2, callback);
+                x = resolve(e, x, element, ctx, key, gotElement, element2, callback, container, context);
}
}
return x;
@@ -425,7 +428,7 @@
     * @throws ArchiveFailureException If a failure occurred resulting in the data not being readable. Only happens if
     * callback != null.
     */
-    private ArchiveStoreItem addStoreElement(ArchiveStoreContext ctx, FreenetURI key, String name, TempStoreElement temp, MutableBoolean gotElement, String callbackName, ArchiveExtractCallback callback) throws ArchiveFailureException {
+    private ArchiveStoreItem addStoreElement(ArchiveStoreContext ctx, FreenetURI key, String name, TempStoreElement temp, MutableBoolean gotElement, String callbackName, ArchiveExtractCallback callback, ObjectContainer container, ClientContext context) throws ArchiveFailureException {
         RealArchiveStoreItem element = new RealArchiveStoreItem(ctx, key, name, temp);
         if(logMINOR) Logger.minor(this, "Adding store element: "+element+" ( "+key+ ' ' +name+" size "+element.spaceUsed()+" )");
ArchiveStoreItem oldItem;
@@ -444,7 +447,7 @@
}
}
if(matchBucket != null) {
- callback.gotBucket(matchBucket);
+ callback.gotBucket(matchBucket, container, context);
gotElement.value = true;
}
return element;
@@ -509,4 +512,8 @@
return Metadata.ARCHIVE_ZIP;
else throw new IllegalArgumentException();
}
+
+    public static void init(ObjectContainer container, ClientContext context, final long nodeDBHandle) {
+ ArchiveHandlerImpl.init(container, context, nodeDBHandle);
+ }
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -285,7 +285,7 @@
}
} else {
                         fetchArchive(false, archiveMetadata, ArchiveManager.METADATA_NAME, new ArchiveExtractCallback() {
-                            public void gotBucket(Bucket data) {
+                            public void gotBucket(Bucket data, ObjectContainer container, ClientContext context) {
try {
                                     metadata = Metadata.construct(data);
                                     wrapHandleMetadata(true, container, context);
@@ -299,9 +299,15 @@
return;
}
}
-                            public void notInArchive() {
+                            public void notInArchive(ObjectContainer container, ClientContext context) {
                                 onFailure(new FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot happen as ArchiveManager should synthesise some!"), false, sched, container, context);
}
+                            public void onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext context) {
+                                SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container, context);
+                            }
+                            public void onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext context) {
+                                SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container, context);
+                            }
                         }, container, context); // will result in this function being called again
return;
}
@@ -346,7 +352,7 @@
// We enforce this in ArchiveHandler.
                     // Therefore, the archive needs to be fetched.
                     fetchArchive(true, archiveMetadata, filename, new ArchiveExtractCallback() {
-                        public void gotBucket(Bucket data) {
+                        public void gotBucket(Bucket data, ObjectContainer container, ClientContext context) {
                             if(logMINOR) Logger.minor(this, "Returning data");
Bucket out;
try {
@@ -365,9 +371,15 @@
// Return the data
                             onSuccess(new FetchResult(clientMetadata, out), sched, container, context);
}
-                        public void notInArchive() {
+                        public void notInArchive(ObjectContainer container, ClientContext context) {
                             onFailure(new FetchException(FetchException.NOT_IN_ARCHIVE), false, sched, container, context);
}
+                        public void onFailed(ArchiveRestartException e, ObjectContainer container, ClientContext context) {
+                            SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container, context);
+                        }
+                        public void onFailed(ArchiveFailureException e, ObjectContainer container, ClientContext context) {
+                            SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container, context);
+                        }
}, container, context);
                     // Will call back into this function when it has been fetched.
return;
@@ -597,8 +609,20 @@
}
     public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container, ClientContext context) {
+        if(!parent.persistent()) {
+            // Run directly - we are running on some thread somewhere, don't worry about it.
+            innerSuccess(result, container, context);
+        } else {
+            // We are running on the database thread.
+            // Add a tag, unpack on a separate thread, copy the data to a persistent bucket, then schedule on the database thread,
+            // remove the tag, and call the callback.
+            ah.extractPersistentOffThread(result.asBucket(), actx, element, callback, container, context);
+        }
+    }
+
+    private void innerSuccess(FetchResult result, ObjectContainer container, ClientContext context) {
try {
-            ah.extractToCache(result.asBucket(), actx, element, callback, context.archiveManager);
+            ah.extractToCache(result.asBucket(), actx, element, callback, context.archiveManager, container, context);
} catch (ArchiveFailureException e) {
             SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container, context);
return;
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java	2008-06-13 19:54:07 UTC (rev 20328)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java	2008-06-13 20:42:26 UTC (rev 20329)
@@ -4,6 +4,8 @@
import java.io.IOException;
import java.net.URI;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveManager;
import freenet.client.FECQueue;
import freenet.client.HighLevelSimpleClient;
@@ -57,8 +59,8 @@
import freenet.support.SerialExecutor;
import freenet.support.SimpleFieldSet;
import freenet.support.api.BooleanCallback;
+import freenet.support.api.BucketFactory;
import freenet.support.api.IntCallback;
-import freenet.support.api.BucketFactory;
import freenet.support.api.StringArrCallback;
import freenet.support.api.StringCallback;
import freenet.support.io.FileUtil;
@@ -422,6 +424,13 @@
public void start(Config config) throws NodeInitException {
backgroundBlockEncoder.setContext(clientContext);
         node.executor.execute(backgroundBlockEncoder, "Background block encoder");
+ clientContext.jobRunner.queue(new DBJob() {
+
+            public void run(ObjectContainer container, ClientContext context) {
+                ArchiveManager.init(container, context, context.nodeDBHandle);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
persister.start();
if(fcpServer != null)
fcpServer.maybeStart();