Author: toad
Date: 2008-06-06 17:28:13 +0000 (Fri, 06 Jun 2008)
New Revision: 20243

Added:
   branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientContext.java
   branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java
Log:
InsertCompressor: off-thread data compression is now tracked in the database (for persistent requests).
We store an InsertCompressor in the database, and remove it when it's done.
Not yet implemented: restarting InsertCompressors that didn't complete on the last run.
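
For orientation, the lifecycle this commit introduces, as a minimal sketch in Java (the names match the diffs below; the comments are explanatory, not part of the commit):

    // On the database thread: create the tag, persist it for persistent
    // requests, then hand the actual work to the shared executor.
    InsertCompressor.start(container, context, inserter, origData,
            oneBlockCompressedSize, ctx.bf, parent.persistent());
    // start() boils down to:
    //   if(persistent) container.set(compressor); // the tag survives a restart
    //   compressor.init(context);                 // runs compress() off-thread
    // When compression finishes, compress() queues a DBJob which calls
    // inserter.onCompressed(output, container, context) on the database thread
    // and then container.delete(InsertCompressor.this), removing the tag.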

Modified: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java   2008-06-06 14:17:52 UTC (rev 20242)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java   2008-06-06 17:28:13 UTC (rev 20243)
@@ -5,6 +5,7 @@

 import freenet.client.FECQueue;
 import freenet.node.NodeClientCore;
+import freenet.support.Executor;

 /**
  * Object passed in to client-layer operations, containing references to essential but transient objects
@@ -19,6 +20,8 @@
        public final ClientRequestScheduler sskInsertScheduler;
        public final ClientRequestScheduler chkInsertScheduler;
        public final DBJobRunner jobRunner;
+       public final Executor mainExecutor;
+       public final long nodeDBHandle;

        public ClientContext(NodeClientCore core) {
                this.fecQueue = core.fecQueue;
@@ -27,6 +30,8 @@
                this.sskInsertScheduler = core.requestStarters.sskPutScheduler;
                this.chkInsertScheduler = core.requestStarters.chkPutScheduler;
                jobRunner = core;
+               this.mainExecutor = core.getExecutor();
+               this.nodeDBHandle = core.node.nodeDBHandle;
        }

 }
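
The two new fields let code running off the database thread, such as the new InsertCompressor below, reach the shared executor and tag persistent objects with the owning node purely through the context. A minimal usage sketch (runnable is a placeholder):

    ctx.mainExecutor.execute(runnable, "Compressor for "+this);
    long handle = ctx.nodeDBHandle;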

Added: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java        2008-06-06 17:28:13 UTC (rev 20243)
@@ -0,0 +1,167 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import com.db4o.ObjectContainer;
+
+import freenet.client.InsertException;
+import freenet.support.api.Bucket;
+import freenet.support.api.BucketFactory;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+import freenet.support.io.NativeThread;
+
+/**
+ * Compress a file in order to insert it. This class acts as a tag in the database to ensure that inserts
+ * are not forgotten about, and also can be run on a non-database thread from an executor.
+ * 
+ * FIXME how many compressors do we want to have running simultaneously? Probably we should have a compression
+ * queue, or at least a SerialExecutor?
+ * 
+ * @author toad
+ */
+class InsertCompressor {
+       
+       /** Database handle to identify which node it belongs to in the database */
+       public final long nodeDBHandle;
+       /** The SingleFileInserter we report to. We were created by it and when we have compressed our data we will
+        * call a method to process it and schedule the data. */
+       public final SingleFileInserter inserter;
+       /** The original data */
+       final Bucket origData;
+       /** If we can get it into one block, don't compress any further */
+       public final int minSize;
+       /** BucketFactory */
+       public final BucketFactory bucketFactory;
+       public final boolean persistent;
+       
+       public InsertCompressor(long nodeDBHandle2, SingleFileInserter inserter2, Bucket origData2, int minSize2, BucketFactory bf, boolean persistent) {
+               this.nodeDBHandle = nodeDBHandle2;
+               this.inserter = inserter2;
+               this.origData = origData2;
+               this.minSize = minSize2;
+               this.bucketFactory = bf;
+               this.persistent = persistent;
+       }
+
+       public void init(final ClientContext ctx) {
+               ctx.mainExecutor.execute(new Runnable() {
+
+                       public void run() {
+                               compress(ctx);
+                       }
+                       
+               }, "Compressor for "+this);
+       }
+
+       protected void compress(final ClientContext context) {
+               Bucket data = origData;
+               Compressor bestCodec = null;
+               Bucket bestCompressedData = null;
+               
+               // Try to compress the data.
+               // Try each algorithm, starting with the fastest and weakest.
+               // Stop when we run out of algorithms, or the compressed data fits in a single block.
+               int algos = Compressor.countCompressAlgorithms();
+               try {
+                       for(int i=0;i<algos;i++) {
+                               // Only produce an event if we are compressing *the original data*
+                               inserter.onStartCompression(i);
+                               Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
+                               Bucket result;
+                               result = comp.compress(origData, bucketFactory, origData.size());
+                               if(result.size() < minSize) {
+                                       bestCodec = comp;
+                                       if(bestCompressedData != null)
+                                               bestCompressedData.free();
+                                       bestCompressedData = result;
+                                       break;
+                               }
+                               if((bestCompressedData != null) && (result.size() < bestCompressedData.size())) {
+                                       bestCompressedData.free();
+                                       bestCompressedData = result;
+                                       bestCodec = comp;
+                               } else if((bestCompressedData == null) && (result.size() < data.size())) {
+                                       bestCompressedData = result;
+                                       bestCodec = comp;
+                               } else {
+                                       result.free();
+                               }
+                       }
+                       
+                       final CompressionOutput output = new CompressionOutput(bestCompressedData, bestCodec);
+                       
+                       if(persistent) {
+                       
+                               context.jobRunner.queue(new DBJob() {
+                                       
+                                       public void run(ObjectContainer container) {
+                                               inserter.onCompressed(output, container, context);
+                                               container.delete(InsertCompressor.this);
+                                       }
+                                       
+                               }, NativeThread.NORM_PRIORITY);
+                       } else {
+                               inserter.onCompressed(output, null, context);
+                       }
+                       
+               } catch (final IOException e) {
+                       if(persistent) {
+                               context.jobRunner.queue(new DBJob() {
+                                       
+                                       public void run(ObjectContainer container) {
+                                               inserter.cb.onFailure(new InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container, context);
+                                               container.delete(InsertCompressor.this);
+                                       }
+                                       
+                               }, NativeThread.NORM_PRIORITY);
+                       } else {
+                               inserter.cb.onFailure(new InsertException(InsertException.BUCKET_ERROR, e, null), inserter, null, context);
+                       }
+                       
+               } catch (final CompressionOutputSizeException e) {
+                       if(persistent) {
+                               context.jobRunner.queue(new DBJob() {
+                                       
+                                       public void run(ObjectContainer container) {
+                                               inserter.cb.onFailure(new InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container, context);
+                                               container.delete(InsertCompressor.this);
+                                       }
+                                       
+                               }, NativeThread.NORM_PRIORITY);
+                       } else {
+                               inserter.cb.onFailure(new InsertException(InsertException.BUCKET_ERROR, e, null), inserter, null, context);
+                       }
+               }
+       }
+
+       /**
+        * Create an InsertCompressor, add it to the database, schedule it.
+        * @param container
+        * @param context
+        * @param inserter
+        * @param origData
+        * @param minSize
+        * @param bf
+        * @param persistent
+        * @return The new InsertCompressor, which has already been scheduled.
+        */
+       public static InsertCompressor start(ObjectContainer container, ClientContext context, SingleFileInserter inserter, 
+                       Bucket origData, int minSize, BucketFactory bf, boolean persistent) {
+               InsertCompressor compressor = new InsertCompressor(context.nodeDBHandle, inserter, origData, minSize, bf, persistent);
+               if(persistent)
+                       container.set(compressor);
+               compressor.init(context);
+               return compressor;
+       }
+
+       
+}
+
+class CompressionOutput {
+       public CompressionOutput(Bucket bestCompressedData, Compressor bestCodec2) {
+               this.data = bestCompressedData;
+               this.bestCodec = bestCodec2;
+       }
+       final Bucket data;
+       final Compressor bestCodec;
+}
\ No newline at end of file
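
The restart case called out in the log message is the point of persisting the tag. A sketch of what that startup pass might look like, using db4o's SODA query API (hypothetical code, not in this commit; imports of com.db4o.ObjectSet and com.db4o.query.Query assumed):

    // Hypothetical startup job, run on the database thread: find any
    // InsertCompressors left over from the previous run and re-schedule them.
    Query q = container.query();
    q.constrain(InsertCompressor.class);
    q.descend("nodeDBHandle").constrain(context.nodeDBHandle);
    ObjectSet results = q.execute();
    while(results.hasNext()) {
        InsertCompressor comp = (InsertCompressor) results.next();
        comp.init(context); // re-run compress() on the executor
    }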

Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java      2008-06-06 14:17:52 UTC (rev 20242)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileInserter.java      2008-06-06 17:28:13 UTC (rev 20243)
@@ -110,41 +110,51 @@
                        OffThreadCompressor otc = new OffThreadCompressor();
                        ctx.executor.execute(otc, "Compressor for "+this);
                } else {
-                       tryCompress();
+                       tryCompress(container, context);
                }
        }

        private class OffThreadCompressor implements Runnable {
                public void run() {
                    freenet.support.Logger.OSThread.logPID(this);
-                       try {
-                               tryCompress();
-                       } catch (InsertException e) {
-                               cb.onFailure(e, SingleFileInserter.this);
-            } catch (OutOfMemoryError e) {
-                               OOMHandler.handleOOM(e);
-                               System.err.println("OffThreadCompressor thread above failed.");
-                               // Might not be heap, so try anyway
-                               cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, null), SingleFileInserter.this);
-            } catch (Throwable t) {
-                Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
-                System.err.println("Caught in OffThreadCompressor: "+t);
-                t.printStackTrace();
-                // Try to fail gracefully
-                               cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, null), SingleFileInserter.this);
-                       }
                }
        }

-       private void tryCompress() throws InsertException {
-               // First, determine how small it needs to be
-               Bucket origData = block.getData();
-               Bucket data = origData;
+       void onCompressed(CompressionOutput output, ObjectContainer container, ClientContext context) {
+               try {
+                       onCompressedInner(output, container, context);
+               } catch (InsertException e) {
+                       cb.onFailure(e, SingleFileInserter.this, container, context);
+        } catch (OutOfMemoryError e) {
+                       OOMHandler.handleOOM(e);
+                       System.err.println("OffThreadCompressor thread above failed.");
+                       // Might not be heap, so try anyway
+                       cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, e, null), SingleFileInserter.this, container, context);
+        } catch (Throwable t) {
+            Logger.error(this, "Caught in OffThreadCompressor: "+t, t);
+            System.err.println("Caught in OffThreadCompressor: "+t);
+            t.printStackTrace();
+            // Try to fail gracefully
+                       cb.onFailure(new InsertException(InsertException.INTERNAL_ERROR, t, null), SingleFileInserter.this, container, context);
+               }
+       }
+       
+       void onCompressedInner(CompressionOutput output, ObjectContainer container, ClientContext context) throws InsertException {
+               long origSize = block.getData().size();
+               Bucket bestCompressedData = output.data;
+               Bucket data = bestCompressedData;
+               Compressor bestCodec = output.bestCodec;
+               
+               boolean freeData = false;
+               if(bestCodec != null) {
+                       freeData = true;
+               } else {
+                       data = block.getData();
+               }
+
                int blockSize;
                int oneBlockCompressedSize;
-               boolean dontCompress = ctx.dontCompress;

-               long origSize = data.size();
                String type = block.desiredURI.getKeyType();
                if(type.equals("SSK") || type.equals("KSK") || type.equals("USK")) {
                        blockSize = SSKBlock.DATA_LENGTH;
@@ -156,62 +166,13 @@
                        throw new InsertException(InsertException.INVALID_URI, "Unknown key type: "+type, null);
                }

-               Compressor bestCodec = null;
-               Bucket bestCompressedData = null;
-
-               boolean tryCompress = (origSize > blockSize) && (!ctx.dontCompress) && (!dontCompress);
-               if(tryCompress) {
-                       // Try to compress the data.
-                       // Try each algorithm, starting with the fastest and weakest.
-                       // Stop when run out of algorithms, or the compressed data fits in a single block.
-                       int algos = Compressor.countCompressAlgorithms();
-                       try {
-                               for(int i=0;i<algos;i++) {
-                                       // Only produce if we are compressing *the original data*
-                                       if(parent == cb)
-                                               ctx.eventProducer.produceEvent(new StartedCompressionEvent(i));
-                                       Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
-                                       Bucket result;
-                                       result = comp.compress(origData, ctx.persistentBucketFactory, origData.size());
-                                       if(result.size() < oneBlockCompressedSize) {
-                                               bestCodec = comp;
-                                               if(bestCompressedData != null)
-                                                       bestCompressedData.free();
-                                               bestCompressedData = result;
-                                               break;
-                                       }
-                                       if((bestCompressedData != null) && (result.size() < bestCompressedData.size())) {
-                                               bestCompressedData.free();
-                                               bestCompressedData = result;
-                                               bestCodec = comp;
-                                       } else if((bestCompressedData == null) && (result.size() < data.size())) {
-                                               bestCompressedData = result;
-                                               bestCodec = comp;
-                                       } else {
-                                               result.free();
-                                       }
-                               }
-                       } catch (IOException e) {
-                               throw new InsertException(InsertException.BUCKET_ERROR, e, null);
-                       } catch (CompressionOutputSizeException e) {
-                               // Impossible
-                               throw new Error(e);
-                       }
-               }
-               boolean freeData = false;
-               if(bestCompressedData != null) {
-                       data = bestCompressedData;
-                       freeData = true;
-               }
+               // Compressed data

                if(parent == cb) {
-                       if(tryCompress)
-                               ctx.eventProducer.produceEvent(new FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.codecNumberForMetadata(), origSize, data.size()));
+                       ctx.eventProducer.produceEvent(new FinishedCompressionEvent(bestCodec == null ? -1 : bestCodec.codecNumberForMetadata(), origSize, data.size()));
                        if(logMINOR) Logger.minor(this, "Compressed "+origSize+" to "+data.size()+" on "+this);
                }

-               // Compressed data
-               
                // Insert it...
                short codecNumber = bestCodec == null ? -1 : bestCodec.codecNumberForMetadata();
                long compressedDataSize = data.size();
@@ -226,10 +187,10 @@
                        if(fitsInOneBlockAsIs) {
                                // Just insert it
                                ClientPutState bi =
-                                       createInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(), -1, getCHKOnly, true, true);
-                               cb.onTransition(this, bi);
-                               bi.schedule();
-                               cb.onBlockSetFinished(this);
+                                       createInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, (int)block.getData().size(), -1, getCHKOnly, true, true, container, context);
+                               cb.onTransition(this, bi, container);
+                               bi.schedule(container, context);
+                               cb.onBlockSetFinished(this, container);
                                return;
                        }
                }
@@ -238,10 +199,10 @@
                        if(reportMetadataOnly) {
                                SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, cb, metadata, (int)origSize, -1, getCHKOnly, true, true, token);
                                Metadata meta = makeMetadata(dataPutter.getURI());
-                               cb.onMetadata(meta, this);
-                               cb.onTransition(this, dataPutter);
-                               dataPutter.schedule();
-                               cb.onBlockSetFinished(this);
+                               cb.onMetadata(meta, this, container, context);
+                               cb.onTransition(this, dataPutter, container);
+                               dataPutter.schedule(container, context);
+                               cb.onBlockSetFinished(this, container);
                        } else {
                                MultiPutCompletionCallback mcb = 
                                        new MultiPutCompletionCallback(cb, parent, token);
@@ -258,17 +219,17 @@
                                        Logger.error(this, "Caught "+e, e);
                                        throw new InsertException(InsertException.INTERNAL_ERROR, "Got MetadataUnresolvedException in SingleFileInserter: "+e.toString(), null);
                                }
-                               ClientPutState metaPutter = createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, (int)origSize, -1, getCHKOnly, true, false);
+                               ClientPutState metaPutter = createInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, (int)origSize, -1, getCHKOnly, true, false, container, context);
                                mcb.addURIGenerator(metaPutter);
                                mcb.add(dataPutter);
-                               cb.onTransition(this, mcb);
+                               cb.onTransition(this, mcb, container);
                                Logger.minor(this, ""+mcb+" : data "+dataPutter+" meta "+metaPutter);
-                               mcb.arm();
-                               dataPutter.schedule();
+                               mcb.arm(container, context);
+                               dataPutter.schedule(container, context);
                                if(metaPutter instanceof SingleBlockInserter)
-                                       ((SingleBlockInserter)metaPutter).encode();
-                               metaPutter.schedule();
-                               cb.onBlockSetFinished(this);
+                                       ((SingleBlockInserter)metaPutter).encode(container);
+                               metaPutter.schedule(container, context);
+                               cb.onBlockSetFinished(this, container);
                        }
                        return;
                }
@@ -279,19 +240,45 @@
                // metadata insert has finished too, tell the master callback.
                if(reportMetadataOnly) {
                        SplitFileInserter sfi = new SplitFileInserter(parent, cb, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, insertAsArchiveManifest, freeData);
-                       cb.onTransition(this, sfi);
-                       sfi.start();
+                       cb.onTransition(this, sfi, container);
+                       sfi.start(container, context);
                        if(earlyEncode) sfi.forceEncode();
                } else {
                        SplitHandler sh = new SplitHandler();
                        SplitFileInserter sfi = new SplitFileInserter(parent, sh, data, bestCodec, origSize, block.clientMetadata, ctx, getCHKOnly, metadata, token, insertAsArchiveManifest, freeData);
                        sh.sfi = sfi;
-                       cb.onTransition(this, sh);
-                       sfi.start();
+                       cb.onTransition(this, sh, container);
+                       sfi.start(container, context);
                        if(earlyEncode) sfi.forceEncode();
                }
        }

+       private void tryCompress(ObjectContainer container, ClientContext context) throws InsertException {
+               // First, determine how small it needs to be
+               Bucket origData = block.getData();
+               Bucket data = origData;
+               int blockSize;
+               int oneBlockCompressedSize;
+               boolean dontCompress = ctx.dontCompress;
+               
+               long origSize = data.size();
+               String type = block.desiredURI.getKeyType();
+               if(type.equals("SSK") || type.equals("KSK") || type.equals("USK")) {
+                       blockSize = SSKBlock.DATA_LENGTH;
+                       oneBlockCompressedSize = SSKBlock.MAX_COMPRESSED_DATA_LENGTH;
+               } else if(type.equals("CHK")) {
+                       blockSize = CHKBlock.DATA_LENGTH;
+                       oneBlockCompressedSize = CHKBlock.MAX_COMPRESSED_DATA_LENGTH;
+               } else {
+                       throw new InsertException(InsertException.INVALID_URI, "Unknown key type: "+type, null);
+               }
+               
+               boolean tryCompress = (origSize > blockSize) && (!ctx.dontCompress) && (!dontCompress);
+               if(tryCompress) {
+                       InsertCompressor.start(container, context, this, origData, oneBlockCompressedSize, ctx.bf, parent.persistent());
+               }
+       }
+       
        private Metadata makeMetadata(FreenetURI uri) {
                Metadata meta = new Metadata(insertAsArchiveManifest ? Metadata.ZIP_MANIFEST : Metadata.SIMPLE_REDIRECT, uri, block.clientMetadata);
                if(targetFilename != null) {
@@ -304,7 +291,7 @@

        private ClientPutState createInserter(BaseClientPutter parent, Bucket data, short compressionCodec, FreenetURI uri, 
                        InsertContext ctx, PutCompletionCallback cb, boolean isMetadata, int sourceLength, int token, boolean getCHKOnly, 
-                       boolean addToParent, boolean encodeCHK, ObjectContainer container) throws InsertException {
+                       boolean addToParent, boolean encodeCHK, ObjectContainer container, ClientContext context) throws InsertException {

                uri.checkInsertURI(); // will throw an exception if needed

@@ -320,7 +307,7 @@
                                new SingleBlockInserter(parent, data, compressionCodec, uri, ctx, cb, isMetadata, sourceLength, token, 
                                                getCHKOnly, addToParent, false, this.token);
                        if(encodeCHK)
-                               cb.onEncode(sbi.getBlock().getClientKey(), this, container);
+                               cb.onEncode(sbi.getBlock().getClientKey(), this, container, context);
                        return sbi;
                }

@@ -549,9 +536,9 @@
                                oldMetadataPutter = metadataPutter;
                        }
                        if(oldSFI != null)
-                               oldSFI.cancel(container);
+                               oldSFI.cancel(container, context);
                        if(oldMetadataPutter != null)
-                               oldMetadataPutter.cancel(container);
+                               oldMetadataPutter.cancel(container, context);
                        finished = true;
                        cb.onFailure(e, this, container, context);
                }
@@ -674,7 +661,7 @@
                                }
                        } catch (InsertException e1) {
                                Logger.error(this, "Failing "+this+" : "+e1, 
e1);
-                               fail(e1, container);
+                               fail(e1, container, context);
                                return;
                        }
                }
@@ -701,4 +688,9 @@
        public SimpleFieldSet getProgressFieldset() {
                return null;
        }
+
+       public void onStartCompression(int i) {
+               if(parent == cb)
+                       ctx.eventProducer.produceEvent(new StartedCompressionEvent(i));
+       }
 }
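
One pattern repeats throughout compress() above and is worth calling out: the compression itself runs on a plain executor thread, so every touch of the ObjectContainer is marshalled back to the database thread. Schematically (DBJob, jobRunner and NativeThread.NORM_PRIORITY are the real interfaces used above; the job body is illustrative and 'tag' is a placeholder):

    // From a non-database thread, never use the ObjectContainer directly;
    // queue a DBJob so the database thread does the persistence work.
    context.jobRunner.queue(new DBJob() {
        public void run(ObjectContainer container) {
            // Safe here: report results via callbacks, then remove the tag.
            container.delete(tag);
        }
    }, NativeThread.NORM_PRIORITY);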

