Author: toad
Date: 2008-02-18 14:53:55 +0000 (Mon, 18 Feb 2008)
New Revision: 18044

Modified:
   trunk/freenet/src/freenet/client/FECCodec.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
   trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   trunk/freenet/src/freenet/client/async/SplitFileInserter.java
   trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Start FEC decode threads through the pool.

Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2008-02-18 14:40:34 UTC 
(rev 18043)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2008-02-18 14:53:55 UTC 
(rev 18044)
@@ -12,6 +12,7 @@
 import com.onionnetworks.fec.FECCode;
 import com.onionnetworks.util.Buffer;

+import freenet.support.Executor;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;
@@ -33,17 +34,24 @@
        private static int STRIPE_SIZE = 4096;
        static boolean logMINOR;
        FECCode fec;
-       int k, n;
+       protected final int k, n;
+       private final Executor executor;

+       protected FECCodec(Executor executor, int k, int n) {
+               this.executor = executor;
+               this.k = k;
+               this.n = n;
+       }
+       
        /**
         * Get a codec where we know both the number of data blocks and the 
number
         * of check blocks, and the codec type. Normally for decoding.
         */
-       public static FECCodec getCodec(short splitfileType, int dataBlocks, 
int checkBlocks) {
+       public static FECCodec getCodec(short splitfileType, int dataBlocks, 
int checkBlocks, Executor executor) {
                if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
                        return null;
                if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
-                       return StandardOnionFECCodec.getInstance(dataBlocks, 
checkBlocks);
+                       return StandardOnionFECCodec.getInstance(dataBlocks, 
checkBlocks, executor);
                else
                        return null;
        }
@@ -52,14 +60,14 @@
         * Get a codec where we know only the number of data blocks and the 
codec
         * type. Normally for encoding.
         */
-       public static FECCodec getCodec(short splitfileType, int dataBlocks) {
+       public static FECCodec getCodec(short splitfileType, int dataBlocks, 
Executor executor) {
                if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
                        return null;
                if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
                        int checkBlocks = (dataBlocks >> 1);
                        if((dataBlocks & 1) == 1)
                                checkBlocks++;
-                       return StandardOnionFECCodec.getInstance(dataBlocks, 
checkBlocks);
+                       return StandardOnionFECCodec.getInstance(dataBlocks, 
checkBlocks, executor);
                }
                else
                        return null;
@@ -318,18 +326,15 @@
         * @param FECJob
         */
        public void addToQueue(FECJob job) {
-               addToQueue(job, this);
+               addToQueue(job, this, executor);
        }

-       public static void addToQueue(FECJob job, FECCodec codec) {
+       public static void addToQueue(FECJob job, FECCodec codec, Executor 
executor) {
                synchronized(_awaitingJobs) {
-                       if(fecRunnerThread == null) {
-                               fecRunnerThread = new NativeThread(fecRunner, 
"FEC Pool " + (fecPoolCounter++), NativeThread.LOW_PRIORITY, true);
-                               fecRunnerThread.setDaemon(true);
-
-                               fecRunnerThread.start();
+                       _awaitingJobs.addFirst(job);
+                       if(runningFECThreads == 0) {
+                               executor.execute(fecRunner, "FEC Pool 
"+fecPoolCounter++);
                        }
-                       _awaitingJobs.addFirst(job);
                }
                if(logMINOR)
                        Logger.minor(StandardOnionFECCodec.class, "Adding a new 
job to the queue (" + _awaitingJobs.size() + ").");
@@ -339,7 +344,7 @@
        }
        private static final LinkedList _awaitingJobs = new LinkedList();
        private static final FECRunner fecRunner = new FECRunner();
-       private static Thread fecRunnerThread;
+       private static int runningFECThreads;
        private static int fecPoolCounter;

        /**
@@ -403,7 +408,9 @@
                                Logger.error(this, "Caught "+t+" in "+this, t);
                        }
                        finally {
-                               fecRunnerThread = null;
+                               synchronized(FECCodec.class) {
+                                       runningFECThreads--;
+                               }
                        }
                }
        }

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2008-02-18 
14:40:34 UTC (rev 18043)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2008-02-18 
14:53:55 UTC (rev 18044)
@@ -7,6 +7,7 @@
 import com.onionnetworks.fec.Native8Code;
 import com.onionnetworks.fec.PureCode;

+import freenet.support.Executor;
 import freenet.support.LRUHashtable;
 import freenet.support.Logger;

@@ -44,14 +45,14 @@
                }
        }

-       public synchronized static FECCodec getInstance(int dataBlocks, int 
checkBlocks) {
+       public synchronized static FECCodec getInstance(int dataBlocks, int 
checkBlocks, Executor executor) {
                MyKey key = new MyKey(dataBlocks, checkBlocks + dataBlocks);
                StandardOnionFECCodec codec = (StandardOnionFECCodec) 
recentlyUsedCodecs.get(key);
                if(codec != null) {
                        recentlyUsedCodecs.push(key, codec);
                        return codec;
                }
-               codec = new StandardOnionFECCodec(dataBlocks, checkBlocks + 
dataBlocks);
+               codec = new StandardOnionFECCodec(executor, dataBlocks, 
checkBlocks + dataBlocks);
                recentlyUsedCodecs.push(key, codec);
                while(recentlyUsedCodecs.size() > MAX_CACHED_CODECS) {
                        recentlyUsedCodecs.popKey();
@@ -59,9 +60,8 @@
                return codec;
        }

-       public StandardOnionFECCodec(int k, int n) {
-               this.k = k;
-               this.n = n;
+       public StandardOnionFECCodec(Executor executor, int k, int n) {
+               super(executor, k, n);

                FECCode fec2 = null;
                if(!noNative) {

Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-02-18 14:40:34 UTC (rev 18043)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-02-18 14:53:55 UTC (rev 18044)
@@ -199,7 +199,7 @@
                // Now decode
                if(logMINOR) Logger.minor(this, "Decoding 
"+SplitFileFetcherSegment.this);

-               codec = FECCodec.getCodec(splitfileType, dataKeys.length, 
checkKeys.length);
+               codec = FECCodec.getCodec(splitfileType, dataKeys.length, 
checkKeys.length, blockFetchContext.executor);

                if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
                        codec.addToQueue(new FECJob(codec, dataBuckets, 
checkBuckets, CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true));

Modified: trunk/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserter.java       
2008-02-18 14:40:34 UTC (rev 18043)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserter.java       
2008-02-18 14:53:55 UTC (rev 18044)
@@ -203,7 +203,7 @@
                // First split the data up
                if((dataBlocks < segmentSize) || (segmentSize == -1)) {
                        // Single segment
-                       FECCodec codec = FECCodec.getCodec(splitfileAlgorithm, 
origDataBlocks.length);
+                       FECCodec codec = FECCodec.getCodec(splitfileAlgorithm, 
origDataBlocks.length, ctx.executor);
                        SplitFileInserterSegment onlySeg = new 
SplitFileInserterSegment(this, codec, origDataBlocks, ctx, getCHKOnly, 0);
                        segs.add(onlySeg);
                } else {
@@ -216,7 +216,7 @@
                                j = i;
                                for(int x=0;x<seg.length;x++)
                                        if(seg[x] == null) throw new 
NullPointerException("In splitIntoSegs: "+x+" is null of "+seg.length+" of 
"+segNo);
-                               FECCodec codec = 
FECCodec.getCodec(splitfileAlgorithm, seg.length);
+                               FECCodec codec = 
FECCodec.getCodec(splitfileAlgorithm, seg.length, ctx.executor);
                                SplitFileInserterSegment s = new 
SplitFileInserterSegment(this, codec, seg, ctx, getCHKOnly, segNo);
                                segs.add(s);


Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java        
2008-02-18 14:40:34 UTC (rev 18043)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java        
2008-02-18 14:53:55 UTC (rev 18044)
@@ -221,12 +221,12 @@
                                checkFS.removeSubset(index);
                        }
                        splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
-                                       dataBlockCount, checkBlocks.length);
+                                       dataBlockCount, checkBlocks.length, 
ctx.executor);
                } else {
                        Logger.normal(this, "Not encoded because no check 
blocks");
                        encoded = false;
                        splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
-                                       dataBlockCount);
+                                       dataBlockCount, ctx.executor);
                        int checkBlocksCount = splitfileAlgo.countCheckBlocks();
                        this.checkURIs = new ClientCHK[checkBlocksCount];
                        this.checkBlocks = new Bucket[checkBlocksCount];


Reply via email to