Author: toad
Date: 2005-11-05 18:28:07 +0000 (Sat, 05 Nov 2005)
New Revision: 7473

Added:
   trunk/freenet/src/freenet/client/FECCodec.java
   trunk/freenet/src/freenet/client/SplitfileBlock.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
Modified:
   trunk/freenet/src/freenet/client/FetchException.java
   trunk/freenet/src/freenet/client/Segment.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
Log:
Lots of work on splitfiles.

Added: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2005-11-05 18:14:59 UTC 
(rev 7472)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2005-11-05 18:28:07 UTC 
(rev 7473)
@@ -0,0 +1,36 @@
+package freenet.client;
+
+import freenet.client.Segment.BlockStatus;
+
+/**
+ * FEC (forward error correction) handler.
+ * I didn't keep the old freenet.client.FEC* etc as it seemed grossly overengineered with
+ * a lot of code there only because of API confusion.
+ * @author root
+ *
+ */
+abstract class FECCodec {
+
+       /**
+        * Look up the codec for a splitfile type.
+        * @param splitfileType The splitfile type, compared against Metadata.SPLITFILE_ONION_STANDARD.
+        * @param dataBlocks The number of data blocks.
+        * @param checkBlocks The number of check blocks.
+        * @return The StandardOnionFECCodec for these block counts, or null if splitfileType
+        * is anything other than Metadata.SPLITFILE_ONION_STANDARD.
+        */
+       public static FECCodec getCodec(short splitfileType, int dataBlocks, int checkBlocks) {
+               if(splitfileType != Metadata.SPLITFILE_ONION_STANDARD)
+                       return null;
+               return StandardOnionFECCodec.getInstance(dataBlocks, checkBlocks);
+       }
+
+       /**
+        * Decode all missing blocks.
+        * @param dataBlockStatus The data blocks.
+        * @param checkBlockStatus The check blocks.
+        * @param packetLength The packet length in bytes.
+        */
+       public abstract void decode(BlockStatus[] dataBlockStatus, BlockStatus[] checkBlockStatus, int packetLength);
+
+}

Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java        2005-11-05 
18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/FetchException.java        2005-11-05 
18:28:07 UTC (rev 7473)
@@ -41,7 +41,7 @@
        /** Don't know what to do with splitfile */
        public static final int UNKNOWN_SPLITFILE_METADATA = 2;
        /** Too many ordinary redirects */
-       public static final int TOO_MANY_REDIRECTS = 3;
+       public static final int TOO_MANY_REDIRECTS = 16;
        /** Don't know what to do with metadata */
        public static final int UNKNOWN_METADATA = 3;
        /** Got a MetadataParseException */
@@ -62,4 +62,10 @@
        public static final int HAS_MORE_METASTRINGS = 11;
        /** Internal error, probably failed to read from a bucket */
        public static final int BUCKET_ERROR = 12;
+       /** Data not found */
+       public static final int DATA_NOT_FOUND = 13;
+       /** Route not found */
+       public static final int ROUTE_NOT_FOUND = 14;
+       /** Downstream overload */
+       public static final int REJECTED_OVERLOAD = 15;
 }

Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-11-05 18:14:59 UTC 
(rev 7472)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-11-05 18:28:07 UTC 
(rev 7473)
@@ -19,17 +19,25 @@
  */
 public class Segment implements Runnable {

-       public class BlockStatus implements Runnable {
+       public class BlockStatus implements Runnable, SplitfileBlock {

+               /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the 
check blocks */
+               final int index;
                final FreenetURI uri;
                int completedTries;
+               Bucket fetchedData;

-               public BlockStatus(FreenetURI freenetURI) {
+               public BlockStatus(FreenetURI freenetURI, int index) {
                        uri = freenetURI;
                        completedTries = 0;
+                       fetchedData = null;
+                       this.index = index;
                }

                public void startFetch() {
+                       if(fetchedData != null) {
+                               throw new IllegalStateException("Already have 
data");
+                       }
                        synchronized(runningFetches) {
                                runningFetches.add(this);
                                try {
@@ -50,22 +58,51 @@
                                // Do the fetch
                                Fetcher f = new Fetcher(uri, blockFetchContext);
                                try {
-                                       f.realRun(new ClientMetadata(), recursionLevel, uri, 
+                                       FetchResult fr = f.realRun(new ClientMetadata(), recursionLevel, uri, 
                                                        (!nonFullBlocksAllowed) || fetcherContext.dontEnterImplicitArchives);
+                                       fetchedData = fr.data;
                                } catch (MetadataParseException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
+                                       fatalError(e);
                                } catch (FetchException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
+                                       int code = e.getMode();
+                                       switch(code) {
+                                       case FetchException.ARCHIVE_FAILURE:
+                                       case FetchException.BLOCK_DECODE_ERROR:
+                                       case FetchException.HAS_MORE_METASTRINGS:
+                                       case FetchException.INVALID_METADATA:
+                                       case FetchException.NOT_IN_ARCHIVE:
+                                       case FetchException.TOO_DEEP_ARCHIVE_RECURSION:
+                                       case FetchException.TOO_MANY_ARCHIVE_RESTARTS:
+                                       case FetchException.TOO_MANY_METADATA_LEVELS:
+                                       case FetchException.TOO_MANY_REDIRECTS:
+                                       case FetchException.TOO_MUCH_RECURSION:
+                                       case FetchException.UNKNOWN_METADATA:
+                                       case FetchException.UNKNOWN_SPLITFILE_METADATA:
+                                               // Fatal, probably an error on insert
+                                               fatalError(e);
+                                               return;
+                                       
+                                       case FetchException.DATA_NOT_FOUND:
+                                       case FetchException.ROUTE_NOT_FOUND:
+                                       case FetchException.REJECTED_OVERLOAD:
+                                               // Non-fatal: log once, then break so we don't fall through into BUCKET_ERROR
+                                               nonfatalError(e);
+                                               break;
+                                       case FetchException.BUCKET_ERROR:
+                                               // Maybe fatal
+                                               nonfatalError(e);
+                                       }
                                } catch (ArchiveFailureException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
+                                       fatalError(e);
                                } catch (ArchiveRestartException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
-                               };
+                                       fatalError(e);
+                               }
                        } finally {
+                               if(completedTries >= 0) completedTries++; // fatalError() sets -1; keep that marker intact
+                               // Add before removing from runningFetches, to avoid race
+                               synchronized(recentlyCompletedFetches) {
+                                       recentlyCompletedFetches.add(this);
+                               }
                                synchronized(runningFetches) {
                                        runningFetches.remove(this);
                                }
@@ -75,6 +112,18 @@
                        }
                }

+               private void fatalError(Exception e) { // Log the error and mark this block as given up on
+                       Logger.normal(this, "Giving up on block: "+this+": "+e);
+                       completedTries = -1; // the segment's retry loop tests completedTries for -1 to count fatal errors
+               }
+               /** Log a non-fatal fetch error at minor priority. Only logs; no block state is changed here. */
+               private void nonfatalError(Exception e) {
+                       Logger.minor(this, "Non-fatal error on "+this+": "+e);
+               }
+               /** Returns true if fetchedData is non-null, i.e. a fetch has stored its result in this block. */
+               public boolean succeeded() {
+                       return fetchedData != null;
+               }
        }

        final short splitfileType;
@@ -113,6 +162,8 @@
        private final FetcherContext blockFetchContext;
        /** Recursion level */
        private final int recursionLevel;
+       /** Number of blocks which got fatal errors */
+       private int fatalErrorCount;

        /**
         * Create a Segment.
@@ -235,7 +286,11 @@

                        // Now wait for any thread to complete
                        synchronized(this) {
-                               wait(10*1000);
+                               try {
+                                       wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
                        }

                        while(true) {
@@ -244,10 +299,12 @@
                                        block = (BlockStatus) 
recentlyCompletedFetches.removeFirst();
                                }
                                if(block == null) break;
-                               if(block.failed()) {
+                               if(!block.succeeded()) {
                                        // Retry
                                        int retryLevel = block.completedTries;
-                                       if(retryLevel == maxRetryLevel) {
+                                       if(retryLevel == maxRetryLevel || 
retryLevel == -1) {
+                                               if(retryLevel == -1)
+                                                       fatalErrorCount++;
                                                // This block failed
                                        } else {
                                                Vector levelSet = (Vector) 
blocksNotTried.get(retryLevel);
@@ -262,7 +319,7 @@
                                        // Can't start a fetch
                                        if(runningFetches() == 0) {
                                                // Failed
-                                               
parentFetcher.failedNotEnoughBlocks();
+                                               
parentFetcher.failedNotEnoughBlocks(this);
                                                return;
                                        }
                                }
@@ -275,15 +332,29 @@
                }

                parentFetcher.gotBlocks(this);
-
+               
                // Now decode
-               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
-                       // TODO put the data together
-               } else {
-                       // TODO decode via onion
+               Bucket output; // declared before the try so it is still in scope for decoded() below
+               try {
+                       if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+                               FECCodec codec = FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
+                               codec.decode(dataBlockStatus, checkBlockStatus);
+                       }
+                       output = fetcherContext.bucketFactory.makeBucket(-1); // concatenate the data blocks into one bucket
+                       OutputStream os = output.getOutputStream();
+                       for(int i=0;i<dataBlockStatus.length;i++) {
+                               BlockStatus status = dataBlockStatus[i];
+                               Bucket data = status.fetchedData;
+                               BucketTools.copyTo(data, os, Long.MAX_VALUE);
+                               fetcherContext.bucketFactory.freeBucket(data);
+                       }
+                       os.close();
+               } catch (IOException e) {
+                       parentFetcher.internalBucketError(this, e);
+                       return; // the failure has been reported; there is no complete output to hand over
                }

-               parentFetcher.decoded(this);
+               parentFetcher.decoded(this, output);

                // TODO create healing blocks
        }
@@ -304,11 +375,21 @@
        private boolean startFetch() {
                if(minRetryLevel == maxRetryLevel) return false; // nothing to start
                // Don't need to synchronize as these are only accessed by main thread
-               Vector v = (Vector) blocksNotTried.get(minRetryLevel);
-               int len = v.size();
-               int idx = fetcherContext.random.nextInt(len);
-               BlockStatus b = (BlockStatus) v.remove(idx);
-               if(v.isEmpty()) minRetryLevel++;
-               b.startFetch();
+               while(true) {
+                       if(minRetryLevel >= blocksNotTried.size())
+                               return false;
+                       Vector v = (Vector) blocksNotTried.get(minRetryLevel);
+                       int len = v.size();
+                       if(len == 0) {
+                               // No block is waiting at this level, so no random index can be drawn; try the next level
+                               minRetryLevel++;
+                               continue;
+                       }
+                       int idx = fetcherContext.random.nextInt(len);
+                       BlockStatus b = (BlockStatus) v.remove(idx);
+                       if(v.isEmpty()) minRetryLevel++;
+                       b.startFetch();
+                       return true;
+               }
        }
 }

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-05 18:14:59 UTC 
(rev 7472)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-05 18:28:07 UTC 
(rev 7473)
@@ -202,4 +202,19 @@
                return output;
        }

+       public void failedNotEnoughBlocks(Segment segment) {
+               // TODO implement. Segment calls parentFetcher.failedNotEnoughBlocks(this) when it cannot start a fetch and none are running.
+               
+       }
+
+       public void gotBlocks(Segment segment) {
+               // TODO implement. Segment calls parentFetcher.gotBlocks(this) after its fetch loop, just before it starts decoding.
+               
+       }
+
+       public void decoded(Segment segment) {
+               // TODO implement. NOTE(review): Segment calls parentFetcher.decoded(this, output) with two arguments, which this signature does not accept.
+               
+       }
+
 }

Added: trunk/freenet/src/freenet/client/SplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitfileBlock.java        2005-11-05 
18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/SplitfileBlock.java        2005-11-05 
18:28:07 UTC (rev 7473)
@@ -0,0 +1,20 @@
+package freenet.client;
+
+import freenet.support.Bucket;
+
+/** Simple interface for a splitfile block: its number plus its data, which may not be present yet */
+public interface SplitfileBlock {
+
+       /** Get block number: 0 <= number < k for data blocks, k <= number < n for check blocks */
+       int getNumber();
+       
+       /** Has data? NOTE(review): presumably the same as getData() != null - confirm against the implementations */
+       boolean hasData();
+       
+       /** Get data. StandardOnionFECCodec.realDecode treats a null return as a block that is missing. */
+       Bucket getData();
+       
+       /** Set data */
+       void setData(Bucket data);
+       
+}

Added: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 
18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 
18:28:07 UTC (rev 7473)
@@ -0,0 +1,179 @@
+package freenet.client;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.onionnetworks.fec.DefaultFECCodeFactory;
+import com.onionnetworks.fec.FECCode;
+
+import freenet.client.Segment.BlockStatus;
+import freenet.support.Bucket;
+import freenet.support.BucketFactory;
+import freenet.support.Fields;
+import freenet.support.LRUHashBag;
+import freenet.support.LRUHashtable;
+
+/**
+ * FECCodec implementation using the onion code.
+ * Codecs are cached per (k, n) pair; see getInstance().
+ * NOTE(review): FECCodec declares decode(BlockStatus[], BlockStatus[], int), which the
+ * four-argument decode() below does not implement.
+ */
+public class StandardOnionFECCodec extends FECCodec {
+
+       // REDFLAG: How big is one of these?
+       private static final int MAX_CACHED_CODECS = 16;
+       // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more JNI overhead
+       private static final int STRIPE_SIZE = 4096;
+       
+       /** Cache key identifying a codec by its (k, n) pair */
+       private static class MyKey {
+               /** Number of input blocks */
+               final int k;
+               /** Number of output blocks, including input blocks */
+               final int n;
+               
+               public MyKey(int k, int n) {
+                       this.k = k;
+                       this.n = n;
+               }
+               
+               public boolean equals(Object o) {
+                       if(o instanceof MyKey) {
+                               MyKey key = (MyKey)o;
+                               return key.n == n && key.k == k;
+                       } else return false;
+               }
+               
+               public int hashCode() {
+                       return (n << 16) + k;
+               }
+       }
+
+       /** Codec cache, keyed by MyKey; only accessed from the synchronized getInstance() */
+       private static final LRUHashtable recentlyUsedCodecs = new LRUHashtable();
+       
+       /**
+        * Get the codec for the given block counts, creating and caching it if necessary.
+        * The cache is trimmed back to MAX_CACHED_CODECS entries after each insertion.
+        * @param dataBlocks The number of data blocks (k).
+        * @param checkBlocks The number of check blocks (n-k).
+        */
+       public synchronized static FECCodec getInstance(int dataBlocks, int checkBlocks) {
+               MyKey key = new MyKey(dataBlocks, checkBlocks + dataBlocks);
+               StandardOnionFECCodec codec = (StandardOnionFECCodec) recentlyUsedCodecs.get(key);
+               if(codec != null) {
+                       recentlyUsedCodecs.push(key, codec);
+                       return codec;
+               }
+               codec = new StandardOnionFECCodec(dataBlocks, checkBlocks + dataBlocks);
+               recentlyUsedCodecs.push(key, codec);
+               while(recentlyUsedCodecs.size() > MAX_CACHED_CODECS) {
+                       recentlyUsedCodecs.popKey();
+               }
+               return codec;
+       }
+
+       private final FECCode code;
+
+       /** Number of data blocks */
+       private final int k;
+       /** Total number of blocks, data plus check */
+       private final int n;
+       
+       /**
+        * @param k The number of data blocks.
+        * @param n The total number of blocks, data plus check.
+        */
+       public StandardOnionFECCodec(int k, int n) {
+               // Remember k and n: realDecode() validates its arguments against them
+               this.k = k;
+               this.n = n;
+               code = DefaultFECCodeFactory.getDefault().createFECCode(k,n);
+       }
+
+       public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+               // Ensure that there are only K simultaneous running decodes.
+               // TODO not implemented yet: this does not call realDecode()
+       }
+       
+       /**
+        * Decode the blocks stripe by stripe (work in progress, see the TODO at the end).
+        * NOTE(review): Segment wraps makeBucket() and getOutputStream() in a catch for
+        * IOException; if they declare it, this method needs a throws clause - confirm.
+        * @param dataBlockStatus The k data blocks. A block whose getData() is null is missing.
+        * @param checkBlockStatus The n-k check blocks, same convention.
+        * @param blockLength The length in bytes passed to makeBucket() and to the decoder.
+        * @param bf Creates the buckets for the missing blocks.
+        * @throws IllegalArgumentException If the array lengths do not match k and n.
+        */
+       public void realDecode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+               if(dataBlockStatus.length + checkBlockStatus.length != n)
+                       throw new IllegalArgumentException();
+               if(dataBlockStatus.length != k)
+                       throw new IllegalArgumentException();
+               // Indexed by block number, with check blocks at [k,n), so all of these need n slots
+               byte[][] packets = new byte[n][];
+               Bucket[] buckets = new Bucket[n];
+               InputStream[] readers = new InputStream[n];
+               OutputStream[] writers = new OutputStream[n];
+               int[] toDecode = new int[n-k];
+               int numberToDecode = 0; // can be less than n-k
+               
+               for(int i=0;i<n;i++)
+                       packets[i] = new byte[STRIPE_SIZE];
+               
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       buckets[i] = dataBlockStatus[i].getData();
+                       if(buckets[i] == null) {
+                               buckets[i] = bf.makeBucket(blockLength);
+                               writers[i] = buckets[i].getOutputStream();
+                               readers[i] = null;
+                               toDecode[numberToDecode++] = i;
+                       } else {
+                               writers[i] = null;
+                               readers[i] = buckets[i].getInputStream();
+                       }
+               }
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       buckets[i+k] = checkBlockStatus[i].getData();
+                       if(buckets[i+k] == null) {
+                               buckets[i+k] = bf.makeBucket(blockLength);
+                               writers[i+k] = buckets[i+k].getOutputStream();
+                               readers[i+k] = null;
+                               toDecode[numberToDecode++] = i+k;
+                       } else {
+                               writers[i+k] = null;
+                               readers[i+k] = buckets[i+k].getInputStream();
+                       }
+               }
+               
+               if(numberToDecode != toDecode.length) {
+                       int[] newToDecode = new int[numberToDecode];
+                       System.arraycopy(toDecode, 0, newToDecode, 0, numberToDecode);
+                       toDecode = newToDecode;
+               }
+
+               int[] offsets = new int[n];
+               for(int i=0;i<n;i++) offsets[i] = 0;
+               
+               if(numberToDecode > 0) {
+                       // Do the (striped) decode
+                       for(int offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
+                               // Read the data in first
+                               for(int i=0;i<n;i++) {
+                                       if(readers[i] != null) {
+                                               Fields.readFully(readers[i], packets[i]);
+                                       }
+                               }
+                               // Do the decode
+                               // NOTE(review): the buffers hold STRIPE_SIZE bytes but blockLength is passed as the
+                               // length; check these arguments against the FECCode.decode contract - confirm.
+                               code.decode(packets, offsets, toDecode, blockLength, true);
+                       }
+               }
+               // TODO writers[] are opened above but nothing is written to them yet, and no stream is closed
+               
+       }
+
+}


Reply via email to