Author: nextgens
Date: 2007-03-24 02:59:23 +0000 (Sat, 24 Mar 2007)
New Revision: 12301

Modified:
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
   trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
More untested code: if it works, every FEC operation is serialized using a 
FIFO policy.

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-03-24 
02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-03-24 
02:59:23 UTC (rev 12301)
@@ -13,6 +13,7 @@
 import com.onionnetworks.fec.PureCode;
 import com.onionnetworks.util.Buffer;

+import freenet.client.async.SplitFileFetcher;
 import freenet.support.LRUHashtable;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
@@ -111,6 +112,9 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }

+       /**
+        * @deprecated
+        */
        public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
                logMINOR = Logger.shouldLog(Logger.MINOR, getClass());
                if(logMINOR) 
@@ -251,7 +255,7 @@
        }

        /**
-        * @decapreted
+        * @deprecated
         */
        public void encode(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus, 
int blockLength, BucketFactory bf) throws IOException {
                logMINOR = Logger.shouldLog(Logger.MINOR, getClass());
@@ -279,7 +283,7 @@
        }

        /**
-        * @decapreted
+        * @deprecated
         */
        public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
                Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
@@ -435,7 +439,7 @@

        // ###############################

-       public void addToQueue(Bucket[] dataBlocks, Bucket[] checkBlocks, int 
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback 
callback){
+       public void addToQueue(Bucket[] dataBlocks, Bucket[] checkBlocks, int 
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback 
callback, boolean isADecodingJob){
                synchronized (_awaitingJobs) {
                        if((fecRunnerThread == null) || 
!fecRunnerThread.isAlive()){
                                if(fecRunnerThread != null) Logger.error(this, 
"The callback died!! restarting a new one, please report that error.");
@@ -446,7 +450,7 @@
                                fecRunnerThread.start();
                        }

-                       _awaitingJobs.addFirst(new FECJob(dataBlocks, 
checkBlocks, blockLength, bucketFactory, callback));
+                       _awaitingJobs.addFirst(new FECJob(dataBlocks, 
checkBlocks, blockLength, bucketFactory, callback, isADecodingJob));
                }
                if(logMINOR) Logger.minor(this, "Adding a new job to the queue 
(" +_awaitingJobs.size() + ").");
                synchronized (fecRunner){
@@ -454,14 +458,14 @@
                }
        }

-       public void addToQueue(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory 
bucketFactory, StandardOnionFECCodecEncoderCallback callback){
+       public void addToQueue(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory 
bucketFactory, StandardOnionFECCodecEncoderCallback callback, boolean 
isADecodingJob){
                Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
                Bucket[] checkBlocks = new Bucket[checkBlockStatus.length];
                for(int i=0;i<dataBlocks.length;i++)
                        dataBlocks[i] = dataBlockStatus[i].getData();
                for(int i=0;i<checkBlocks.length;i++)
                        checkBlocks[i] = checkBlockStatus[i].getData();
-               addToQueue(dataBlocks, checkBlocks, blockLength, bucketFactory, 
callback);
+               addToQueue(dataBlocks, checkBlocks, blockLength, bucketFactory, 
callback, isADecodingJob);
                for(int i=0;i<dataBlocks.length;i++)
                        dataBlockStatus[i].setData(dataBlocks[i]);
                for(int i=0;i<checkBlocks.length;i++)
@@ -474,6 +478,7 @@

        public interface StandardOnionFECCodecEncoderCallback{
                public void onEncodedSegment();
+               public void onDecodedSegment();
        }

        private class FECJob {
@@ -481,13 +486,15 @@
                final BucketFactory bucketFactory;
                final int blockLength;
                final StandardOnionFECCodecEncoderCallback callback;
+               final boolean isADecodingJob;

-               FECJob(Bucket[] dataBlocks, Bucket[] checkBlocks, int 
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback 
callback) {
+               FECJob(Bucket[] dataBlocks, Bucket[] checkBlocks, int 
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback 
callback, boolean isADecodingJob) {
                        this.dataBlocks = dataBlocks;
                        this.checkBlocks = checkBlocks;
                        this.blockLength = blockLength;
                        this.bucketFactory = bucketFactory;
                        this.callback = callback;
+                       this.isADecodingJob = isADecodingJob;
                }
        }

@@ -503,13 +510,19 @@
                                if(job != null){
                                        // Encode it
                                        try {
-                                               encode(job.dataBlocks, 
job.checkBlocks, job.blockLength, job.bucketFactory);
+                                               if(job.isADecodingJob)
+                                                       
realDecode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory);
+                                               else
+                                                       
realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory);
                                        } catch (IOException e) {
                                                Logger.error(this, "BOH! ioe:" 
+ e.getMessage());
                                        }
                                        // Call the callback
                                        try {
-                                               job.callback.onEncodedSegment();
+                                               if(job.isADecodingJob)
+                                                       
job.callback.onDecodedSegment();
+                                               else
+                                                       
job.callback.onEncodedSegment();
                                        } catch (Throwable e) {
                                                Logger.error(this, "The 
callback failed!" + e.getMessage());
                                        }

Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2007-03-24 02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2007-03-24 02:59:23 UTC (rev 12301)
@@ -27,7 +27,7 @@
  * A single segment within a SplitFileFetcher.
  * This in turn controls a large number of SplitFileFetcherSubSegment's, which 
are registered on the ClientRequestScheduler.
  */
-public class SplitFileFetcherSegment {
+public class SplitFileFetcherSegment implements 
StandardOnionFECCodecEncoderCallback {

        private static boolean logMINOR;
        final short splitfileType;
@@ -59,6 +59,8 @@
        final FailureCodeTracker errors;
        private boolean finishing;

+       private FECCodec codec;
+       
        public SplitFileFetcherSegment(short splitfileType, ClientCHK[] 
splitfileDataKeys, ClientCHK[] splitfileCheckKeys, SplitFileFetcher fetcher, 
ArchiveContext archiveContext, FetchContext fetchContext, long maxTempLength, 
int recursionLevel) throws MetadataParseException, FetchException {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                this.parentFetcher = fetcher;
@@ -174,98 +176,95 @@
                }
                parentFetcher.parent.completedBlock(dontNotify);
                if(decodeNow) {
-                       Runnable r = new Decoder();
-                       Thread t = new Thread(r, "Decoder for "+this);
-                       t.setDaemon(true);
-                       t.start();
+                       decode();
                }
        }

-       class Decoder implements Runnable, StandardOnionFECCodecEncoderCallback 
{
+       public void decode() {
+               // Now decode
+               if(logMINOR) Logger.minor(this, "Decoding 
"+SplitFileFetcherSegment.this);

-               public void run() {
-                       
-                       // Now decode
-                       if(logMINOR) Logger.minor(this, "Decoding 
"+SplitFileFetcherSegment.this);
-                       
-                       boolean[] dataBlocksSucceeded = new 
boolean[dataBuckets.length];
-                       boolean[] checkBlocksSucceeded = new 
boolean[checkBuckets.length];
-                       for(int i=0;i<dataBuckets.length;i++)
-                               dataBlocksSucceeded[i] = dataBuckets[i].data != 
null;
-                       for(int i=0;i<checkBuckets.length;i++)
-                               checkBlocksSucceeded[i] = checkBuckets[i].data 
!= null;
-                       
-                       FECCodec codec = FECCodec.getCodec(splitfileType, 
dataKeys.length, checkKeys.length);
-                       try {
-                               if(splitfileType != 
Metadata.SPLITFILE_NONREDUNDANT) {
-                                       codec.decode(dataBuckets, checkBuckets, 
CHKBlock.DATA_LENGTH, fetchContext.bucketFactory);
-                                       // Now have all the data blocks (not 
necessarily all the check blocks)
-                               }
-                               
-                               decodedData = 
fetchContext.bucketFactory.makeBucket(-1);
-                               if(logMINOR) Logger.minor(this, "Copying data 
from data blocks");
-                               OutputStream os = decodedData.getOutputStream();
-                               for(int i=0;i<dataBuckets.length;i++) {
-                                       SplitfileBlock status = dataBuckets[i];
-                                       Bucket data = status.getData();
-                                       BucketTools.copyTo(data, os, 
Long.MAX_VALUE);
-                               }
-                               if(logMINOR) Logger.minor(this, "Copied data");
-                               os.close();
-                               // Must set finished BEFORE calling 
parentFetcher.
-                               // Otherwise a race is possible that might 
result in it not seeing our finishing.
-                               finished = true;
-                               
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
-                       } catch (IOException e) {
-                               Logger.normal(this, "Caught bucket error?: "+e, 
e);
-                               finished = true;
-                               failureException = new 
FetchException(FetchException.BUCKET_ERROR);
-                               
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
-                               return;
-                       }
-                       
-                       // Now heal
-                       
-                       /** Splitfile healing:
-                        * Any block which we have tried and failed to download 
should be 
-                        * reconstructed and reinserted.
-                        */
-                       
-                       // Encode any check blocks we don't have
-                       if(codec != null) {
-                               StandardOnionFECCodec fec = 
(StandardOnionFECCodec) codec;
-                               fec.addToQueue(dataBuckets, checkBuckets, 
32768, fetchContext.bucketFactory, this);
-                       }
-               }
+               boolean[] dataBlocksSucceeded = new boolean[dataBuckets.length];
+               boolean[] checkBlocksSucceeded = new 
boolean[checkBuckets.length];
+               for(int i=0;i<dataBuckets.length;i++)
+                       dataBlocksSucceeded[i] = dataBuckets[i].data != null;
+               for(int i=0;i<checkBuckets.length;i++)
+                       checkBlocksSucceeded[i] = checkBuckets[i].data != null;
+
+               codec = FECCodec.getCodec(splitfileType, dataKeys.length, 
checkKeys.length);

-               public void onEncodedSegment() {                
-                       // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
+               if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+                       StandardOnionFECCodec fec = 
(StandardOnionFECCodec)codec;
+                       fec.addToQueue(dataBuckets, checkBuckets, 
CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true);
+                       // Now have all the data blocks (not necessarily all 
the check blocks)
+               }
+       }
+       
+       public void onDecodedSegment() {
+               try {
+                       decodedData = fetchContext.bucketFactory.makeBucket(-1);
+                       if(logMINOR) Logger.minor(this, "Copying data from data 
blocks");
+                       OutputStream os = decodedData.getOutputStream();
                        for(int i=0;i<dataBuckets.length;i++) {
-                               boolean heal = false;
-                               if(dataRetries[i] > 0)
-                                       heal = true;
-                               if(heal) {
-                                       queueHeal(dataBuckets[i].getData());
-                               } else {
-                                       dataBuckets[i].data.free();
-                                       dataBuckets[i].data = null;
-                               }
-                               dataBuckets[i] = null;
-                               dataKeys[i] = null;
+                               SplitfileBlock status = dataBuckets[i];
+                               Bucket data = status.getData();
+                               BucketTools.copyTo(data, os, Long.MAX_VALUE);
                        }
-                       for(int i=0;i<checkBuckets.length;i++) {
-                               boolean heal = false;
-                               if(checkRetries[i] > 0)
-                                       heal = true;
-                               if(heal) {
-                                       queueHeal(checkBuckets[i].getData());
-                               } else {
-                                       checkBuckets[i].data.free();
-                               }
-                               checkBuckets[i] = null;
-                               checkKeys[i] = null;
+                       if(logMINOR) Logger.minor(this, "Copied data");
+                       os.close();
+                       // Must set finished BEFORE calling parentFetcher.
+                       // Otherwise a race is possible that might result in it 
not seeing our finishing.
+                       finished = true;
+                       
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+               } catch (IOException e) {
+                       Logger.normal(this, "Caught bucket error?: "+e, e);
+                       finished = true;
+                       failureException = new 
FetchException(FetchException.BUCKET_ERROR);
+                       
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+                       return;
+               }
+
+               // Now heal
+
+               /** Splitfile healing:
+                * Any block which we have tried and failed to download should 
be 
+                * reconstructed and reinserted.
+                */
+
+               // Encode any check blocks we don't have
+               if(codec != null) {
+                       StandardOnionFECCodec fec = (StandardOnionFECCodec) 
codec;
+                       fec.addToQueue(dataBuckets, checkBuckets, 32768, 
fetchContext.bucketFactory, this, false);
+               }
+       }
+
+       public void onEncodedSegment() {                
+               // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
+               for(int i=0;i<dataBuckets.length;i++) {
+                       boolean heal = false;
+                       if(dataRetries[i] > 0)
+                               heal = true;
+                       if(heal) {
+                               queueHeal(dataBuckets[i].getData());
+                       } else {
+                               dataBuckets[i].data.free();
+                               dataBuckets[i].data = null;
                        }
+                       dataBuckets[i] = null;
+                       dataKeys[i] = null;
                }
+               for(int i=0;i<checkBuckets.length;i++) {
+                       boolean heal = false;
+                       if(checkRetries[i] > 0)
+                               heal = true;
+                       if(heal) {
+                               queueHeal(checkBuckets[i].getData());
+                       } else {
+                               checkBuckets[i].data.free();
+                       }
+                       checkBuckets[i] = null;
+                       checkKeys[i] = null;
+               }
        }

        private void queueHeal(Bucket data) {

Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java        
2007-03-24 02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java        
2007-03-24 02:59:23 UTC (rev 12301)
@@ -424,7 +424,7 @@
                                synchronized(this) {
                                        if(!encoded){
                                                StandardOnionFECCodec fec = 
(StandardOnionFECCodec) splitfileAlgo;
-                                               fec.addToQueue(dataBlocks, 
checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory, 
this);
+                                               fec.addToQueue(dataBlocks, 
checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory, 
this, false);
                                        }
                                }                               
                                fin = false;
@@ -461,6 +461,8 @@
                }
        }

+       public void onDecodedSegment() {} // irrevelant
+
        public void onEncodedSegment() {
                // Start the inserts
                try {


Reply via email to