Author: toad
Date: 2008-06-06 17:28:13 +0000 (Fri, 06 Jun 2008)
New Revision: 20243
Added:
branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientContext.java
branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java
Log:
InsertCompressor: Off-thread data compression is now tracked in the database
(for persistent requests).
We store an InsertCompressor in the database, and remove it when it's done.
Not yet implemented: restarting InsertCompressors that didn't complete on the
last run.
Modified: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java	2008-06-06 14:17:52 UTC (rev 20242)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java	2008-06-06 17:28:13 UTC (rev 20243)
@@ -5,6 +5,7 @@
import freenet.client.FECQueue;
import freenet.node.NodeClientCore;
+import freenet.support.Executor;
/**
* Object passed in to client-layer operations, containing references to
essential but transient objects
@@ -19,6 +20,8 @@
public final ClientRequestScheduler sskInsertScheduler;
public final ClientRequestScheduler chkInsertScheduler;
public final DBJobRunner jobRunner;
+ public final Executor mainExecutor;
+ public final long nodeDBHandle;
public ClientContext(NodeClientCore core) {
this.fecQueue = core.fecQueue;
@@ -27,6 +30,8 @@
this.sskInsertScheduler = core.requestStarters.sskPutScheduler;
this.chkInsertScheduler = core.requestStarters.chkPutScheduler;
jobRunner = core;
+ this.mainExecutor = core.getExecutor();
+ this.nodeDBHandle = core.node.nodeDBHandle;
}
}
Added: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java	2008-06-06 17:28:13 UTC (rev 20243)
@@ -0,0 +1,167 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.InsertException;
+import freenet.support.api.Bucket;
+import freenet.support.api.BucketFactory;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+import freenet.support.io.NativeThread;
+
+/**
+ * Compress a file in order to insert it. This class acts as a tag in the
database to ensure that inserts
+ * are not forgotten about, and also can be run on a non-database thread from
an executor.
+ *
+ * FIXME how many compressors do we want to have running simultaneously?
Probably we should have a compression
+ * queue, or at least a SerialExecutor?
+ *
+ * @author toad
+ */
+class InsertCompressor {
+
+ /** Database handle to identify which node it belongs to in the
database */
+ public final long nodeDBHandle;
+ /** The SingleFileInserter we report to. We were created by it and when
we have compressed our data we will
+ * call a method to process it and schedule the data. */
+ public final SingleFileInserter inserter;
+ /** The original data */
+ final Bucket origData;
+ /** If we can get it into one block, don't compress any further */
+ public final int minSize;
+ /** BucketFactory */
+ public final BucketFactory bucketFactory;
+ public final boolean persistent;
+
+ public InsertCompressor(long nodeDBHandle2, SingleFileInserter
inserter2, Bucket origData2, int minSize2, BucketFactory bf, boolean
persistent) {
+ this.nodeDBHandle = nodeDBHandle2;
+ this.inserter = inserter2;
+ this.origData = origData2;
+ this.minSize = minSize2;
+ this.bucketFactory = bf;
+ this.persistent = persistent;
+ }
+
+ public void init(final ClientContext ctx) {
+ ctx.mainExecutor.execute(new Runnable() {
+
+ public void run() {
+ compress(ctx);
+ }
+
+ }, "Compressor for "+this);
+ }
+
+ protected void compress(final ClientContext context) {
+ Bucket data = origData;
+ Compressor bestCodec = null;
+ Bucket bestCompressedData = null;
+
+ // Try to compress the data.
+ // Try each algorithm, starting with the fastest and weakest.
+ // Stop when run out of algorithms, or the compressed data fits
in a single block.
+ int algos = Compressor.countCompressAlgorithms();
+ try {
+ for(int i=0;i<algos;i++) {
+ // Only produce if we are compressing *the
original data*
+ inserter.onStartCompression(i);
+ Compressor comp =
Compressor.getCompressionAlgorithmByDifficulty(i);
+ Bucket result;
+ result = comp.compress(origData, bucketFactory,
origData.size());
+ if(result.size() < minSize) {
+ bestCodec = comp;
+ if(bestCompressedData != null)
+ bestCompressedData.free();
+ bestCompressedData = result;
+ break;
+ }
+ if((bestCompressedData != null) &&
(result.size() < bestCompressedData.size())) {
+ bestCompressedData.free();
+ bestCompressedData = result;
+ bestCodec = comp;
+ } else if((bestCompressedData == null) &&
(result.size() < data.size())) {
+ bestCompressedData = result;
+ bestCodec = comp;
+ } else {
+ result.free();
+ }
+ }
+
+ final CompressionOutput output = new
CompressionOutput(bestCompressedData, bestCodec);
+
+ if(persistent) {
+
+ context.jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container) {
+ inserter.onCompressed(output,
container, context);
+
container.delete(InsertCompressor.this);
+ }
+
+ }, NativeThread.NORM_PRIORITY);
+ } else {
+ inserter.onCompressed(output, null, context);
+ }
+
+ } catch (final IOException e) {
+ if(persistent) {
+ context.jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container) {
+ inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container,
context);
+
container.delete(InsertCompressor.this);
+ }
+
+ }, NativeThread.NORM_PRIORITY);
+ } else {
+ inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, null,
context);
+ }
+
+ } catch (final CompressionOutputSizeException e) {
+ if(persistent) {
+ context.jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container) {
+ inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container,
context);
+
container.delete(InsertCompressor.this);
+ }
+
+ }, NativeThread.NORM_PRIORITY);
+ } else {
+ inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, null,
context);
+ }
+ }
+ }
+
+ /**
+ * Create an InsertCompressor, add it to the database, schedule it.
+ * @param container
+ * @param context
+ * @param inserter2
+ * @param origData2
+ * @param oneBlockCompressedSize
+ * @param bf
+ * @return
+ */
+ public static InsertCompressor start(ObjectContainer container,
ClientContext context, SingleFileInserter inserter,
+ Bucket origData, int minSize, BucketFactory bf, boolean
persistent) {
+ InsertCompressor compressor = new
InsertCompressor(context.nodeDBHandle, inserter, origData, minSize, bf,
persistent);
+ if(persistent)
+ container.set(compressor);
+ compressor.init(context);
+ return compressor;
+ }
+
+
+}
+
+class CompressionOutput {
+ public CompressionOutput(Bucket bestCompressedData, Compressor
bestCodec2) {
+ this.data = bestCompressedData;
+ this.bestCodec = bestCodec2;
+ }
+ final Bucket data;
+ final Compressor bestCodec;
+}
\ No newline at end of file
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java	2008-06-06 14:17:52 UTC (rev 20242)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java	2008-06-06 17:28:13 UTC (rev 20243)
@@ -110,41 +110,51 @@
OffThreadCompressor otc = new OffThreadCompressor();
ctx.executor.execute(otc, "Compressor for "+this);
} else {
- tryCompress();
+ tryCompress(container, context);
}
}
private class OffThreadCompressor implements Runnable {
public void run() {
freenet.support.Logger.OSThread.logPID(this);
- try {
- tryCompress();
- } catch (InsertException e) {
- cb.onFailure(e, SingleFileInserter.this);
- } catch (OutOfMemoryError e) {
- OOMHandler.handleOOM(e);
- System.err.println("OffThreadCompressor thread
above failed.");
- // Might not be heap, so try anyway
- cb.onFailure(new
InsertException(InsertException.INTERNAL_ERROR, e, null),
SingleFileInserter.this);
- } catch (Throwable t) {
- Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
- System.err.println("Caught in OffThreadCompressor: "+t);
- t.printStackTrace();
- // Try to fail gracefully
- cb.onFailure(new
InsertException(InsertException.INTERNAL_ERROR, t, null),
SingleFileInserter.this);
- }
}
}
- private void tryCompress() throws InsertException {
- // First, determine how small it needs to be
- Bucket origData = block.getData();
- Bucket data = origData;
+ void onCompressed(CompressionOutput output, ObjectContainer container,
ClientContext context) {
+ try {
+ onCompressedInner(output, container, context);
+ } catch (InsertException e) {
+ cb.onFailure(e, SingleFileInserter.this, container,
context);
+ } catch (OutOfMemoryError e) {
+ OOMHandler.handleOOM(e);
+ System.err.println("OffThreadCompressor thread above
failed.");
+ // Might not be heap, so try anyway
+ cb.onFailure(new
InsertException(InsertException.INTERNAL_ERROR, e, null),
SingleFileInserter.this, container, context);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
+ System.err.println("Caught in OffThreadCompressor: "+t);
+ t.printStackTrace();
+ // Try to fail gracefully
+ cb.onFailure(new
InsertException(InsertException.INTERNAL_ERROR, t, null),
SingleFileInserter.this, container, context);
+ }
+ }
+
+ void onCompressedInner(CompressionOutput output, ObjectContainer
container, ClientContext context) throws InsertException {
+ long origSize = block.getData().size();
+ Bucket bestCompressedData = output.data;
+ Bucket data = bestCompressedData;
+ Compressor bestCodec = output.bestCodec;
+
+ boolean freeData = false;
+ if(bestCodec != null) {
+ freeData = true;
+ } else {
+ data = block.getData();
+ }
+
int blockSize;
int oneBlockCompressedSize;
- boolean dontCompress = ctx.dontCompress;
- long origSize = data.size();
String type = block.desiredURI.getKeyType();
if(type.equals("SSK") || type.equals("KSK") ||
type.equals("USK")) {
blockSize = SSKBlock.DATA_LENGTH;
@@ -156,62 +166,13 @@
throw new InsertException(InsertException.INVALID_URI,
"Unknown key type: "+type, null);
}
- Compressor bestCodec = null;
- Bucket bestCompressedData = null;
-
- boolean tryCompress = (origSize > blockSize) &&
(!ctx.dontCompress) && (!dontCompress);
- if(tryCompress) {
- // Try to compress the data.
- // Try each algorithm, starting with the fastest and
weakest.
- // Stop when run out of algorithms, or the compressed
data fits in a single block.
- int algos = Compressor.countCompressAlgorithms();
- try {
- for(int i=0;i<algos;i++) {
- // Only produce if we are compressing
*the original data*
- if(parent == cb)
-
ctx.eventProducer.produceEvent(new StartedCompressionEvent(i));
- Compressor comp =
Compressor.getCompressionAlgorithmByDifficulty(i);
- Bucket result;
- result = comp.compress(origData,
ctx.persistentBucketFactory, origData.size());
- if(result.size() <
oneBlockCompressedSize) {
- bestCodec = comp;
- if(bestCompressedData != null)
-
bestCompressedData.free();
- bestCompressedData = result;
- break;
- }
- if((bestCompressedData != null) &&
(result.size() < bestCompressedData.size())) {
- bestCompressedData.free();
- bestCompressedData = result;
- bestCodec = comp;
- } else if((bestCompressedData == null)
&& (result.size() < data.size())) {
- bestCompressedData = result;
- bestCodec = comp;
- } else {
- result.free();
- }
- }
- } catch (IOException e) {
- throw new
InsertException(InsertException.BUCKET_ERROR, e, null);
- } catch (CompressionOutputSizeException e) {
- // Impossible
- throw new Error(e);
- }
- }
- boolean freeData = false;
- if(bestCompressedData != null) {
- data = bestCompressedData;
- freeData = true;
- }
+ // Compressed data
if(parent == cb) {
- if(tryCompress)
- ctx.eventProducer.produceEvent(new
FinishedCompressionEvent(bestCodec == null ? -1 :
bestCodec.codecNumberForMetadata(), origSize, data.size()));
+ ctx.eventProducer.produceEvent(new
FinishedCompressionEvent(bestCodec == null ? -1 :
bestCodec.codecNumberForMetadata(), origSize, data.size()));
if(logMINOR) Logger.minor(this, "Compressed
"+origSize+" to "+data.size()+" on "+this);
}
- // Compressed data
-
// Insert it...
short codecNumber = bestCodec == null ? -1 :
bestCodec.codecNumberForMetadata();
long compressedDataSize = data.size();
@@ -226,10 +187,10 @@
if(fitsInOneBlockAsIs) {
// Just insert it
ClientPutState bi =
- createInserter(parent, data,
codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(),
-1, getCHKOnly, true, true);
- cb.onTransition(this, bi);
- bi.schedule();
- cb.onBlockSetFinished(this);
+ createInserter(parent, data,
codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(),
-1, getCHKOnly, true, true, container, context);
+ cb.onTransition(this, bi, container);
+ bi.schedule(container, context);
+ cb.onBlockSetFinished(this, container);
return;
}
}
@@ -238,10 +199,10 @@
if(reportMetadataOnly) {
SingleBlockInserter dataPutter = new
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx,
cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token);
Metadata meta =
makeMetadata(dataPutter.getURI());
- cb.onMetadata(meta, this);
- cb.onTransition(this, dataPutter);
- dataPutter.schedule();
- cb.onBlockSetFinished(this);
+ cb.onMetadata(meta, this, container, context);
+ cb.onTransition(this, dataPutter, container);
+ dataPutter.schedule(container, context);
+ cb.onBlockSetFinished(this, container);
} else {
MultiPutCompletionCallback mcb =
new MultiPutCompletionCallback(cb,
parent, token);
@@ -258,17 +219,17 @@
Logger.error(this, "Caught "+e, e);
throw new
InsertException(InsertException.INTERNAL_ERROR, "Got
MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null);
}
- ClientPutState metaPutter =
createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb,
true, (int)origSize, -1, getCHKOnly, true, false);
+ ClientPutState metaPutter =
createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb,
true, (int)origSize, -1, getCHKOnly, true, false, container, context);
mcb.addURIGenerator(metaPutter);
mcb.add(dataPutter);
- cb.onTransition(this, mcb);
+ cb.onTransition(this, mcb, container);
Logger.minor(this, ""+mcb+" : data
"+dataPutter+" meta "+metaPutter);
- mcb.arm();
- dataPutter.schedule();
+ mcb.arm(container, context);
+ dataPutter.schedule(container, context);
if(metaPutter instanceof SingleBlockInserter)
-
((SingleBlockInserter)metaPutter).encode();
- metaPutter.schedule();
- cb.onBlockSetFinished(this);
+
((SingleBlockInserter)metaPutter).encode(container);
+ metaPutter.schedule(container, context);
+ cb.onBlockSetFinished(this, container);
}
return;
}
@@ -279,19 +240,45 @@
// metadata insert has finished too, tell the master callback.
if(reportMetadataOnly) {
SplitFileInserter sfi = new SplitFileInserter(parent,
cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata,
token, insertAsArchiveManifest, freeData);
- cb.onTransition(this, sfi);
- sfi.start();
+ cb.onTransition(this, sfi, container);
+ sfi.start(container, context);
if(earlyEncode) sfi.forceEncode();
} else {
SplitHandler sh = new SplitHandler();
SplitFileInserter sfi = new SplitFileInserter(parent,
sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata,
token, insertAsArchiveManifest, freeData);
sh.sfi = sfi;
- cb.onTransition(this, sh);
- sfi.start();
+ cb.onTransition(this, sh, container);
+ sfi.start(container, context);
if(earlyEncode) sfi.forceEncode();
}
}
+ private void tryCompress(ObjectContainer container, ClientContext
context) throws InsertException {
+ // First, determine how small it needs to be
+ Bucket origData = block.getData();
+ Bucket data = origData;
+ int blockSize;
+ int oneBlockCompressedSize;
+ boolean dontCompress = ctx.dontCompress;
+
+ long origSize = data.size();
+ String type = block.desiredURI.getKeyType();
+ if(type.equals("SSK") || type.equals("KSK") ||
type.equals("USK")) {
+ blockSize = SSKBlock.DATA_LENGTH;
+ oneBlockCompressedSize =
SSKBlock.MAX_COMPRESSED_DATA_LENGTH;
+ } else if(type.equals("CHK")) {
+ blockSize = CHKBlock.DATA_LENGTH;
+ oneBlockCompressedSize =
CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+ } else {
+ throw new InsertException(InsertException.INVALID_URI,
"Unknown key type: "+type, null);
+ }
+
+ boolean tryCompress = (origSize > blockSize) &&
(!ctx.dontCompress) && (!dontCompress);
+ if(tryCompress) {
+ InsertCompressor.start(container, context, this,
origData, oneBlockCompressedSize, ctx.bf, parent.persistent());
+ }
+ }
+
private Metadata makeMetadata(FreenetURI uri) {
Metadata meta = new Metadata(insertAsArchiveManifest ?
Metadata.ZIP_MANIFEST : Metadata.SIMPLE_REDIRECT, uri, block.clientMetadata);
if(targetFilename != null) {
@@ -304,7 +291,7 @@
private ClientPutState createInserter(BaseClientPutter parent, Bucket
data, short compressionCodec, FreenetURI uri,
InsertContext ctx, PutCompletionCallback cb, boolean
isMetadata, int sourceLength, int token, boolean getCHKOnly,
- boolean addToParent, boolean encodeCHK, ObjectContainer
container) throws InsertException {
+ boolean addToParent, boolean encodeCHK, ObjectContainer
container, ClientContext context) throws InsertException {
uri.checkInsertURI(); // will throw an exception if needed
@@ -320,7 +307,7 @@
new SingleBlockInserter(parent, data,
compressionCodec, uri, ctx, cb, isMetadata, sourceLength, token,
getCHKOnly, addToParent, false,
this.token);
if(encodeCHK)
- cb.onEncode(sbi.getBlock().getClientKey(),
this, container);
+ cb.onEncode(sbi.getBlock().getClientKey(),
this, container, context);
return sbi;
}
@@ -549,9 +536,9 @@
oldMetadataPutter = metadataPutter;
}
if(oldSFI != null)
- oldSFI.cancel(container);
+ oldSFI.cancel(container, context);
if(oldMetadataPutter != null)
- oldMetadataPutter.cancel(container);
+ oldMetadataPutter.cancel(container, context);
finished = true;
cb.onFailure(e, this, container, context);
}
@@ -674,7 +661,7 @@
}
} catch (InsertException e1) {
Logger.error(this, "Failing "+this+" : "+e1,
e1);
- fail(e1, container);
+ fail(e1, container, context);
return;
}
}
@@ -701,4 +688,9 @@
public SimpleFieldSet getProgressFieldset() {
return null;
}
+
+ public void onStartCompression(int i) {
+ if(parent == cb)
+ ctx.eventProducer.produceEvent(new
StartedCompressionEvent(i));
+ }
}