Author: toad
Date: 2005-11-09 20:58:30 +0000 (Wed, 09 Nov 2005)
New Revision: 7505

Added:
   trunk/freenet/src/freenet/client/SplitFetchException.java
Modified:
   trunk/freenet/src/freenet/client/FECCodec.java
   trunk/freenet/src/freenet/client/FetchException.java
   trunk/freenet/src/freenet/client/InsertSegment.java
   trunk/freenet/src/freenet/client/Segment.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
   trunk/freenet/src/freenet/client/SplitInserter.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
   trunk/freenet/src/freenet/node/Version.java
Log:
147:
Might just work now...


Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2005-11-09 20:14:20 UTC 
(rev 7504)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2005-11-09 20:58:30 UTC 
(rev 7505)
@@ -78,7 +78,7 @@
         * @param bf The BucketFactory to use to generate buckets.
         * @throws IOException If there is an error in decoding caused by an 
I/O error (usually involving buckets).
         */
-       public abstract void encode(SplitfileBlock[] dataBlocks, 
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory);
+       public abstract void encode(SplitfileBlock[] dataBlocks, 
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory) 
throws IOException;

        /**
         * How many check blocks?

Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java        2005-11-09 
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/FetchException.java        2005-11-09 
20:58:30 UTC (rev 7505)
@@ -72,4 +72,6 @@
        public static final int INTERNAL_ERROR = 17;
        /** The node found the data but the transfer failed */
        public static final int TRANSFER_FAILED = 18;
+       /** Splitfile error. This should be a SplitFetchException. */
+       public static final int SPLITFILE_ERROR = 19;
 }

Modified: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 20:14:20 UTC 
(rev 7504)
+++ trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 20:58:30 UTC 
(rev 7505)
@@ -1,5 +1,7 @@
 package freenet.client;

+import java.io.IOException;
+
 import freenet.keys.FreenetURI;
 import freenet.support.BucketFactory;

@@ -37,8 +39,9 @@
        /**
         * Encode the data blocks into check blocks.
         * @return The number of check blocks generated.
+        * @throws IOException If the encode fails due to a bucket error.
         */
-       public int encode(int offset, RetryTracker tracker, InserterContext 
ctx) {
+       public int encode(int offset, RetryTracker tracker, InserterContext 
ctx) throws IOException {
                if(codec == null) return 0; // no FEC
                for(int i=0;i<checkBlocks.length;i++)
                        checkBlocks[i] = new BlockInserter(null, offset + i, 
tracker, ctx);

Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-11-09 20:14:20 UTC 
(rev 7504)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-11-09 20:58:30 UTC 
(rev 7505)
@@ -9,6 +9,7 @@
 import freenet.keys.FreenetURI;
 import freenet.support.Bucket;
 import freenet.support.BucketTools;
+import freenet.support.Logger;

 /**
  * A segment, within a splitfile.
@@ -48,6 +49,7 @@
        final int recursionLevel;
        /** Retry tracker */
        private final RetryTracker tracker;
+       private FetchException failureException;

        /**
         * Create a Segment.
@@ -173,9 +175,12 @@
        public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] 
failed, SplitfileBlock[] fatalErrors) {

                if(succeeded.length > minFetched)
+                       // Not finished yet, need to decode
                        successfulFetch();
                else {
-                       parentFetcher.failed(this, minFetched, 
succeeded.length, failed.length, fatalErrors.length);
+                       failureException = new 
SplitFetchException(failed.length, fatalErrors.length);
+                       finished = true;
+                       parentFetcher.segmentFinished(this);
                }
        }

@@ -204,17 +209,27 @@
                                fetcherContext.bucketFactory.freeBucket(data);
                        }
                        os.close();
-                       parentFetcher.decoded(this, output);
+                       // Must set finished BEFORE calling parentFetcher.
+                       // Otherwise a race is possible that might result in it 
not seeing our finishing.
+                       finished = true;
+                       parentFetcher.segmentFinished(this);
                } catch (IOException e) {
-                       parentFetcher.internalBucketError(this, e);
+                       finished = true;
+                       failureException = new 
FetchException(FetchException.BUCKET_ERROR);
+                       parentFetcher.segmentFinished(this);
                        return;
                }

                // Now heal

                // Encode any check blocks we don't have
-               if(codec != null)
-                       codec.encode(dataBlockStatus, checkBlockStatus, 32768, 
fetcherContext.bucketFactory);
+               if(codec != null) {
+                       try {
+                               codec.encode(dataBlockStatus, checkBlockStatus, 
32768, fetcherContext.bucketFactory);
+                       } catch (IOException e) {
+                               Logger.error(this, "Bucket error while healing: 
"+e, e);
+                       }
+               }

                // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
                for(int i=0;i<dataBlockStatus.length;i++) {
@@ -226,5 +241,7 @@
                        }
                        block.queueHeal();
                }
+               
+               // FIXME heal check blocks too
        }
 }

Added: trunk/freenet/src/freenet/client/SplitFetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetchException.java   2005-11-09 
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/SplitFetchException.java   2005-11-09 
20:58:30 UTC (rev 7505)
@@ -0,0 +1,20 @@
+package freenet.client;
+
+public class SplitFetchException extends FetchException {
+
+       final int failed;
+       final int fatal;
+       
+       public SplitFetchException(int failed, int fatal) {
+               super(FetchException.SPLITFILE_ERROR);
+               this.failed = failed;
+               this.fatal = fatal;
+       }
+
+       public String getMessage() {
+               return "Splitfile fetch failure: "+failed+" failed, "+fatal+" 
fatal errors";
+       }
+       
+       private static final long serialVersionUID = 1523809424508826893L;
+
+}

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-09 20:14:20 UTC 
(rev 7504)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-09 20:58:30 UTC 
(rev 7505)
@@ -170,6 +170,9 @@
                        finalLength += s.decodedLength();
                        // Healing is done by Segment
                }
+               if(finalLength > overrideLength)
+                       finalLength = overrideLength;
+               
                long bytesWritten = 0;
                OutputStream os = null;
                Bucket output;
@@ -196,33 +199,15 @@
        }

        public void gotBlocks(Segment segment) {
-               // TODO Auto-generated method stub
-               
+               synchronized(this) {
+                       fetchingSegment = null;
+               }
        }

-       public void decoded(Segment segment, Bucket output) {
-               // TODO Auto-generated method stub
-               
+       public void segmentFinished(Segment segment) {
+               synchronized(this) {
+                       notifyAll();
+               }
        }

-       public void internalBucketError(Segment segment, IOException e) {
-               // TODO Auto-generated method stub
-               
-       }
-
-       /**
-        * The segment fetch failed.
-        * @param segment The segment that failed.
-        * @param minFetched The minimum number of successful blocks for a 
successful fetch.
-        * @param successfulBlocks The number of blocks successfully fetched.
-        * @param failedBlocks The number of blocks that failed because they got
-        * non-fatal errors on every try, and ran out of retries.
-        * @param fatalErrors The number of blocks that got fatal errors.
-        */
-       public void failed(Segment segment, int minFetched, int 
successfulBlocks, 
-                       int failedBlocks, int fatalErrors) {
-               // TODO Auto-generated method stub
-               
-       }
-
 }

Modified: trunk/freenet/src/freenet/client/SplitInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 20:14:20 UTC 
(rev 7504)
+++ trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 20:58:30 UTC 
(rev 7505)
@@ -67,8 +67,13 @@
                startInsertingDataBlocks();
                splitIntoSegments(segmentSize);
                // Backwards, because the last is the shortest
-               for(int i=segments.length-1;i>=0;i--)
-                       countCheckBlocks += encodeSegment(i, 
origDataBlocks.length + checkSegmentSize * i);
+               try {
+                       for(int i=segments.length-1;i>=0;i--) {
+                               countCheckBlocks += encodeSegment(i, 
origDataBlocks.length + checkSegmentSize * i);
+                       }
+               } catch (IOException e) {
+                       throw new 
InserterException(InserterException.BUCKET_ERROR);
+               }
                // Wait for the insertion thread to finish
                return waitForCompletion();
        }
@@ -136,7 +141,7 @@
                return uris;
        }

-       private int encodeSegment(int i, int offset) {
+       private int encodeSegment(int i, int offset) throws IOException {
                encodingSegment = segments[i];
                return encodingSegment.encode(offset, tracker, ctx);
        }

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09 
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09 
20:58:30 UTC (rev 7505)
@@ -103,13 +103,13 @@
                if(dataBlockStatus.length != k)
                        throw new IllegalArgumentException();
                Buffer[] packets = new Buffer[k];
-               Bucket[] buckets = new Bucket[k];
+               Bucket[] buckets = new Bucket[n];
                DataInputStream[] readers = new DataInputStream[k];
                OutputStream[] writers = new OutputStream[k];
                int[] toDecode = new int[n-k];
                int numberToDecode = 0; // can be less than n-k

-               byte[] realBuffer = new byte[k * STRIPE_SIZE];
+               byte[] realBuffer = new byte[n * STRIPE_SIZE];

                for(int i=0;i<n;i++)
                        packets[i] = new Buffer(realBuffer, i*STRIPE_SIZE, 
STRIPE_SIZE);
@@ -145,9 +145,6 @@
                        toDecode = newToDecode;
                }

-               int[] offsets = new int[n];
-               for(int i=0;i<n;i++) offsets[i] = 0;
-               
                if(numberToDecode > 0) {
                        // Do the (striped) decode
                        for(int 
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
@@ -159,23 +156,115 @@
                                }
                                // Do the decode
                                // Not shuffled
-                               code.decode(packets, offsets);
+                               code.decode(packets, toDecode);
                                // packets now contains an array of decoded 
blocks, in order
                                // Write the data out
                                for(int i=0;i<n;i++) {
-                                       writers[i].write(realBuffer, 
i*STRIPE_SIZE, STRIPE_SIZE);
+                                       if(writers[i] != null)
+                                               writers[i].write(realBuffer, 
i*STRIPE_SIZE, STRIPE_SIZE);
                                }
                        }
                }
                for(int i=0;i<k;i++) {
-                       writers[i].close();
-                       readers[i].close();
+                       if(writers[i] != null) writers[i].close();
+                       if(readers[i] != null) readers[i].close();
                }
+               // Set new buckets only after have a successful decode.
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       dataBlockStatus[i].setData(buckets[i]);
+               }
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       checkBlockStatus[i].setData(buckets[i+k]);
+               }
        }

-       public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
-               // TODO Auto-generated method stub
+       public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+               // Encodes count as decodes.
+               synchronized(runningDecodesSync) {
+                       while(runningDecodes >= PARALLEL_DECODES) {
+                               try {
+                                       wait();
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+                       runningDecodes++;
+               }
+               try {
+                       realEncode(dataBlockStatus, checkBlockStatus, 
blockLength, bf);
+               } finally {
+                       synchronized(runningDecodesSync) {
+                               runningDecodes--;
+                       }
+               }
+       }
+
+       /**
+        * Do the actual encode.
+        */
+       private void realEncode(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws 
IOException {
+               if(dataBlockStatus.length + checkBlockStatus.length != n)
+                       throw new IllegalArgumentException();
+               if(dataBlockStatus.length != k)
+                       throw new IllegalArgumentException();
+               Buffer[] dataPackets = new Buffer[k];
+               Buffer[] checkPackets = new Buffer[n-k];
+               Bucket[] buckets = new Bucket[n];
+               DataInputStream[] readers = new DataInputStream[k];
+               OutputStream[] writers = new OutputStream[n-k];
+               int[] toEncode = new int[n-k];
+               int numberToEncode = 0; // can be less than n-k

+               byte[] realBuffer = new byte[n * STRIPE_SIZE];
+
+               for(int i=0;i<k;i++)
+                       dataPackets[i] = new Buffer(realBuffer, i*STRIPE_SIZE, 
STRIPE_SIZE);
+               for(int i=0;i<n-k;i++)
+                       checkPackets[i] = new Buffer(realBuffer, 
(i+k)*STRIPE_SIZE, STRIPE_SIZE);
+
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       buckets[i] = dataBlockStatus[i].getData();
+                       readers[i] = new 
DataInputStream(buckets[i].getInputStream());
+               }
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       buckets[i+k] = checkBlockStatus[i].getData();
+                       if(buckets[i+k] == null) {
+                               buckets[i+k] = bf.makeBucket(blockLength);
+                               writers[i+k] = buckets[i+k].getOutputStream();
+                               readers[i+k] = null;
+                               toEncode[numberToEncode++] = i+k;
+                       } else {
+                               writers[i+k] = null;
+                               readers[i+k] = new 
DataInputStream(buckets[i+k].getInputStream());
+                       }
+               }
+               
+               if(numberToEncode > 0) {
+                       // Do the (striped) decode
+                       for(int 
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
+                               // Read the data in first
+                               for(int i=0;i<n;i++) {
+                                       readers[i].readFully(realBuffer, 
i*STRIPE_SIZE, STRIPE_SIZE);
+                               }
+                               // Do the encode
+                               // Not shuffled
+                               code.encode(dataPackets, checkPackets, 
toEncode);
+                               // packets now contains an array of decoded 
blocks, in order
+                               // Write the data out
+                               for(int i=k;i<n;i++) {
+                                       if(writers[i] != null)
+                                               writers[i].write(realBuffer, 
i*STRIPE_SIZE, STRIPE_SIZE);
+                               }
+                       }
+               }
+               for(int i=0;i<n;i++) {
+                       if(writers[i] != null) writers[i].close();
+                       if(readers[i] != null) readers[i].close();
+               }
+               // Set new buckets only after have a successful decode.
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       checkBlockStatus[i].setData(buckets[i+k]);
+               }
        }

        public int countCheckBlocks() {

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-09 20:14:20 UTC (rev 
7504)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-09 20:58:30 UTC (rev 
7505)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 146;
+       public static final int buildNumber = 147;

        /** Oldest build of Fred we will talk to */
        public static final int lastGoodBuild = 144;


Reply via email to