Author: nextgens
Date: 2008-10-30 02:20:49 +0000 (Thu, 30 Oct 2008)
New Revision: 23211

Added:
   trunk/freenet/src/freenet/support/compress/CompressJob.java
   trunk/freenet/src/freenet/support/compress/RealCompressor.java
Modified:
   trunk/freenet/src/freenet/client/InsertContext.java
   trunk/freenet/src/freenet/client/async/SingleFileInserter.java
Log:
More untested code for the serialized compressor. It should no longer run out 
of memory.

Modified: trunk/freenet/src/freenet/client/InsertContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertContext.java 2008-10-29 23:22:36 UTC 
(rev 23210)
+++ trunk/freenet/src/freenet/client/InsertContext.java 2008-10-30 02:20:49 UTC 
(rev 23211)
@@ -10,6 +10,7 @@
 import freenet.crypt.RandomSource;
 import freenet.support.Executor;
 import freenet.support.api.BucketFactory;
+import freenet.support.compress.RealCompressor;
 import freenet.support.io.NullPersistentFileTracker;
 import freenet.support.io.PersistentFileTracker;

@@ -34,6 +35,7 @@
        public final USKManager uskManager;
        public final BackgroundBlockEncoder backgroundBlockEncoder;
        public final Executor executor;
+       public final RealCompressor compressor;

        public InsertContext(BucketFactory bf, BucketFactory persistentBF, 
PersistentFileTracker tracker, RandomSource random,
                        int maxRetries, int rnfsToSuccess, int maxThreads, int 
splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
@@ -54,6 +56,8 @@
                this.cacheLocalRequests = cacheLocalRequests;
                this.backgroundBlockEncoder = blockEncoder;
                this.executor = executor;
+               this.compressor = new RealCompressor(executor);
+               executor.execute(compressor, "Compression scheduler");
        }

        public InsertContext(InsertContext ctx, SimpleEventProducer producer, 
boolean forceNonPersistent) {
@@ -73,6 +77,7 @@
                this.cacheLocalRequests = ctx.cacheLocalRequests;
                this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
                this.executor = ctx.executor;
+               this.compressor = ctx.compressor;
        }

        public InsertContext(InsertContext ctx, SimpleEventProducer producer) {
@@ -92,6 +97,7 @@
                this.cacheLocalRequests = ctx.cacheLocalRequests;
                this.backgroundBlockEncoder = ctx.backgroundBlockEncoder;
                this.executor = ctx.executor;
+               this.compressor = ctx.compressor;
        }

 }

Modified: trunk/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
2008-10-29 23:22:36 UTC (rev 23210)
+++ trunk/freenet/src/freenet/client/async/SingleFileInserter.java      
2008-10-30 02:20:49 UTC (rev 23211)
@@ -18,15 +18,14 @@
 import freenet.keys.SSKBlock;
 import freenet.node.PrioRunnable;
 import freenet.support.Logger;
-import freenet.support.OOMHandler;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.Bucket;
+import freenet.support.compress.CompressJob;
 import freenet.support.compress.CompressionOutputSizeException;
 import freenet.support.compress.Compressor.COMPRESSOR_TYPE;
 import freenet.support.io.BucketChainBucketFactory;
 import freenet.support.io.BucketTools;
 import freenet.support.io.NativeThread;
-import java.util.concurrent.Semaphore;

 /**
  * Attempt to insert a file. May include metadata.
@@ -35,7 +34,7 @@
  * Attempt to compress the file. Off-thread if it will take a while.
  * Then hand it off to SimpleFileInserter.
  */
-class SingleFileInserter implements ClientPutState {
+public class SingleFileInserter implements ClientPutState, CompressJob {

        private static boolean logMINOR;
        final BaseClientPutter parent;
@@ -106,45 +105,10 @@
                        }
                }
                // Run off thread in any case
-               OffThreadCompressor otc = new OffThreadCompressor();
-               ctx.executor.execute(otc, "Compressor for " + this);
+               ctx.compressor.enqueueNewJob(this);
        }

-       private  class OffThreadCompressor implements PrioRunnable {
-               public void run() {
-                   freenet.support.Logger.OSThread.logPID(this);
-                       try {
-                               tryCompress();
-                       } catch (InsertException e) {
-                               cb.onFailure(e, SingleFileInserter.this);
-            } catch (OutOfMemoryError e) {
-                               OOMHandler.handleOOM(e);
-                               System.err.println("OffThreadCompressor thread 
above failed.");
-                               // Might not be heap, so try anyway
-                               cb.onFailure(new 
InsertException(InsertException.INTERNAL_ERROR, e, null), 
SingleFileInserter.this);
-            } catch (Throwable t) {
-                Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
-                System.err.println("Caught in OffThreadCompressor: "+t);
-                t.printStackTrace();
-                // Try to fail gracefully
-                               cb.onFailure(new 
InsertException(InsertException.INTERNAL_ERROR, t, null), 
SingleFileInserter.this);
-                       }
-               }
-
-               public int getPriority() {
-                       return NativeThread.LOW_PRIORITY;
-               }
-       }
-       
-       private void tryCompress() throws InsertException {
-               try {
-                       try {
-                               COMPRESSOR_TYPE.compressorSemaphore.acquire();
-                       } catch (InterruptedException e) {
-                               // should not happen
-                               Logger.error(this, "Caught an 
InterruptedException:"+e.getMessage(), e);
-                               throw new 
InsertException(InsertException.INTERNAL_ERROR, e, null);
-                       }
+       public void tryCompress() throws InsertException {
                // First, determine how small it needs to be
                Bucket origData = block.getData();
                Bucket data = origData;
@@ -224,91 +188,111 @@
                        if(tryCompress)
                                ctx.eventProducer.produceEvent(new 
FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.metadataID, 
origSize, data.size()));
                        if(logMINOR) Logger.minor(this, "Compressed 
"+origSize+" to "+data.size()+" on "+this);
-               }
-               
-               // Compressed data
-               
-               // Insert it...
-               short codecNumber = bestCodec == null ? -1 : 
bestCodec.metadataID;
-               long compressedDataSize = data.size();
-               boolean fitsInOneBlockAsIs = bestCodec == null ? 
compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
-               boolean fitsInOneCHK = bestCodec == null ? compressedDataSize < 
CHKBlock.DATA_LENGTH : compressedDataSize < CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+               }               
+               // Compressed data ; now insert it
+               // We do it off thread so that RealCompressor can release the 
semaphore
+               final COMPRESSOR_TYPE fbestCodec = bestCodec;
+               final Bucket fdata = data;
+               final int foneBlockCompressedSize = oneBlockCompressedSize;
+               final int fblockSize = blockSize;
+               final long forigSize = origSize;
+               final boolean fshouldFreeData = shouldFreeData;
+               ctx.executor.execute(new PrioRunnable() {

-               if((fitsInOneBlockAsIs || fitsInOneCHK) && 
block.getData().size() > Integer.MAX_VALUE)
-                       throw new 
InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to one 
block!", null);
+                       public int getPriority() {
+                               return NativeThread.NORM_PRIORITY;
+                       }

-               boolean noMetadata = ((block.clientMetadata == null) || 
block.clientMetadata.isTrivial()) && targetFilename == null;
-               if(noMetadata && archiveType == null) {
-                       if(fitsInOneBlockAsIs) {
-                               // Just insert it
-                               ClientPutState bi =
-                                       createInserter(parent, data, 
codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(), 
-1, getCHKOnly, true, true);
-                               cb.onTransition(this, bi);
-                               bi.schedule();
-                               cb.onBlockSetFinished(this);
+                       public void run() {
+                               insert(fbestCodec, fdata, 
foneBlockCompressedSize, fblockSize, forigSize, fshouldFreeData);
+                       }
+               }, "Insert thread for "+this);
+       }
+       
+       private void insert(COMPRESSOR_TYPE bestCodec, Bucket data, int 
oneBlockCompressedSize, int blockSize, long origSize, boolean shouldFreeData) {
+               try {
+                       // Insert it...
+                       short codecNumber = bestCodec == null ? -1 : 
bestCodec.metadataID;
+                       long compressedDataSize = data.size();
+                       boolean fitsInOneBlockAsIs = bestCodec == null ? 
compressedDataSize < blockSize : compressedDataSize < oneBlockCompressedSize;
+                       boolean fitsInOneCHK = bestCodec == null ? 
compressedDataSize < CHKBlock.DATA_LENGTH : compressedDataSize < 
CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+
+                       if((fitsInOneBlockAsIs || fitsInOneCHK) && 
block.getData().size() > Integer.MAX_VALUE)
+                               throw new 
InsertException(InsertException.INTERNAL_ERROR, "2GB+ should not encode to one 
block!", null);
+
+                       boolean noMetadata = ((block.clientMetadata == null) || 
block.clientMetadata.isTrivial()) && targetFilename == null;
+                       if(noMetadata && archiveType == null)
+                               if(fitsInOneBlockAsIs) {
+                                       // Just insert it
+                                       ClientPutState bi =
+                                               createInserter(parent, data, 
codecNumber, block.desiredURI, ctx, cb, metadata, (int) block.getData().size(), 
-1, getCHKOnly, true, true);
+                                       cb.onTransition(this, bi);
+                                       bi.schedule();
+                                       cb.onBlockSetFinished(this);
+                                       return;
+                               }
+                       if(fitsInOneCHK) {
+                               // Insert single block, then insert pointer to 
it
+                               if(reportMetadataOnly) {
+                                       SingleBlockInserter dataPutter = new 
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
cb, metadata, (int) origSize, -1, getCHKOnly, true, true, token);
+                                       Metadata meta = 
makeMetadata(archiveType, bestCodec, dataPutter.getURI());
+                                       cb.onMetadata(meta, this);
+                                       cb.onTransition(this, dataPutter);
+                                       dataPutter.schedule();
+                                       cb.onBlockSetFinished(this);
+                               } else {
+                                       MultiPutCompletionCallback mcb =
+                                               new 
MultiPutCompletionCallback(cb, parent, token);
+                                       SingleBlockInserter dataPutter = new 
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
mcb, metadata, (int) origSize, -1, getCHKOnly, true, false, token);
+                                       Metadata meta = 
makeMetadata(archiveType, bestCodec, dataPutter.getURI());
+                                       Bucket metadataBucket;
+                                       try {
+                                               metadataBucket = 
BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+                                       } catch(IOException e) {
+                                               Logger.error(this, "Caught " + 
e, e);
+                                               throw new 
InsertException(InsertException.BUCKET_ERROR, e, null);
+                                       } catch(MetadataUnresolvedException e) {
+                                               // Impossible, we're not 
inserting a manifest.
+                                               Logger.error(this, "Caught " + 
e, e);
+                                               throw new 
InsertException(InsertException.INTERNAL_ERROR, "Got 
MetadataUnresolvedException in SingleFileInserter: " + e.toString(), null);
+                                       }
+                                       ClientPutState metaPutter = 
createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, 
true, (int) origSize, -1, getCHKOnly, true, false);
+                                       mcb.addURIGenerator(metaPutter);
+                                       mcb.add(dataPutter);
+                                       cb.onTransition(this, mcb);
+                                       Logger.minor(this, "" + mcb + " : data 
" + dataPutter + " meta " + metaPutter);
+                                       mcb.arm();
+                                       dataPutter.schedule();
+                                       if(metaPutter instanceof 
SingleBlockInserter)
+                                               ((SingleBlockInserter) 
metaPutter).encode();
+                                       metaPutter.schedule();
+                                       cb.onBlockSetFinished(this);
+                               }
                                return;
                        }
-               }
-               if (fitsInOneCHK) {
-                       // Insert single block, then insert pointer to it
+                       // Otherwise the file is too big to fit into one block
+                       // We therefore must make a splitfile
+                       // Job of SplitHandler: when the splitinserter has the 
metadata,
+                       // insert it. Then when the splitinserter has finished, 
and the
+                       // metadata insert has finished too, tell the master 
callback.
                        if(reportMetadataOnly) {
-                               SingleBlockInserter dataPutter = new 
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token);
-                               Metadata meta = makeMetadata(archiveType, 
bestCodec, dataPutter.getURI());
-                               cb.onMetadata(meta, this);
-                               cb.onTransition(this, dataPutter);
-                               dataPutter.schedule();
-                               cb.onBlockSetFinished(this);
+                               SplitFileInserter sfi = new 
SplitFileInserter(parent, cb, data, bestCodec, origSize, block.clientMetadata, 
ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
+                               cb.onTransition(this, sfi);
+                               sfi.start();
+                               if(earlyEncode)
+                                       sfi.forceEncode();
                        } else {
-                               MultiPutCompletionCallback mcb = 
-                                       new MultiPutCompletionCallback(cb, 
parent, token);
-                               SingleBlockInserter dataPutter = new 
SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, 
mcb, metadata, (int)origSize, -1, getCHKOnly, true, false, token);
-                               Metadata meta = makeMetadata(archiveType, 
bestCodec, dataPutter.getURI());
-                               Bucket metadataBucket;
-                               try {
-                                       metadataBucket = 
BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
-                               } catch (IOException e) {
-                                       Logger.error(this, "Caught "+e, e);
-                                       throw new 
InsertException(InsertException.BUCKET_ERROR, e, null);
-                               } catch (MetadataUnresolvedException e) {
-                                       // Impossible, we're not inserting a 
manifest.
-                                       Logger.error(this, "Caught "+e, e);
-                                       throw new 
InsertException(InsertException.INTERNAL_ERROR, "Got 
MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null);
-                               }
-                               ClientPutState metaPutter = 
createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, 
true, (int)origSize, -1, getCHKOnly, true, false);
-                               mcb.addURIGenerator(metaPutter);
-                               mcb.add(dataPutter);
-                               cb.onTransition(this, mcb);
-                               Logger.minor(this, ""+mcb+" : data 
"+dataPutter+" meta "+metaPutter);
-                               mcb.arm();
-                               dataPutter.schedule();
-                               if(metaPutter instanceof SingleBlockInserter)
-                                       
((SingleBlockInserter)metaPutter).encode();
-                               metaPutter.schedule();
-                               cb.onBlockSetFinished(this);
+                               SplitHandler sh = new SplitHandler();
+                               SplitFileInserter sfi = new 
SplitFileInserter(parent, sh, data, bestCodec, origSize, block.clientMetadata, 
ctx, getCHKOnly, metadata, token, archiveType, shouldFreeData);
+                               sh.sfi = sfi;
+                               cb.onTransition(this, sh);
+                               sfi.start();
+                               if(earlyEncode)
+                                       sfi.forceEncode();
                        }
-                       return;
+               } catch(InsertException e) {
+                       onFailure(e, this);
                }
-               // Otherwise the file is too big to fit into one block
-               // We therefore must make a splitfile
-               // Job of SplitHandler: when the splitinserter has the metadata,
-               // insert it. Then when the splitinserter has finished, and the
-               // metadata insert has finished too, tell the master callback.
-               if(reportMetadataOnly) {
-                       SplitFileInserter sfi = new SplitFileInserter(parent, 
cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, 
token, archiveType, shouldFreeData);
-                       cb.onTransition(this, sfi);
-                       sfi.start();
-                       if(earlyEncode) sfi.forceEncode();
-               } else {
-                       SplitHandler sh = new SplitHandler();
-                       SplitFileInserter sfi = new SplitFileInserter(parent, 
sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, 
token, archiveType, shouldFreeData);
-                       sh.sfi = sfi;
-                       cb.onTransition(this, sh);
-                       sfi.start();
-                       if(earlyEncode) sfi.forceEncode();
-               }
-               } finally {
-               COMPRESSOR_TYPE.compressorSemaphore.release();
-               }
        }

        private Metadata makeMetadata(ARCHIVE_TYPE archiveType, COMPRESSOR_TYPE 
codec, FreenetURI uri) {
@@ -722,4 +706,11 @@
        public SimpleFieldSet getProgressFieldset() {
                return null;
        }
+
+       public void onFailure(InsertException e, ClientPutState c) {
+               if(cb != null)
+                       cb.onFailure(e, c);
+               else
+                       Logger.error(this, "The callback is null but we have 
onFailure to call!");
+       }
 }

Added: trunk/freenet/src/freenet/support/compress/CompressJob.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/CompressJob.java                 
        (rev 0)
+++ trunk/freenet/src/freenet/support/compress/CompressJob.java 2008-10-30 
02:20:49 UTC (rev 23211)
@@ -0,0 +1,9 @@
+package freenet.support.compress;
+
+import freenet.client.InsertException;
+import freenet.client.async.ClientPutState;
+
+/**
+ * A unit of work that can be queued on a RealCompressor.
+ */
+public interface CompressJob {
+       /**
+        * Performs the compression. RealCompressor calls this on an executor
+        * thread after acquiring a permit of the compressor semaphore.
+        *
+        * @throws InsertException if the job fails; RealCompressor then reports
+        *         it through {@link #onFailure(InsertException, ClientPutState)}.
+        */
+       void tryCompress() throws InsertException;
+
+       /**
+        * Notifies the job that it has failed.
+        *
+        * @param e the failure.
+        * @param c the state that failed; RealCompressor passes null.
+        */
+       void onFailure(InsertException e, ClientPutState c);
+}

Added: trunk/freenet/src/freenet/support/compress/RealCompressor.java
===================================================================
--- trunk/freenet/src/freenet/support/compress/RealCompressor.java              
                (rev 0)
+++ trunk/freenet/src/freenet/support/compress/RealCompressor.java      
2008-10-30 02:20:49 UTC (rev 23211)
@@ -0,0 +1,86 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.support.compress;
+
+import freenet.client.InsertException;
+import freenet.node.PrioRunnable;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+import freenet.support.io.NativeThread;
+import java.util.LinkedList;
+
+public class RealCompressor implements PrioRunnable {
+       
+       private final Executor exec;
+       private static final LinkedList<CompressJob> _awaitingJobs = new 
LinkedList<CompressJob>();
+       
+       public RealCompressor(Executor e) {
+               this.exec = e;
+       }
+
+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }
+       
+       public synchronized void enqueueNewJob(CompressJob j) {
+               _awaitingJobs.add(j);
+               notifyAll();
+       }
+
+       public void run() {
+               Logger.normal(this, "Starting RealCompressor");
+               while(true) {
+                       CompressJob currentJob = null;
+                       try {
+                               synchronized(this) {
+                                       wait();
+                                       currentJob = _awaitingJobs.poll();
+                               }
+                               if(currentJob == null)
+                                       continue;
+                               
Compressor.COMPRESSOR_TYPE.compressorSemaphore.acquire(); 
+                       } catch(InterruptedException e) {
+                               Logger.error(this, "caught: "+e.getMessage(), 
e);
+                               continue;
+                       }
+                       
+                       final CompressJob finalJob = currentJob;
+                       exec.execute(new PrioRunnable() {
+                               public void run() {
+                                       
freenet.support.Logger.OSThread.logPID(this);
+                                       try {
+                                               while(true) {
+                                                       try {
+                                                               
finalJob.tryCompress();
+                                                       } catch(InsertException 
e) {
+                                                               
finalJob.onFailure(e, null);
+                                                       } 
catch(OutOfMemoryError e) {
+                                                               
OOMHandler.handleOOM(e);
+                                                               
System.err.println("OffThreadCompressor thread above failed.");
+                                                               // Might not be 
heap, so try anyway
+                                                               
finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, 
null), null);
+                                                       } catch(Throwable t) {
+                                                               
Logger.error(this, "Caught in OffThreadCompressor: " + t, t);
+                                                               
System.err.println("Caught in OffThreadCompressor: " + t);
+                                                               
t.printStackTrace();
+                                                               // Try to fail 
gracefully
+                                                               
finalJob.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, 
null), null);
+                                                       }
+
+                                               }
+                                       } catch(Throwable t) {
+                                               Logger.error(this, "Caught " + 
t + " in " + this, t);
+                                       } finally {
+                                               
Compressor.COMPRESSOR_TYPE.compressorSemaphore.release();
+                                       }
+                               }
+
+                               public int getPriority() {
+                                       return NativeThread.MIN_PRIORITY;
+                               }
+                       }, "Compressor thread for " + currentJob);
+               }
+       }
+}
\ No newline at end of file


Reply via email to