Author: toad
Date: 2005-11-05 20:22:00 +0000 (Sat, 05 Nov 2005)
New Revision: 7475

Modified:
   trunk/freenet/src/freenet/client/FECCodec.java
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/Segment.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
Log:
More splitfiles.

Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2005-11-05 20:01:19 UTC 
(rev 7474)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2005-11-05 20:22:00 UTC 
(rev 7475)
@@ -1,6 +1,9 @@
 package freenet.client;

+import java.io.IOException;
+
 import freenet.client.Segment.BlockStatus;
+import freenet.support.BucketFactory;

 /**
  * FEC (forward error correction) handler.
@@ -12,17 +15,34 @@
 abstract class FECCodec {

        public static FECCodec getCodec(short splitfileType, int dataBlocks, 
int checkBlocks) {
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
+                       return null;
                if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
                        return StandardOnionFECCodec.getInstance(dataBlocks, 
checkBlocks);
                else return null;
        }

        /**
-        * Decode all missing blocks.
+        * Decode all missing *data* blocks.
+        * Requires that the total number of available blocks is equal to or 
greater than the length of
+        * the data blocks array. (i.e. it is > k).
         * @param dataBlockStatus The data blocks.
         * @param checkBlockStatus The check blocks.
-        * @param packetLength The packet length in bytes.
+        * @param blockLength The block length in bytes.
+        * @param bf The BucketFactory to use to generate buckets.
+        * @throws IOException If there is an error in decoding caused by an 
I/O error (usually involving buckets).
         */
-       public abstract void decode(BlockStatus[] dataBlockStatus, 
BlockStatus[] checkBlockStatus, int packetLength);
+       public abstract void decode(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws 
IOException;

+       /**
+        * Encode all missing *check* blocks.
+        * Requires that all the data blocks be present.
+        * @param dataBlockStatus The data blocks.
+        * @param checkBlockStatus The check blocks.
+        * @param blockLength The block length in bytes.
+        * @param bf The BucketFactory to use to generate buckets.
+        * @throws IOException If there is an error in decoding caused by an 
I/O error (usually involving buckets).
+        */
+       public abstract void encode(BlockStatus[] dataBlockStatus, 
BlockStatus[] checkBlockStatus, int blockLength, BucketFactory bucketFactory);
+
 }

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2005-11-05 
20:01:19 UTC (rev 7474)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2005-11-05 
20:22:00 UTC (rev 7475)
@@ -5,7 +5,7 @@
 import freenet.support.BucketFactory;

 /** Context for a Fetcher. Contains all the settings a Fetcher needs to know 
about. */
-public class FetcherContext {
+public class FetcherContext implements Cloneable {

        /** Low-level client to send low-level requests to. */
        final SimpleLowLevelClient client;
@@ -47,4 +47,14 @@
                this.localRequestOnly = localRequestOnly;
        }

+       /** Make public, but just call parent for a field for field copy */
+       public Object clone() {
+               try {
+                       return super.clone();
+               } catch (CloneNotSupportedException e) {
+                       // Impossible
+                       throw new Error(e);
+               }
+       }
+       
 }

Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-11-05 20:01:19 UTC 
(rev 7474)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-11-05 20:22:00 UTC 
(rev 7475)
@@ -26,12 +26,14 @@
                final FreenetURI uri;
                int completedTries;
                Bucket fetchedData;
+               boolean actuallyFetched;

                public BlockStatus(FreenetURI freenetURI, int index) {
                        uri = freenetURI;
                        completedTries = 0;
                        fetchedData = null;
                        this.index = index;
+                       actuallyFetched = false;
                }

                public void startFetch() {
@@ -60,6 +62,7 @@
                                try {
                                        FetchResult fr = f.realRun(new 
ClientMetadata(), recursionLevel, uri, 
                                                        (!nonFullBlocksAllowed) 
|| fetcherContext.dontEnterImplicitArchives);
+                                       actuallyFetched = true;
                                        fetchedData = fr.data;
                                } catch (MetadataParseException e) {
                                        fatalError(e);
@@ -124,6 +127,16 @@
                public boolean succeeded() {
                        return fetchedData != null;
                }
+
+               /**
+                * Queue a healing block for insert.
+                * Will be implemented using the download manager.
+                * FIXME: implement!
+                */
+               public void queueHeal() {
+                       // TODO Auto-generated method stub
+                       
+               }
        }

        final short splitfileType;
@@ -192,11 +205,11 @@
                Vector firstSet = new 
Vector(dataBlocks.length+checkBlocks.length);
                blocksNotTried.add(0, firstSet);
                for(int i=0;i<dataBlocks.length;i++) {
-                       dataBlockStatus[i] = new BlockStatus(dataBlocks[i]);
+                       dataBlockStatus[i] = new BlockStatus(dataBlocks[i], i);
                        firstSet.add(dataBlockStatus[i]);
                }
                for(int i=0;i<checkBlocks.length;i++) {
-                       checkBlockStatus[i] = new BlockStatus(checkBlocks[i]);
+                       checkBlockStatus[i] = new BlockStatus(checkBlocks[i], 
dataBlockStatus.length + i);
                        firstSet.add(checkBlockStatus[i]);
                }
                recentlyCompletedFetches = new LinkedList();
@@ -208,6 +221,10 @@
                } else throw new MetadataParseException("Unknown splitfile type 
"+splitfileType);
                minRetryLevel = 0;
                this.recursionLevel = recursionLevel;
+               // FIXME be a bit more flexible here depending on flags
+               blockFetchContext = (FetcherContext) fetcherContext.clone();
+               blockFetchContext.allowSplitfiles = false;
+               blockFetchContext.dontEnterImplicitArchives = true;
        }

        /**
@@ -335,10 +352,12 @@

                // Now decode

+               FECCodec codec = FECCodec.getCodec(splitfileType, 
dataBlocks.length, checkBlocks.length);
                try {
                        if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
-                               FECCodec codec = 
FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
-                               codec.decode(dataBlockStatus, checkBlockStatus);
+                               // FIXME hardcoded block size below.
+                               codec.decode(dataBlockStatus, checkBlockStatus, 
32768, fetcherContext.bucketFactory);
+                               // Now have all the data blocks (not 
necessarily all the check blocks)
                        }

                        Bucket output = 
fetcherContext.bucketFactory.makeBucket(-1);
@@ -350,13 +369,28 @@
                                fetcherContext.bucketFactory.freeBucket(data);
                        }
                        os.close();
+                       parentFetcher.decoded(this, output);
                } catch (IOException e) {
                        parentFetcher.internalBucketError(this, e);
+                       return;
                }

-               parentFetcher.decoded(this, output);
+               // Now heal

-               // TODO create healing blocks
+               // Encode any check blocks we don't have
+               if(codec != null)
+                       codec.encode(dataBlockStatus, checkBlockStatus, 32768, 
fetcherContext.bucketFactory);
+               
+               // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       BlockStatus block = dataBlockStatus[i];
+                       if(block.actuallyFetched) continue;
+                       if(block.completedTries == 0) {
+                               // 80% chance of not inserting, if we never 
tried it
+                               if(fetcherContext.random.nextInt(5) == 0) 
continue;
+                       }
+                       block.queueHeal();
+               }
        }

        /**

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-05 20:01:19 UTC 
(rev 7474)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-05 20:22:00 UTC 
(rev 7475)
@@ -212,9 +212,14 @@

        }

-       public void decoded(Segment segment) {
+       public void decoded(Segment segment, Bucket output) {
                // TODO Auto-generated method stub

        }

+       public void internalBucketError(Segment segment, IOException e) {
+               // TODO Auto-generated method stub
+               
+       }
+
 }

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 
20:01:19 UTC (rev 7474)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 
20:22:00 UTC (rev 7475)
@@ -1,16 +1,15 @@
 package freenet.client;

-import java.io.InputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.io.OutputStream;

 import com.onionnetworks.fec.DefaultFECCodeFactory;
 import com.onionnetworks.fec.FECCode;
+import com.onionnetworks.util.Buffer;

-import freenet.client.Segment.BlockStatus;
 import freenet.support.Bucket;
 import freenet.support.BucketFactory;
-import freenet.support.Fields;
-import freenet.support.LRUHashBag;
 import freenet.support.LRUHashtable;

 /**
@@ -22,6 +21,8 @@
        private static int MAX_CACHED_CODECS = 16;
        // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more 
JNI overhead
        private static int STRIPE_SIZE = 4096;
+       // REDFLAG: Make this configurable, maybe make it depend on # CPUs
+       private static int PARALLEL_DECODES = 1;

        private static class MyKey {
                /** Number of input blocks */
@@ -72,24 +73,46 @@
                code = DefaultFECCodeFactory.getDefault().createFECCode(k,n);
        }

-       public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) {
+       private static Object runningDecodesSync = new Object();
+       private static int runningDecodes;
+       
+       public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
                // Ensure that there are only K simultaneous running decodes.
+               synchronized(runningDecodesSync) {
+                       while(runningDecodes >= PARALLEL_DECODES) {
+                               try {
+                                       wait();
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+                       runningDecodes++;
+               }
+               try {
+                       realDecode(dataBlockStatus, checkBlockStatus, 
blockLength, bf);
+               } finally {
+                       synchronized(runningDecodesSync) {
+                               runningDecodes--;
+                       }
+               }
        }

-       public void realDecode(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+       public void realDecode(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws 
IOException {
                if(dataBlockStatus.length + checkBlockStatus.length != n)
                        throw new IllegalArgumentException();
                if(dataBlockStatus.length != k)
                        throw new IllegalArgumentException();
-               byte[][] packets = new byte[k][];
+               Buffer[] packets = new Buffer[k];
                Bucket[] buckets = new Bucket[k];
-               InputStream[] readers = new InputStream[k];
+               DataInputStream[] readers = new DataInputStream[k];
                OutputStream[] writers = new OutputStream[k];
                int[] toDecode = new int[n-k];
-               int numberToDecode; // can be less than n-k
+               int numberToDecode = 0; // can be less than n-k

+               byte[] realBuffer = new byte[k * STRIPE_SIZE];
+               
                for(int i=0;i<n;i++)
-                       packets[i] = new byte[STRIPE_SIZE];
+                       packets[i] = new Buffer(realBuffer, i*STRIPE_SIZE, 
STRIPE_SIZE);

                for(int i=0;i<dataBlockStatus.length;i++) {
                        buckets[i] = dataBlockStatus[i].getData();
@@ -100,7 +123,7 @@
                                toDecode[numberToDecode++] = i;
                        } else {
                                writers[i] = null;
-                               readers[i] = buckets[i].getInputStream();
+                               readers[i] = new 
DataInputStream(buckets[i].getInputStream());
                        }
                }
                for(int i=0;i<checkBlockStatus.length;i++) {
@@ -112,7 +135,7 @@
                                toDecode[numberToDecode++] = i+k;
                        } else {
                                writers[i+k] = null;
-                               readers[i+k] = buckets[i+k].getInputStream();
+                               readers[i+k] = new 
DataInputStream(buckets[i+k].getInputStream());
                        }
                }

@@ -127,18 +150,26 @@

                if(numberToDecode > 0) {
                        // Do the (striped) decode
-                       for(int 
offset=0;offset<packetLength;offset+=STRIPE_SIZE) {
+                       for(int 
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
                                // Read the data in first
                                for(int i=0;i<n;i++) {
                                        if(readers[i] != null) {
-                                               Fields.readFully(readers[i], 
packets[i]);
+                                               
readers[i].readFully(realBuffer, i*STRIPE_SIZE, STRIPE_SIZE);
                                        }
                                }
                                // Do the decode
-                               code.decode(packets, offsets, toDecode, 
blockLength, true);
+                               // Not shuffled
+                               code.decode(packets, offsets);
+                               // packets now contains an array of decoded 
blocks, in order
+                               // Write the data out
+                               for(int i=0;i<n;i++) {
+                                       writers[i].write(realBuffer, 
i*STRIPE_SIZE, STRIPE_SIZE);
+                               }
+                       }
                }
-               // TODO Auto-generated method stub
-               
+               for(int i=0;i<k;i++) {
+                       writers[i].close();
+                       readers[i].close();
+               }
        }
-
 }


Reply via email to