Author: nextgens
Date: 2008-10-30 02:20:49 +0000 (Thu, 30 Oct 2008)
New Revision: 23211
Added:
trunk/freenet/src/freenet/support/compress/CompressJob.java
trunk/freenet/src/freenet/support/compress/RealCompressor.java
Modified:
trunk/freenet/src/freenet/client/InsertContext.java
trunk/freenet/src/freenet/client/async/SingleFileInserter.java
Log:
More untested code for the serialized compressor. It shouldn't run out of
memory anymore.
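
In outline: instead of each insert spawning its own OffThreadCompressor thread,
inserts now queue a CompressJob on a single RealCompressor scheduler, and
COMPRESSOR_TYPE.compressorSemaphore bounds how many compressions run at once,
so only a bounded amount of data sits in compression buffers at any one time.
A minimal self-contained sketch of that pattern using plain java.util.concurrent
primitives (the class name, Job interface and limit of 2 are illustrative, not
Freenet's API):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class BoundedCompressionScheduler implements Runnable {

	public interface Job {
		void tryCompress() throws Exception;
	}

	// Illustrative bound; in Freenet the real one is COMPRESSOR_TYPE.compressorSemaphore.
	private final Semaphore semaphore = new Semaphore(2);
	private final BlockingQueue<Job> jobs = new LinkedBlockingQueue<Job>();

	public void enqueueNewJob(Job j) {
		jobs.add(j);
	}

	public void run() {
		while(true) {
			try {
				final Job job = jobs.take(); // block until a job is queued
				semaphore.acquire();         // block until a compression slot is free
				new Thread(new Runnable() {
					public void run() {
						try {
							job.tryCompress();
						} catch(Exception e) {
							e.printStackTrace(); // the real code reports failure via a callback
						} finally {
							semaphore.release(); // free the slot for the next job
						}
					}
				}, "Compressor thread").start();
			} catch(InterruptedException e) {
				return; // shut down if interrupted
			}
		}
	}
}

The real scheduler below hands jobs to the node's Executor and waits on a
LinkedList with wait()/notifyAll() rather than using a BlockingQueue, but the
bounding logic is the same.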
Modified: trunk/freenet/src/freenet/client/InsertContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertContext.java	2008-10-29 23:22:36 UTC (rev 23210)
+++ trunk/freenet/src/freenet/client/InsertContext.java	2008-10-30 02:20:49 UTC (rev 23211)
@@ -10,6 +10,7 @@
 import freenet.crypt.RandomSource;
 import freenet.support.Executor;
 import freenet.support.api.BucketFactory;
+import freenet.support.compress.RealCompressor;
 import freenet.support.io.NullPersistentFileTracker;
 import freenet.support.io.PersistentFileTracker;
 
@@ -34,6 +35,7 @@
 	public final USKManager uskManager;
 	public final BackgroundBlockEncoder backgroundBlockEncoder;
 	public final Executor executor;
+	public final RealCompressor compressor;
 
 	public InsertContext(BucketFactory bf, BucketFactory persistentBF, PersistentFileTracker tracker, RandomSource random,
 			int maxRetries, int rnfsToSuccess, int maxThreads, int splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
@@ -54,6 +56,8 @@
 		this.cacheLocalRequests = cacheLocalRequests;
 		this.backgroundBlockEncoder = blockEncoder;
 		this.executor = executor;
+		this.compressor = new RealCompressor(executor);
+		executor.execute(compressor, "Compression scheduler");
 	}
 
 	public InsertContext(InsertContext ctx, SimpleEventProducer producer, boolean forceNonPersistent) {
@@ -73,6 +77,7 @@
 		this.cacheLocalRequests = ctx.cacheLocalRequests;
 		this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
 		this.executor = ctx.executor;
+		this.compressor = ctx.compressor;
 	}
 
 	public InsertContext(InsertContext ctx, SimpleEventProducer producer) {
@@ -92,6 +97,7 @@
 		this.cacheLocalRequests = ctx.cacheLocalRequests;
 		this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
 		this.executor = ctx.executor;
+		this.compressor = ctx.compressor;
 	}
 
 }
Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileInserter.java	2008-10-29 23:22:36 UTC (rev 23210)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java	2008-10-30 02:20:49 UTC (rev 23211)
@@ -18,15 +18,14 @@
 import freenet.keys.SSKBlock;
 import freenet.node.PrioRunnable;
 import freenet.support.Logger;
-import freenet.support.OOMHandler;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.Bucket;
+import freenet.support.compress.CompressJob;
 import freenet.support.compress.CompressionOutputSizeException;
 import freenet.support.compress.Compressor.COMPRESSOR_TYPE;
 import freenet.support.io.BucketChainBucketFactory;
 import freenet.support.io.BucketTools;
 import freenet.support.io.NativeThread;
-import java.util.concurrent.Semaphore;
 
 /**
  * Attempt to insert a file. May include metadata.
@@ -35,7 +34,7 @@
  * Attempt to compress the file. Off-thread if it will take a while.
  * Then hand it off to SimpleFileInserter.
  */
-class SingleFileInserter implements ClientPutState {
+public class SingleFileInserter implements ClientPutState, CompressJob {
 
 	private static boolean logMINOR;
 	final BaseClientPutter parent;
@@ -106,45 +105,10 @@
 			}
 		}
 		// Run off thread in any case
-		OffThreadCompressor otc = new OffThreadCompressor();
-		ctx.executor.execute(otc, "Compressor for " + this);
+		ctx.compressor.enqueueNewJob(this);
 	}
 
-	private class OffThreadCompressor implements PrioRunnable {
-		public void run() {
-			freenet.support.Logger.OSThread.logPID(this);
-			try {
-				tryCompress();
-			} catch (InsertException e) {
-				cb.onFailure(e, SingleFileInserter.this);
-			} catch (OutOfMemoryError e) {
-				OOMHandler.handleOOM(e);
-				System.err.println("OffThreadCompressor thread above failed.");
-				// Might not be heap, so try anyway
-				cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, null), SingleFileInserter.this);
-			} catch (Throwable t) {
-				Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
-				System.err.println("Caught in OffThreadCompressor: "+t);
-				t.printStackTrace();
-				// Try to fail gracefully
-				cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, null), SingleFileInserter.this);
-			}
-		}
-
-		public int getPriority() {
-			return NativeThread.LOW_PRIORITY;
-		}
-	}
-
-	private void tryCompress() throws InsertException {
-		try {
-			try {
-				COMPRESSOR_TYPE.compressorSemaphore.acquire();
-			} catch (InterruptedException e) {
-				// should not happen
-				Logger.error(this, "Caught an InterruptedException:"+e.getMessage(), e);
-				throw new InsertException(InsertException.INTERNAL_ERROR, e, null);
-			}
+	public void tryCompress() throws InsertException {
 		// First, determine how small it needs to be
 		Bucket origData = block.getData();
 		Bucket data = origData;
@@ -224,91 +188,111 @@
 		if(tryCompress)
 			ctx.eventProducer.produceEvent(new FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.metadataID, origSize, data.size()));
 		if(logMINOR) Logger.minor(this, "Compressed "+origSize+" to "+data.size()+" on "+this);
-		}
-
-		// Compressed data
-
-		// Insert it...
-		short codecNumber = bestCodec == null ? -1 : bestCodec.metadataID;
-		long compressedDataSize = data.size();
-		boolean fitsInOneBlockAsIs = bestCodec == null ? compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
-		boolean fitsInOneCHK = bestCodec == null ? compressedDataSize < CHKBlock.DATA_LENGTH : compressedDataSize < CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+		}
+		// Compressed data ; now insert it
+		// We do it off thread so that RealCompressor can release the semaphore
+		final COMPRESSOR_TYPE fbestCodec = bestCodec;
+		final Bucket fdata = data;
+		final int foneBlockCompressedSize = oneBlockCompressedSize;
+		final int fblockSize = blockSize;
+		final long forigSize = origSize;
+		final boolean fshouldFreeData = shouldFreeData;
+		ctx.executor.execute(new PrioRunnable() {
 
-		if((fitsInOneBlockAsIs || fitsInOneCHK) && block.getData().size() > Integer.MAX_VALUE)
-			throw new InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to one block!", null);
+			public int getPriority() {
+				return NativeThread.NORM_PRIORITY;
+			}
 
-		boolean noMetadata = ((block.clientMetadata == null) || block.clientMetadata.isTrivial()) && targetFilename == null;
-		if(noMetadata && archiveType == null) {
-			if(fitsInOneBlockAsIs) {
-				// Just insert it
-				ClientPutState bi =
-					createInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(), -1, getCHKOnly, true, true);
-				cb.onTransition(this, bi);
-				bi.schedule();
-				cb.onBlockSetFinished(this);
+			public void run() {
+				insert(fbestCodec, fdata, foneBlockCompressedSize, fblockSize, forigSize, fshouldFreeData);
+			}
+		}, "Insert thread for "+this);
+	}
+
+	private void insert(COMPRESSOR_TYPE bestCodec, Bucket data, int oneBlockCompressedSize, int blockSize, long origSize, boolean shouldFreeData) {
+		try {
+			// Insert it...
+			short codecNumber = bestCodec == null ? -1 : bestCodec.metadataID;
+			long compressedDataSize = data.size();
+			boolean fitsInOneBlockAsIs = bestCodec == null ? compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
+			boolean fitsInOneCHK = bestCodec == null ? compressedDataSize < CHKBlock.DATA_LENGTH : compressedDataSize < CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+
+			if((fitsInOneBlockAsIs || fitsInOneCHK) && block.getData().size() > Integer.MAX_VALUE)
+				throw new InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to one block!", null);
+
+			boolean noMetadata = ((block.clientMetadata == null) || block.clientMetadata.isTrivial()) && targetFilename == null;
+			if(noMetadata && archiveType == null)
+				if(fitsInOneBlockAsIs) {
+					// Just insert it
+					ClientPutState bi =
+						createInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, (int) block.getData().size(), -1, getCHKOnly, true, true);
+					cb.onTransition(this, bi);
+					bi.schedule();
+					cb.onBlockSetFinished(this);
+					return;
+				}
+			if(fitsInOneCHK) {
+				// Insert single block, then insert pointer to it
+				if(reportMetadataOnly) {
+					SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, cb, metadata, (int) origSize, -1, getCHKOnly, true, true, token);
+					Metadata meta = makeMetadata(archiveType, bestCodec, dataPutter.getURI());
+					cb.onMetadata(meta, this);
+					cb.onTransition(this, dataPutter);
+					dataPutter.schedule();
+					cb.onBlockSetFinished(this);
+				} else {
+					MultiPutCompletionCallback mcb =
+						new MultiPutCompletionCallback(cb, parent, token);
+					SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, mcb, metadata, (int) origSize, -1, getCHKOnly, true, false, token);
+					Metadata meta = makeMetadata(archiveType, bestCodec, dataPutter.getURI());
+					Bucket metadataBucket;
+					try {
+						metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+					} catch(IOException e) {
+						Logger.error(this, "Caught " + e, e);
+						throw new InsertException(InsertException.BUCKET_ERROR, e, null);
+					} catch(MetadataUnresolvedException e) {
+						// Impossible, we're not inserting a manifest.
+						Logger.error(this, "Caught " + e, e);
+						throw new InsertException(InsertException.INTERNAL_ERROR, "Got MetadataUnresolvedException in SingleFileInserter: " + e.toString(), null);
+					}
+					ClientPutState metaPutter = createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, (int) origSize, -1, getCHKOnly, true, false);
+					mcb.addURIGenerator(metaPutter);
+					mcb.add(dataPutter);
+					cb.onTransition(this, mcb);
+					Logger.minor(this, "" + mcb + " : data " + dataPutter + " meta " + metaPutter);
+					mcb.arm();
+					dataPutter.schedule();
+					if(metaPutter instanceof SingleBlockInserter)
+						((SingleBlockInserter) metaPutter).encode();
+					metaPutter.schedule();
+					cb.onBlockSetFinished(this);
+				}
 				return;
 			}
-		}
-		if (fitsInOneCHK) {
-			// Insert single block, then insert pointer to it
+			// Otherwise the file is too big to fit into one block
+			// We therefore must make a splitfile
+			// Job of SplitHandler: when the splitinserter has the metadata,
+			// insert it. Then when the splitinserter has finished, and the
+			// metadata insert has finished too, tell the master callback.
 			if(reportMetadataOnly) {
-				SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token);
-				Metadata meta = makeMetadata(archiveType, bestCodec, dataPutter.getURI());
-				cb.onMetadata(meta, this);
-				cb.onTransition(this, dataPutter);
-				dataPutter.schedule();
-				cb.onBlockSetFinished(this);
+				SplitFileInserter sfi = new SplitFileInserter(parent, cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
+				cb.onTransition(this, sfi);
+				sfi.start();
+				if(earlyEncode)
+					sfi.forceEncode();
 			} else {
-				MultiPutCompletionCallback mcb =
-					new MultiPutCompletionCallback(cb, parent, token);
-				SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, mcb, metadata, (int)origSize, -1, getCHKOnly, true, false, token);
-				Metadata meta = makeMetadata(archiveType, bestCodec, dataPutter.getURI());
-				Bucket metadataBucket;
-				try {
-					metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
-				} catch (IOException e) {
-					Logger.error(this, "Caught "+e, e);
-					throw new InsertException(InsertException.BUCKET_ERROR, e, null);
-				} catch (MetadataUnresolvedException e) {
-					// Impossible, we're not inserting a manifest.
-					Logger.error(this, "Caught "+e, e);
-					throw new InsertException(InsertException.INTERNAL_ERROR, "Got MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null);
-				}
-				ClientPutState metaPutter = createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, (int)origSize, -1, getCHKOnly, true, false);
-				mcb.addURIGenerator(metaPutter);
-				mcb.add(dataPutter);
-				cb.onTransition(this, mcb);
-				Logger.minor(this, ""+mcb+" : data "+dataPutter+" meta "+metaPutter);
-				mcb.arm();
-				dataPutter.schedule();
-				if(metaPutter instanceof SingleBlockInserter)
-					((SingleBlockInserter)metaPutter).encode();
-				metaPutter.schedule();
-				cb.onBlockSetFinished(this);
+				SplitHandler sh = new SplitHandler();
+				SplitFileInserter sfi = new SplitFileInserter(parent, sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
+				sh.sfi = sfi;
+				cb.onTransition(this, sh);
+				sfi.start();
+				if(earlyEncode)
+					sfi.forceEncode();
 			}
-			return;
+		} catch(InsertException e) {
+			onFailure(e, this);
 		}
-		// Otherwise the file is too big to fit into one block
-		// We therefore must make a splitfile
-		// Job of SplitHandler: when the splitinserter has the metadata,
-		// insert it. Then when the splitinserter has finished, and the
-		// metadata insert has finished too, tell the master callback.
-		if(reportMetadataOnly) {
-			SplitFileInserter sfi = new SplitFileInserter(parent, cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
-			cb.onTransition(this, sfi);
-			sfi.start();
-			if(earlyEncode) sfi.forceEncode();
-		} else {
-			SplitHandler sh = new SplitHandler();
-			SplitFileInserter sfi = new SplitFileInserter(parent, sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
-			sh.sfi = sfi;
-			cb.onTransition(this, sh);
-			sfi.start();
-			if(earlyEncode) sfi.forceEncode();
-		}
-		} finally {
-			COMPRESSOR_TYPE.compressorSemaphore.release();
-		}
 	}
 
 	private Metadata makeMetadata(ARCHIVE_TYPE archiveType, COMPRESSOR_TYPE codec, FreenetURI uri) {
@@ -722,4 +706,11 @@
 	public SimpleFieldSet getProgressFieldset() {
 		return null;
 	}
+
+	public void onFailure(InsertException e, ClientPutState c) {
+		if(cb != null)
+			cb.onFailure(e, c);
+		else
+			Logger.error(this, "The callback is null but we have onFailure to call!");
+	}
 }
Added: trunk/freenet/src/freenet/support/compress/CompressJob.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/CompressJob.java	                        (rev 0)
+++ trunk/freenet/src/freenet/support/compress/CompressJob.java	2008-10-30 02:20:49 UTC (rev 23211)
@@ -0,0 +1,9 @@
+package freenet.support.compress;
+
+import freenet.client.InsertException;
+import freenet.client.async.ClientPutState;
+
+public interface CompressJob {
+	public abstract void tryCompress() throws InsertException;
+	public abstract void onFailure(InsertException e, ClientPutState c);
+}
Added: trunk/freenet/src/freenet/support/compress/RealCompressor.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/RealCompressor.java	                        (rev 0)
+++ trunk/freenet/src/freenet/support/compress/RealCompressor.java	2008-10-30 02:20:49 UTC (rev 23211)
@@ -0,0 +1,79 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support.compress;
+
+import freenet.client.InsertException;
+import freenet.node.PrioRunnable;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+import freenet.support.io.NativeThread;
+import java.util.LinkedList;
+
+public class RealCompressor implements PrioRunnable {
+
+	private final Executor exec;
+	private final LinkedList<CompressJob> _awaitingJobs = new LinkedList<CompressJob>();
+
+	public RealCompressor(Executor e) {
+		this.exec = e;
+	}
+
+	public int getPriority() {
+		return NativeThread.HIGH_PRIORITY;
+	}
+
+	public synchronized void enqueueNewJob(CompressJob j) {
+		_awaitingJobs.add(j);
+		notifyAll();
+	}
+
+	public void run() {
+		Logger.normal(this, "Starting RealCompressor");
+		while(true) {
+			CompressJob currentJob = null;
+			try {
+				synchronized(this) {
+					// Wait in a loop so that jobs queued before we block are not missed.
+					while(_awaitingJobs.isEmpty())
+						wait();
+					currentJob = _awaitingJobs.poll();
+				}
+				Compressor.COMPRESSOR_TYPE.compressorSemaphore.acquire();
+			} catch(InterruptedException e) {
+				Logger.error(this, "caught: "+e.getMessage(), e);
+				continue;
+			}
+
+			final CompressJob finalJob = currentJob;
+			exec.execute(new PrioRunnable() {
+				public void run() {
+					freenet.support.Logger.OSThread.logPID(this);
+					try {
+						finalJob.tryCompress();
+					} catch(InsertException e) {
+						finalJob.onFailure(e, null);
+					} catch(OutOfMemoryError e) {
+						OOMHandler.handleOOM(e);
+						System.err.println("Compressor thread above failed.");
+						// Might not be heap, so try anyway
+						finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, null), null);
+					} catch(Throwable t) {
+						Logger.error(this, "Caught in compressor thread: " + t, t);
+						System.err.println("Caught in compressor thread: " + t);
+						t.printStackTrace();
+						// Try to fail gracefully
+						finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, null), null);
+					} finally {
+						Compressor.COMPRESSOR_TYPE.compressorSemaphore.release();
+					}
+				}
+
+				public int getPriority() {
+					return NativeThread.MIN_PRIORITY;
+				}
+			}, "Compressor thread for " + currentJob);
+		}
+	}
+}
\ No newline at end of file
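
For reference, the client side of the new API reduced to a self-contained
sketch: CompressJob below mirrors the interface added above, SketchInserter
stands in for SingleFileInserter (which now implements CompressJob and
enqueues itself instead of spawning its own compressor thread), and
SketchScheduler is a deliberately trivial stand-in for RealCompressor that
runs jobs inline:

public class CompressJobUsageSketch {

	interface CompressJob {
		void tryCompress() throws Exception;
		void onFailure(Exception e);
	}

	static class SketchScheduler {
		// Stand-in for RealCompressor.enqueueNewJob(); runs jobs inline for brevity.
		void enqueueNewJob(CompressJob j) {
			try {
				j.tryCompress();
			} catch(Exception e) {
				j.onFailure(e);
			}
		}
	}

	static class SketchInserter implements CompressJob {
		private final SketchScheduler scheduler;

		SketchInserter(SketchScheduler s) {
			scheduler = s;
		}

		void start() {
			scheduler.enqueueNewJob(this); // was: spawn a private compressor thread
		}

		public void tryCompress() {
			System.out.println("compress, then hand off to an insert thread");
		}

		public void onFailure(Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new SketchInserter(new SketchScheduler()).start();
	}
}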