Author: toad
Date: 2005-11-09 18:55:33 +0000 (Wed, 09 Nov 2005)
New Revision: 7503

Added:
   trunk/freenet/src/freenet/client/BlockFetcher.java
   trunk/freenet/src/freenet/client/BlockInserter.java
   trunk/freenet/src/freenet/client/FailureCodeTracker.java
   trunk/freenet/src/freenet/client/RetryTrackerCallback.java
Modified:
   trunk/freenet/src/freenet/client/FECCodec.java
   trunk/freenet/src/freenet/client/FileInserter.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/InsertSegment.java
   trunk/freenet/src/freenet/client/InserterContext.java
   trunk/freenet/src/freenet/client/InserterException.java
   trunk/freenet/src/freenet/client/Metadata.java
   trunk/freenet/src/freenet/client/RetryTracker.java
   trunk/freenet/src/freenet/client/Segment.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
   trunk/freenet/src/freenet/client/SplitInserter.java
   trunk/freenet/src/freenet/client/SplitfileBlock.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/BucketTools.java
Log:
145:
Lots more work on splitfiles.
Neither splitfile insert nor splitfile request will work at the moment.

Added: trunk/freenet/src/freenet/client/BlockFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockFetcher.java  2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/BlockFetcher.java  2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -0,0 +1,162 @@
+/**
+ * 
+ */
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+
+/**
+ * Fetches a single block of a splitfile segment on its own daemon thread and
+ * reports failures to the segment's RetryTracker.
+ */
+public class BlockFetcher extends SplitfileBlock implements Runnable {
+
+       /** The segment this block belongs to. */
+       private final Segment segment;
+       /** Tracker which is told about fatal and non-fatal failures. */
+       private final RetryTracker tracker;
+       /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the check blocks */
+       final int index;
+       /** Key of the block to fetch. */
+       final FreenetURI uri;
+       final boolean dontEnterImplicitArchives;
+       /** Incremented every time run() exits; fatalError() sets it to -1. */
+       int completedTries;
+       /** The block's data, or null if we do not have it (yet). */
+       Bucket fetchedData;
+       /** True if the data was fetched by run(), false if it was supplied through setData(). */
+       boolean actuallyFetched;
+
+       /**
+        * Create a fetcher for one block. Nothing is fetched until start() is called.
+        * @param segment The segment the block belongs to.
+        * @param tracker The tracker to report failures to.
+        * @param freenetURI The key of the block.
+        * @param index The block's index within the splitfile segment.
+        * @param dontEnterImplicitArchives Passed on to the Fetcher (OR-ed with
+        * "non-full blocks are not allowed" on the segment).
+        */
+       public BlockFetcher(Segment segment, RetryTracker tracker, FreenetURI freenetURI, int index, boolean dontEnterImplicitArchives) {
+               this.segment = segment;
+               this.tracker = tracker;
+               uri = freenetURI;
+               completedTries = 0;
+               fetchedData = null;
+               this.index = index;
+               actuallyFetched = false;
+               this.dontEnterImplicitArchives = dontEnterImplicitArchives;
+       }
+
+       /**
+        * Start fetching this block on a new daemon thread.
+        * If the thread cannot be created, the failure is reported to the
+        * tracker as a fatal internal error.
+        * @throws IllegalStateException If we already have the data.
+        */
+       public void start() {
+               if(fetchedData != null) {
+                       throw new IllegalStateException("Already have data");
+               }
+               try {
+                       Thread t = new Thread(this);
+                       t.setDaemon(true);
+                       t.start();
+               } catch (Throwable error) {
+                       // Use a FetchException code, like every other failure reported
+                       // by this class, so the tracker's failure codes stay in one
+                       // numbering scheme.
+                       tracker.fatalError(this, FetchException.INTERNAL_ERROR);
+                       Logger.error(this, "Caught "+error+" creating thread for "+this);
+               }
+       }
+
+       /**
+        * Fetch the block once and classify any failure as fatal or non-fatal.
+        * Whatever happens, the block is added to the segment's
+        * recentlyCompletedFetches, removed from its runningFetches, and the
+        * segment is notified.
+        * NOTE(review): a successful fetch is not reported to the tracker here;
+        * confirm that the segment picks it up via recentlyCompletedFetches.
+        */
+       public void run() {
+               // The original author notes we are already in runningFetches here.
+               // We need to make sure we are removed when we exit.
+               try {
+                       // Do the fetch
+                       Fetcher f = new Fetcher(uri, this.segment.blockFetchContext);
+                       try {
+                               FetchResult fr = f.realRun(new ClientMetadata(), this.segment.recursionLevel, uri,
+                                               (!this.segment.nonFullBlocksAllowed) || dontEnterImplicitArchives);
+                               actuallyFetched = true;
+                               fetchedData = fr.data;
+                       } catch (MetadataParseException e) {
+                               fatalError(e, FetchException.INVALID_METADATA);
+                       } catch (FetchException e) {
+                               handleFetchException(e);
+                       } catch (ArchiveFailureException e) {
+                               fatalError(e, FetchException.ARCHIVE_FAILURE);
+                       } catch (ArchiveRestartException e) {
+                               Logger.error(this, "Got an ArchiveRestartException in a splitfile - WTF?");
+                               fatalError(e, FetchException.ARCHIVE_FAILURE);
+                       }
+               } finally {
+                       // NOTE(review): fatalError() sets completedTries to -1 and this
+                       // increment then turns it into 0 - confirm that is intended.
+                       completedTries++;
+                       // Add before removing from runningFetches, to avoid race
+                       synchronized(this.segment.recentlyCompletedFetches) {
+                               this.segment.recentlyCompletedFetches.add(this);
+                       }
+                       synchronized(this.segment.runningFetches) {
+                               this.segment.runningFetches.remove(this);
+                       }
+                       synchronized(this.segment) {
+                               this.segment.notify();
+                       }
+               }
+       }
+
+       /**
+        * Report a FetchException to the tracker exactly once, as fatal or
+        * non-fatal depending on its failure mode.
+        */
+       private void handleFetchException(FetchException e) {
+               int code = e.getMode();
+               switch(code) {
+               case FetchException.ARCHIVE_FAILURE:
+               case FetchException.BLOCK_DECODE_ERROR:
+               case FetchException.HAS_MORE_METASTRINGS:
+               case FetchException.INVALID_METADATA:
+               case FetchException.NOT_IN_ARCHIVE:
+               case FetchException.TOO_DEEP_ARCHIVE_RECURSION:
+               case FetchException.TOO_MANY_ARCHIVE_RESTARTS:
+               case FetchException.TOO_MANY_METADATA_LEVELS:
+               case FetchException.TOO_MANY_REDIRECTS:
+               case FetchException.TOO_MUCH_RECURSION:
+               case FetchException.UNKNOWN_METADATA:
+               case FetchException.UNKNOWN_SPLITFILE_METADATA:
+                       // Fatal, probably an error on insert
+                       fatalError(e, code);
+                       break;
+
+               case FetchException.DATA_NOT_FOUND:
+               case FetchException.ROUTE_NOT_FOUND:
+               case FetchException.REJECTED_OVERLOAD:
+               case FetchException.TRANSFER_FAILED:
+                       // Non-fatal. The break matters: falling through would report
+                       // the same failure to the tracker a second time.
+                       nonfatalError(e, code);
+                       break;
+
+               case FetchException.BUCKET_ERROR:
+               case FetchException.INTERNAL_ERROR:
+                       // Maybe fatal
+                       nonfatalError(e, code);
+                       break;
+
+               default:
+                       // Never drop a failure silently, or the tracker would keep
+                       // counting this block as running. Retrying is bounded by the
+                       // tracker's retry limit.
+                       Logger.error(this, "Unknown FetchException mode "+code+" on "+this+": "+e);
+                       nonfatalError(e, code);
+                       break;
+               }
+       }
+
+       /** Log the failure, mark the block as given up on and tell the tracker. */
+       private void fatalError(Exception e, int code) {
+               Logger.normal(this, "Giving up on block: "+this+": "+e);
+               completedTries = -1;
+               tracker.fatalError(this, code);
+       }
+
+       /** Log the failure and tell the tracker, which decides whether to retry. */
+       private void nonfatalError(Exception e, int code) {
+               Logger.minor(this, "Non-fatal error on "+this+": "+e);
+               tracker.nonfatalError(this, code);
+       }
+
+       /** @return True if we have the block's data. */
+       public boolean succeeded() {
+               return fetchedData != null;
+       }
+
+       /**
+        * Queue a healing block for insert.
+        * Will be implemented using the download manager.
+        * FIXME: implement!
+        */
+       public void queueHeal() {
+               // Not implemented yet, see FIXME above
+       }
+
+       /** @return The block's index within the splitfile segment. */
+       public int getNumber() {
+               return index;
+       }
+
+       /** @return True if we have the block's data. */
+       public boolean hasData() {
+               return fetchedData != null;
+       }
+
+       /** @return The block's data, or null if we do not have it. */
+       public Bucket getData() {
+               return fetchedData;
+       }
+
+       /**
+        * Supply the block's data from outside, without fetching it.
+        * Clears the actuallyFetched flag.
+        */
+       public void setData(Bucket data) {
+               actuallyFetched = false;
+               fetchedData = data;
+       }
+
+       public void kill() {
+               // Do nothing, for now
+       }
+
+       /** @return The key this block is fetched from. */
+       public FreenetURI getURI() {
+               return uri;
+       }
+}
\ No newline at end of file

Added: trunk/freenet/src/freenet/client/BlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockInserter.java 2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/BlockInserter.java 2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -0,0 +1,56 @@
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+/**
+ * Inserts a single splitfile block.
+ * The insert itself is not implemented yet: start(), kill() and getURI()
+ * are still stubs.
+ */
+public class BlockInserter extends SplitfileBlock {
+
+       /** The data to insert; null until it has been supplied. */
+       private Bucket data;
+       /** The block number in the splitfile. */
+       private final int num;
+
+       /**
+        * Create a BlockInserter.
+        * @param bucket The data to insert, or null if it will be filled in later.
+        * @param num The block number in the splitfile.
+        */
+       public BlockInserter(Bucket bucket, int num) {
+               // A null bucket is legal, as documented above: InsertSegment.encode()
+               // creates its check blocks empty. Rejecting null here made that call
+               // always throw and left setData() unusable.
+               this.data = bucket;
+               this.num = num;
+       }
+
+       /** @return The block number in the splitfile. */
+       int getNumber() {
+               return num;
+       }
+
+       /** @return True once the block has data, whether from the constructor or setData(). */
+       boolean hasData() {
+               return data != null;
+       }
+
+       /** @return The data to insert, or null if it has not been supplied yet. */
+       Bucket getData() {
+               return data;
+       }
+
+       /**
+        * Supply the data for a block which was created without any.
+        * @throws IllegalArgumentException If the block already has data.
+        */
+       synchronized void setData(Bucket data) {
+               if(this.data != null) throw new IllegalArgumentException("Cannot set data when already have data");
+               this.data = data;
+       }
+
+       void start() {
+               // TODO not implemented yet
+
+       }
+
+       public void kill() {
+               // TODO not implemented yet
+
+       }
+
+       /** Not implemented yet: always returns null. */
+       public FreenetURI getURI() {
+               // TODO not implemented yet
+               return null;
+       }
+}

Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java      2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/FECCodec.java      2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -2,7 +2,6 @@

 import java.io.IOException;

-import freenet.client.Segment.BlockStatus;
 import freenet.support.BucketFactory;

 /**
@@ -14,13 +13,21 @@
  */
 abstract class FECCodec {

-       public static int getCodecMaxSegmentSize(short splitfileType) {
+       public static int getCodecMaxSegmentDataBlocks(short splitfileType) {
                if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
                        return -1;
                if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
                        return 128;
                throw new IllegalArgumentException();
        }
+
+       /**
+        * Get the maximum number of check blocks per segment for a splitfile algorithm.
+        * @param splitfileType One of the Metadata.SPLITFILE_* algorithm codes.
+        * @return 64 for Metadata.SPLITFILE_ONION_STANDARD, -1 for Metadata.SPLITFILE_NONREDUNDANT.
+        * @throws IllegalArgumentException If the algorithm is neither of those.
+        */
+       public static int getCodecMaxSegmentCheckBlocks(short splitfileType) {
+               final boolean nonRedundant = (splitfileType == Metadata.SPLITFILE_NONREDUNDANT);
+               final boolean onionStandard = (splitfileType == Metadata.SPLITFILE_ONION_STANDARD);
+               if(!nonRedundant && !onionStandard)
+                       throw new IllegalArgumentException();
+               return nonRedundant ? -1 : 64;
+       }

        /**
         * Get a codec where we know both the number of data blocks and the 
number

Added: trunk/freenet/src/freenet/client/FailureCodeTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/FailureCodeTracker.java    2005-11-09 
16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/FailureCodeTracker.java    2005-11-09 
18:55:33 UTC (rev 7503)
@@ -0,0 +1,25 @@
+package freenet.client;
+
+import java.util.HashMap;
+
+/**
+ * Counts how many times each integer failure code has been reported.
+ * Essentially a map of integer to incrementible integer.
+ * FIXME maybe move this to support, give it a better name?
+ */
+public class FailureCodeTracker {
+
+       /** Mutable counter stored as the map value for one failure code. */
+       public class Item {
+               int x;
+       }
+
+       /** Maps Integer failure code to its Item counter. */
+       final HashMap map = new HashMap();
+
+       /**
+        * Count one more occurrence of a failure code.
+        * @param k The failure code.
+        */
+       public synchronized void inc(int k) {
+               final Integer code = new Integer(k);
+               Item counter = (Item) map.get(code);
+               if(counter == null) {
+                       // First time we see this code: start a counter at zero
+                       counter = new Item();
+                       map.put(code, counter);
+               }
+               counter.x++;
+       }
+
+}

Modified: trunk/freenet/src/freenet/client/FileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/FileInserter.java  2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/FileInserter.java  2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -97,7 +97,7 @@
                }

                // Too big, encode to a splitfile
-               SplitInserter splitInsert = new SplitInserter(data, 
block.clientMetadata, bestCodec, ctx.splitfileAlgorithm, ctx);
+               SplitInserter splitInsert = new SplitInserter(data, 
block.clientMetadata, bestCodec, ctx.splitfileAlgorithm, ctx, this, 
NodeCHK.BLOCK_SIZE);
                return splitInsert.run();
        }


Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-09 18:55:33 UTC (rev 7503)
@@ -33,9 +33,10 @@
        /** If set, only check the local datastore, don't send an actual 
request out.
         * Don't turn this off either. */
        static final boolean LOCAL_REQUESTS_ONLY = false;
+       static final int SPLITFILE_INSERT_THREADS = 40;
+       static final int SPLITFILE_INSERT_RETRIES = 10;


-       
        public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
                this.client = client;
                archiveManager = mgr;
@@ -65,7 +66,7 @@
        }

        public FreenetURI insert(InsertBlock insert) throws InserterException {
-               InserterContext context = new InserterContext(client, 
bucketFactory, random);
+               InserterContext context = new InserterContext(client, 
bucketFactory, random, SPLITFILE_INSERT_RETRIES, SPLITFILE_INSERT_THREADS);
                FileInserter i = new FileInserter(context);
                return i.run(insert, false);
        }

Modified: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -1,16 +1,49 @@
 package freenet.client;

+import freenet.keys.FreenetURI;
+import freenet.support.BucketFactory;
+
 /**
  * Segment of a splitfile, for insertion purposes.
  */
 public class InsertSegment {

-       final short splitfileAlgorithm;
+       final FECCodec codec;
        final SplitfileBlock[] origDataBlocks;
+       final int blockLength;
+       final BucketFactory bf;
+       /** Check blocks. Will be created by encode(...). */
+       final SplitfileBlock[] checkBlocks;

-       public InsertSegment(short splitfileAlgorithm, SplitfileBlock[] 
origDataBlocks) {
-               this.splitfileAlgorithm = splitfileAlgorithm;
+       public InsertSegment(short splitfileAlgo, SplitfileBlock[] 
origDataBlocks, int blockLength, BucketFactory bf) {
                this.origDataBlocks = origDataBlocks;
+               codec = FECCodec.getCodec(splitfileAlgo, origDataBlocks.length);
+               checkBlocks = new SplitfileBlock[codec.countCheckBlocks()];
+               this.blockLength = blockLength;
+               this.bf = bf;
        }

+       /**
+        * Collect the URI of every check block, in block order.
+        * Don't call before encode()! Don't call before all blocks have inserted either.
+        * @return A new array holding one URI per check block.
+        */
+       public FreenetURI[] getCheckURIs() {
+               final int count = checkBlocks.length;
+               FreenetURI[] result = new FreenetURI[count];
+               int slot = 0;
+               while(slot < count) {
+                       result[slot] = checkBlocks[slot].getURI();
+                       slot++;
+               }
+               return result;
+       }
+
+       /**
+        * Encode the data blocks into check blocks.
+        * Fills every slot of checkBlocks with a new, empty BlockInserter and
+        * then lets the codec encode into them.
+        * NOTE(review): the check blocks are created with a null bucket; confirm
+        * that BlockInserter's constructor accepts null.
+        * @param offset Added to each check block's position to give its block number.
+        * @return The number of check blocks generated, or 0 if there is no codec.
+        */
+       public int encode(int offset) {
+               if(codec != null) {
+                       final int count = checkBlocks.length;
+                       for(int slot = 0; slot < count; slot++) {
+                               checkBlocks[slot] = new BlockInserter(null, offset + slot);
+                       }
+                       codec.encode(origDataBlocks, checkBlocks, blockLength, bf);
+                       return count;
+               }
+               return 0; // no FEC
+       }
+
 }

Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java       2005-11-09 
16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/InserterContext.java       2005-11-09 
18:55:33 UTC (rev 7503)
@@ -13,13 +13,18 @@
        final boolean dontCompress;
        final RandomSource random;
        final short splitfileAlgorithm;
+       final int maxInsertBlockRetries;
+       final int maxSplitInsertThreads;

-       public InserterContext(SimpleLowLevelClient client, BucketFactory bf, 
RandomSource random) {
+       public InserterContext(SimpleLowLevelClient client, BucketFactory bf, 
RandomSource random,
+                       int maxRetries, int maxThreads) {
                this.client = client;
                this.bf = bf;
                this.random = random;
                dontCompress = false;
                splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
+               this.maxInsertBlockRetries = maxRetries;
+               this.maxSplitInsertThreads = maxThreads;
        }

 }

Modified: trunk/freenet/src/freenet/client/InserterException.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterException.java     2005-11-09 
16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/InserterException.java     2005-11-09 
18:55:33 UTC (rev 7503)
@@ -6,6 +6,8 @@
        private static final long serialVersionUID = -1106716067841151962L;

        final int mode;
+       /** For collection errors */
+       final FailureCodeTracker errorCodes;

        /** Get the failure mode. */
        public int getMode() {
@@ -14,13 +16,20 @@

        public InserterException(int m) {
                mode = m;
+               errorCodes = null;
        }

        public InserterException(int mode, IOException e) {
                this.mode = mode;
+               errorCodes = null;
                initCause(e);
        }

+       /**
+        * Create an exception which carries a tracker of accumulated failure codes.
+        * No cause is set.
+        * @param mode The failure mode, as returned by getMode().
+        * @param errorCodes The failure codes to attach; stored as-is, not copied.
+        */
+       public InserterException(int mode, FailureCodeTracker errorCodes) {
+               this.mode = mode;
+               this.errorCodes = errorCodes;
+       }
+
        /** Caller supplied a URI we cannot use */
        public static final int INVALID_URI = 1;
        /** Failed to read from or write to a bucket; a kind of internal error 
*/
@@ -31,4 +40,8 @@
        public static final int REJECTED_OVERLOAD = 4;
        /** Couldn't find enough nodes to send the data to */
        public static final int ROUTE_NOT_FOUND = 5;
+       /** There were fatal errors in a splitfile insert. */
+       public static final int FATAL_ERRORS_IN_BLOCKS = 6;
+       /** Could not insert a splitfile because a block failed too many times 
*/
+       public static final int TOO_MANY_RETRIES_IN_BLOCKS = 7;
 }

Modified: trunk/freenet/src/freenet/client/Metadata.java
===================================================================
--- trunk/freenet/src/freenet/client/Metadata.java      2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/Metadata.java      2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -293,6 +293,20 @@
                        throw new IllegalArgumentException();
        }

+       /**
+        * Create metadata describing a splitfile.
+        * The document type is set to SIMPLE_REDIRECT with the splitfile flag on,
+        * the block counts are taken from the lengths of the two key arrays, and
+        * the MIME type is taken from the client metadata.
+        * @param algo The splitfile algorithm.
+        * @param dataURIs The keys of the data blocks; stored as-is, must not be null.
+        * @param checkURIs The keys of the check blocks; stored as-is, must not be null.
+        * @param cm The client metadata; must not be null, its MIME type is read here.
+        * @param dataLength The data length to record.
+        * @param compressionAlgo The compression codec to record.
+        */
+       public Metadata(short algo, FreenetURI[] dataURIs, FreenetURI[] 
checkURIs, ClientMetadata cm, long dataLength, short compressionAlgo) {
+               documentType = SIMPLE_REDIRECT;
+               splitfile = true;
+               splitfileAlgorithm = algo;
+               this.dataLength = dataLength;
+               this.compressionCodec = compressionAlgo;
+               splitfileBlocks = dataURIs.length;
+               splitfileCheckBlocks = checkURIs.length;
+               splitfileDataKeys = dataURIs;
+               splitfileCheckKeys = checkURIs;
+               clientMetadata = cm;
+               // Goes through the setter rather than assigning a field directly
+               setMIMEType(cm.getMIMEType());
+       }
+
        /**
         * Set the MIME type to a string. Compresses it if possible for transit.
         */

Modified: trunk/freenet/src/freenet/client/RetryTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTracker.java  2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/RetryTracker.java  2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -1,5 +1,6 @@
 package freenet.client;

+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Vector;
@@ -51,6 +52,8 @@
                }
        }

+       final FailureCodeTracker fatalErrors;
+       final FailureCodeTracker nonfatalErrors;
        final HashMap levels;
        final RandomSource random;
        final int maxLevel;
@@ -60,9 +63,30 @@
        final HashSet succeededBlocks;
        private int curMaxLevel;
        private int curMinLevel;
-       
-       public RetryTracker(int maxLevel, RandomSource random) {
+       /** Maximum number of concurrently running threads */
+       final int maxThreads;
+       /** After we have successes on this many blocks, we should terminate, 
+        * even if there are threads running and blocks queued. */
+       final int targetSuccesses;
+       final boolean killOnFatalError;
+       private boolean finishOnEmpty;
+       private final RetryTrackerCallback callback;
+
+       /**
+        * Create a RetryTracker.
+        * @param maxLevel The maximum number of tries for each block.
+        * @param random The random number source.
+        * @param maxThreads The maximum number of threads to use.
+        * @param killOnFatalError Whether to terminate the tracker when a fatal
+        * error occurs on a single block.
+        * @param cb The callback to call .finish(...) when we no longer have
+        * anything to do *and* the client has set the finish on empty flag.
+        */
+       public RetryTracker(int maxLevel, int targetSuccesses, RandomSource 
random, int maxThreads, boolean killOnFatalError, RetryTrackerCallback cb) {
                levels = new HashMap();
+               fatalErrors = new FailureCodeTracker();
+               nonfatalErrors = new FailureCodeTracker();
+               this.targetSuccesses = targetSuccesses;
                this.maxLevel = maxLevel;
                this.random = random;
                curMaxLevel = curMinLevel = 0;
@@ -70,8 +94,21 @@
                failedBlocksFatalErrors = new HashSet();
                runningBlocks = new HashSet();
                succeededBlocks = new HashSet();
+               this.maxThreads = maxThreads;
+               this.killOnFatalError = killOnFatalError;
+               this.finishOnEmpty = false;
+               this.callback = cb;
        }

+       /**
+        * Set the finish-on-empty flag to true.
+        * With the flag set, maybeStart() treats "no blocks running and no
+        * retry levels left" as completion and invokes the callback's
+        * finished(...) method. Setting the flag does not run that check itself;
+        * it is evaluated the next time maybeStart() is called.
+        */
+       public synchronized void setFinishOnEmpty() {
+               finishOnEmpty = true;
+       }
+       
        /** Remove a level */
        private synchronized void removeLevel(int level) {
                Integer x = new Integer(level);
@@ -126,6 +163,7 @@
        public synchronized void addBlock(SplitfileBlock block) {
                Level l = makeLevel(0);
                l.add(block);
+               maybeStart(true);
        }

        /**
@@ -133,7 +171,8 @@
         * Move it out of the running list and back into the relevant list, 
unless
         * we have run out of retries.
         */
-       public synchronized void nonfatalError(SplitfileBlock block) {
+       public synchronized void nonfatalError(SplitfileBlock block, int 
reasonCode) {
+               nonfatalErrors.inc(reasonCode);
                runningBlocks.remove(block);
                Level l = block.getLevel();
                if(l == null) throw new IllegalArgumentException();
@@ -147,24 +186,57 @@
                        Level newLevel = makeLevel(levelNumber);
                        newLevel.add(block);
                }
+               maybeStart(false);
        }

        /**
         * A block got a fatal error and should not be retried.
         * Move it into the fatal error list.
+        * @param reasonCode A client-specific code indicating the type of 
failure.
         */
-       public synchronized void fatalError(SplitfileBlock block) {
+       public synchronized void fatalError(SplitfileBlock block, int 
reasonCode) {
+               fatalErrors.inc(reasonCode);
                runningBlocks.remove(block);
                Level l = block.getLevel();
                if(l == null) throw new IllegalArgumentException();
                if(l.tracker != this) throw new 
IllegalArgumentException("Belongs to wrong tracker");
                l.remove(block);
                failedBlocksFatalErrors.add(block);
+               maybeStart(false);
        }

+       /**
+        * If we can start some blocks, start some blocks.
+        * Otherwise if we are finished, call the callback's finished(...) method.
+        * We are finished when at least targetSuccesses blocks have succeeded, or
+        * when nothing is running, no retry levels are left and the
+        * finish-on-empty flag is set. In that case any blocks still running are
+        * killed first.
+        * @param cantCallFinished If true, the callback is invoked from a new
+        * daemon thread instead of on the calling thread.
+        */
+       public synchronized void maybeStart(boolean cantCallFinished) {
+               if((succeededBlocks.size() >= targetSuccesses)
+                               || (runningBlocks.isEmpty() && levels.isEmpty() && finishOnEmpty)) {
+                       SplitfileBlock[] running = runningBlocks();
+                       for(int i=0;i<running.length;i++) {
+                               running[i].kill();
+                       }
+                       if(!cantCallFinished)
+                               callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks());
+                       else {
+                               Runnable r = new Runnable() { public void run() { callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks()); } };
+                               Thread t = new Thread(r);
+                               t.setDaemon(true);
+                               t.start();
+                       }
+               } else {
+                       while(runningBlocks.size() < maxThreads) {
+                               SplitfileBlock block = getBlock();
+                               // Stop when nothing is queued. Without this guard the loop
+                               // cannot exit while fewer than maxThreads blocks are running.
+                               // NOTE(review): assumes getBlock() returns null when no block
+                               // is available - confirm against its definition.
+                               if(block == null) break;
+                               // Register before starting: a block whose start() fails
+                               // synchronously reports back through fatalError(), which
+                               // removes it from runningBlocks. Adding it afterwards would
+                               // leave a dead block counted as running.
+                               runningBlocks.add(block);
+                               block.start();
+                       }
+               }
+       }
+
        public synchronized void success(SplitfileBlock block) {
                runningBlocks.remove(block);
                succeededBlocks.add(block);
+               maybeStart(false);
        }

        /**
@@ -188,12 +260,20 @@
         * Get all blocks with fatal errors.
         * SplitfileBlock's are assumed to remember their errors, so we don't.
         */
-       public synchronized SplitfileBlock[] errorBlocks() {
+       public synchronized SplitfileBlock[] fatalErrorBlocks() {
                return (SplitfileBlock[])
                        failedBlocksFatalErrors.toArray(new 
SplitfileBlock[failedBlocksFatalErrors.size()]);
        }

        /**
+        * Get all blocks which didn't succeed in the maximum number of tries.
+        */
+       public synchronized SplitfileBlock[] failedBlocks() {
+               // Copy into an array of exactly the right size, so the caller gets
+               // a snapshot rather than the live collection.
+               SplitfileBlock[] snapshot = new SplitfileBlock[failedBlocksTooManyRetries.size()];
+               failedBlocksTooManyRetries.toArray(snapshot);
+               return snapshot;
+       }
+       
+       /**
         * Get all successfully downloaded blocks.
         */
        public synchronized SplitfileBlock[] succeededBlocks() {
@@ -229,4 +309,12 @@
        public synchronized boolean moreBlocks() {
                return !levels.isEmpty();
        }
+
+       /**
+        * Get the counts of the reason codes passed to fatalError(...).
+        * Returns the live tracker object, not a copy.
+        */
+       public FailureCodeTracker getAccumulatedFatalErrorCodes() {
+               return fatalErrors;
+       }
+       
+       /**
+        * Get the counts of the reason codes passed to nonfatalError(...).
+        * Returns the live tracker object, not a copy.
+        */
+       public FailureCodeTracker getAccumulatedNonFatalErrorCodes() {
+               return nonfatalErrors;
+       }
 }

Added: trunk/freenet/src/freenet/client/RetryTrackerCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTrackerCallback.java  2005-11-09 
16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/RetryTrackerCallback.java  2005-11-09 
18:55:33 UTC (rev 7503)
@@ -0,0 +1,16 @@
+package freenet.client;
+
+/**
+ * Object passed to RetryTracker. This is called when RetryTracker finishes.
+ * RetryTracker.maybeStart() invokes it either directly on the thread that
+ * detected completion or from a newly created daemon thread, so
+ * implementations should not assume a particular calling thread.
+ */
+public interface RetryTrackerCallback {
+
+       /**
+        * Notify the caller that we have finished.
+        * @param succeeded The blocks which succeeded.
+        * @param failed The blocks which failed, i.e. did not succeed within
+        * the maximum number of tries.
+        * @param fatalErrors The blocks which got fatal errors.
+        */
+       void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, 
SplitfileBlock[] fatalErrors);
+
+}

Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -9,7 +9,6 @@
 import freenet.keys.FreenetURI;
 import freenet.support.Bucket;
 import freenet.support.BucketTools;
-import freenet.support.Logger;

 /**
  * A segment, within a splitfile.
@@ -17,152 +16,13 @@
  * 
  * Does not require locking, because all locking goes through the parent 
Segment.
  */
-public class Segment implements Runnable {
+public class Segment implements RetryTrackerCallback {

-       public class BlockStatus extends SplitfileBlock implements Runnable {
-
-               /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the 
check blocks */
-               final int index;
-               final FreenetURI uri;
-               int completedTries;
-               Bucket fetchedData;
-               boolean actuallyFetched;
-               
-               public BlockStatus(FreenetURI freenetURI, int index) {
-                       uri = freenetURI;
-                       completedTries = 0;
-                       fetchedData = null;
-                       this.index = index;
-                       actuallyFetched = false;
-               }
-
-               public void startFetch() {
-                       if(fetchedData != null) {
-                               throw new IllegalStateException("Already have 
data");
-                       }
-                       synchronized(runningFetches) {
-                               runningFetches.add(this);
-                               try {
-                                       Thread t = new Thread(this);
-                                       t.setDaemon(true);
-                                       t.start();
-                               } catch (Throwable error) {
-                                       runningFetches.remove(this);
-                                       Logger.error(this, "Caught "+error);
-                               }
-                       }
-               }
-
-               public void run() {
-                       // Already added to runningFetches.
-                       // But need to make sure we are removed when we exit.
-                       try {
-                               // Do the fetch
-                               Fetcher f = new Fetcher(uri, blockFetchContext);
-                               try {
-                                       FetchResult fr = f.realRun(new 
ClientMetadata(), recursionLevel, uri, 
-                                                       (!nonFullBlocksAllowed) 
|| fetcherContext.dontEnterImplicitArchives);
-                                       actuallyFetched = true;
-                                       fetchedData = fr.data;
-                               } catch (MetadataParseException e) {
-                                       fatalError(e);
-                               } catch (FetchException e) {
-                                       int code = e.getMode();
-                                       switch(code) {
-                                       case FetchException.ARCHIVE_FAILURE:
-                                       case FetchException.BLOCK_DECODE_ERROR:
-                                       case 
FetchException.HAS_MORE_METASTRINGS:
-                                       case FetchException.INVALID_METADATA:
-                                       case FetchException.NOT_IN_ARCHIVE:
-                                       case 
FetchException.TOO_DEEP_ARCHIVE_RECURSION:
-                                       case 
FetchException.TOO_MANY_ARCHIVE_RESTARTS:
-                                       case 
FetchException.TOO_MANY_METADATA_LEVELS:
-                                       case FetchException.TOO_MANY_REDIRECTS:
-                                       case FetchException.TOO_MUCH_RECURSION:
-                                       case FetchException.UNKNOWN_METADATA:
-                                       case 
FetchException.UNKNOWN_SPLITFILE_METADATA:
-                                               // Fatal, probably an error on 
insert
-                                               fatalError(e);
-                                               return;
-                                       
-                                       case FetchException.DATA_NOT_FOUND:
-                                       case FetchException.ROUTE_NOT_FOUND:
-                                       case FetchException.REJECTED_OVERLOAD:
-                                       case FetchException.TRANSFER_FAILED:
-                                               // Non-fatal
-                                               nonfatalError(e);
-                                               
-                                       case FetchException.BUCKET_ERROR:
-                                       case FetchException.INTERNAL_ERROR:
-                                               // Maybe fatal
-                                               nonfatalError(e);
-                                       }
-                               } catch (ArchiveFailureException e) {
-                                       fatalError(e);
-                               } catch (ArchiveRestartException e) {
-                                       fatalError(e);
-                               }
-                       } finally {
-                               completedTries++;
-                               // Add before removing from runningFetches, to 
avoid race
-                               synchronized(recentlyCompletedFetches) {
-                                       recentlyCompletedFetches.add(this);
-                               }
-                               synchronized(runningFetches) {
-                                       runningFetches.remove(this);
-                               }
-                               synchronized(Segment.this) {
-                                       Segment.this.notify();
-                               }
-                       }
-               }
-
-               private void fatalError(Exception e) {
-                       Logger.normal(this, "Giving up on block: "+this+": "+e);
-                       completedTries = -1;
-               }
-
-               private void nonfatalError(Exception e) {
-                       Logger.minor(this, "Non-fatal error on "+this+": "+e);
-               }
-               
-               public boolean succeeded() {
-                       return fetchedData != null;
-               }
-
-               /**
-                * Queue a healing block for insert.
-                * Will be implemented using the download manager.
-                * FIXME: implement!
-                */
-               public void queueHeal() {
-                       // TODO Auto-generated method stub
-                       
-               }
-
-               public int getNumber() {
-                       return index;
-               }
-
-               public boolean hasData() {
-                       return fetchedData != null;
-               }
-
-               public Bucket getData() {
-                       return fetchedData;
-               }
-
-               public void setData(Bucket data) {
-                       actuallyFetched = false;
-                       fetchedData = data;
-               }
-       }
-
        final short splitfileType;
        final FreenetURI[] dataBlocks;
        final FreenetURI[] checkBlocks;
-       final BlockStatus[] dataBlockStatus;
-       final BlockStatus[] checkBlockStatus;
+       final BlockFetcher[] dataBlockStatus;
+       final BlockFetcher[] checkBlockStatus;
        final int minFetched;
        private Vector blocksNotTried;
        final SplitFetcher parentFetcher;
@@ -179,25 +39,15 @@
        /** Bucket to store the data retrieved, after it has been decoded */
        private Bucket decodedData;
        /** Recently completed fetches */
-       private final LinkedList recentlyCompletedFetches;
-       /** Total number of successfully fetched blocks */
-       private int totalFetched;
+       final LinkedList recentlyCompletedFetches;
        /** Running fetches */
-       private LinkedList runningFetches;
-       /** Minimum retry level of any BlockStatus; this is the largest integer 
n such that
-        * blocksNotTried.get(n-1) is empty. Initially 0.
-        */
-       private int minRetryLevel;
-       /** Maximum retry level. */
-       private final int maxRetryLevel;
+       LinkedList runningFetches;
        /** Fetch context for block fetches */
-       private final FetcherContext blockFetchContext;
+       final FetcherContext blockFetchContext;
        /** Recursion level */
-       private final int recursionLevel;
-       /** Number of blocks which got fatal errors */
-       private int fatalErrorCount;
+       final int recursionLevel;
        /** Retry tracker */
-       private RetryTracker tracker;
+       private final RetryTracker tracker;

        /**
         * Create a Segment.
@@ -210,6 +60,13 @@
                this.splitfileType = splitfileType;
                dataBlocks = splitfileDataBlocks;
                checkBlocks = splitfileCheckBlocks;
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       minFetched = dataBlocks.length; // blocks needed before we can decode
+               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+                       minFetched = dataBlocks.length;
+               } else throw new MetadataParseException("Unknown splitfile 
type "+splitfileType);
+               tracker = new RetryTracker(fctx.maxSplitfileBlockRetries, 
splitfileDataBlocks.length, fctx.random, fctx.maxSplitfileThreads, false, this);
+               // Don't add blocks to tracker yet, because don't want to start 
fetch yet.
                parentFetcher = fetcher;
                archiveContext = actx;
                fetcherContext = fctx;
@@ -219,28 +76,21 @@
                finished = false;
                fetchError = -1;
                decodedData = null;
-               dataBlockStatus = new BlockStatus[dataBlocks.length];
-               checkBlockStatus = new BlockStatus[checkBlocks.length];
+               dataBlockStatus = new BlockFetcher[dataBlocks.length];
+               checkBlockStatus = new BlockFetcher[checkBlocks.length];
                blocksNotTried = new Vector();
-               maxRetryLevel = fetcherContext.maxSplitfileBlockRetries;
                Vector firstSet = new 
Vector(dataBlocks.length+checkBlocks.length);
                blocksNotTried.add(0, firstSet);
                for(int i=0;i<dataBlocks.length;i++) {
-                       dataBlockStatus[i] = new BlockStatus(dataBlocks[i], i);
+                       dataBlockStatus[i] = new BlockFetcher(this, tracker, 
dataBlocks[i], i, fctx.dontEnterImplicitArchives);
                        firstSet.add(dataBlockStatus[i]);
                }
                for(int i=0;i<checkBlocks.length;i++) {
-                       checkBlockStatus[i] = new BlockStatus(checkBlocks[i], 
dataBlockStatus.length + i);
+                       checkBlockStatus[i] = new BlockFetcher(this, tracker, 
checkBlocks[i], dataBlockStatus.length + i, fctx.dontEnterImplicitArchives);
                        firstSet.add(checkBlockStatus[i]);
                }
                recentlyCompletedFetches = new LinkedList();
                runningFetches = new LinkedList();
-               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
-                       minFetched = dataBlocks.length;
-               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
-                       minFetched = dataBlocks.length;
-               } else throw new MetadataParseException("Unknown splitfile type 
"+splitfileType);
-               minRetryLevel = 0;
                this.recursionLevel = recursionLevel;
                // FIXME be a bit more flexible here depending on flags
                blockFetchContext = new FetcherContext(fetcherContext, 
FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
@@ -293,80 +143,46 @@

        /**
         * Start the Segment fetching the data. When it has finished fetching, 
it will
-        * notify the SplitFetcher.
+        * notify the SplitFetcher. Note that we must not start fetching until 
this
+        * method is called, because of the requirement to not fetch all 
segments
+        * simultaneously.
         */
        public void start() {
                started = true;
-               Thread t = new Thread(this);
-               t.setDaemon(true);
-               t.start();
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       tracker.addBlock(dataBlockStatus[i]);
+               }
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       tracker.addBlock(checkBlockStatus[i]);
+               }
+               tracker.setFinishOnEmpty();
        }

        /**
-        * Fetch the data.
-        * Tell the SplitFetcher.
-        * Decode the data.
-        * Tell the SplitFetcher.
-        * If there is an error, tell the SplitFetcher and exit.
+        * How many fetches are running?
         */
-       public void run() {
-               // Create a number of fetcher threads.
-               // Wait for any thread to complete (success or failure).
-               // Retry if necessary, up to N times per block.
-               
-               for(int i=0;i<fetcherContext.maxSplitfileThreads;i++) {
-                       startFetch(); // ignore return value
+       private int runningFetches() {
+               synchronized(runningFetches) {
+                       return runningFetches.size();
                }
+       }
+
+       /**
+        * Once we have enough data to decode, tell parent, and decode it.
+        */
+       public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] 
failed, SplitfileBlock[] fatalErrors) {

-               while(true) {
-               
-                       // Now wait for any thread to complete
-                       synchronized(this) {
-                               try {
-                                       wait(10*1000);
-                               } catch (InterruptedException e) {
-                                       // Ignore
-                               }
-                       }
-                       
-                       while(true) {
-                               BlockStatus block;
-                               synchronized(this) {
-                                       block = (BlockStatus) 
recentlyCompletedFetches.removeFirst();
-                               }
-                               if(block == null) break;
-                               if(!block.succeeded()) {
-                                       // Retry
-                                       int retryLevel = block.completedTries;
-                                       if(retryLevel == maxRetryLevel || 
retryLevel == -1) {
-                                               if(retryLevel == -1)
-                                                       fatalErrorCount++;
-                                               // This block failed
-                                       } else {
-                                               Vector levelSet = (Vector) 
blocksNotTried.get(retryLevel);
-                                               levelSet.add(block);
-                                       }
-                               } else {
-                                       // Succeeded
-                                       totalFetched++;
-                               }
-                               // Either way, start a new fetch
-                               if(!startFetch()) {
-                                       // Can't start a fetch
-                                       if(runningFetches() == 0) {
-                                               // Failed
-                                               
parentFetcher.failedNotEnoughBlocks(this);
-                                               return;
-                                       }
-                               }
-                       }
-                       
-                       if(totalFetched >= minFetched) {
-                               // Success! Go to next phase
-                               break;
-                       }
+               if(succeeded.length >= minFetched) // minFetched is the minimum needed, so equality is success
+                       successfulFetch();
+               else {
+                       parentFetcher.failed(this, minFetched, 
succeeded.length, failed.length, fatalErrors.length);
                }
-               
+       }
+
+       /**
+        * Successful fetch, do the decode, tell the parent, etc.
+        */
+       private void successfulFetch() {
                parentFetcher.gotBlocks(this);

                // Now decode
@@ -382,7 +198,7 @@
                        Bucket output = 
fetcherContext.bucketFactory.makeBucket(-1);
                        OutputStream os = output.getOutputStream();
                        for(int i=0;i<dataBlockStatus.length;i++) {
-                               BlockStatus status = dataBlockStatus[i];
+                               BlockFetcher status = dataBlockStatus[i];
                                Bucket data = status.fetchedData;
                                BucketTools.copyTo(data, os, Long.MAX_VALUE);
                                fetcherContext.bucketFactory.freeBucket(data);
@@ -402,7 +218,7 @@

                // Now insert *ALL* blocks on which we had at least one 
failure, and didn't eventually succeed
                for(int i=0;i<dataBlockStatus.length;i++) {
-                       BlockStatus block = dataBlockStatus[i];
+                       BlockFetcher block = dataBlockStatus[i];
                        if(block.actuallyFetched) continue;
                        if(block.completedTries == 0) {
                                // 80% chance of not inserting, if we never 
tried it
@@ -411,33 +227,4 @@
                        block.queueHeal();
                }
        }
-
-       /**
-        * How many fetches are running?
-        */
-       private int runningFetches() {
-               synchronized(runningFetches) {
-                       return runningFetches.size();
-               }
-       }
-
-       /**
-        * Start a fetch.
-        * @return True if we started a fetch, false if there was nothing to 
start.
-        */
-       private boolean startFetch() {
-               if(minRetryLevel == maxRetryLevel) return false; // nothing to 
start
-               // Don't need to synchronize as these are only accessed by main 
thread
-               while(true) {
-                       if(minRetryLevel >= blocksNotTried.size())
-                               return false;
-                       Vector v = (Vector) blocksNotTried.get(minRetryLevel);
-                       int len = v.size();
-                       int idx = fetcherContext.random.nextInt(len);
-                       BlockStatus b = (BlockStatus) v.remove(idx);
-                       if(v.isEmpty()) minRetryLevel++;
-                       b.startFetch();
-                       return true;
-               }
-       }
 }

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -195,22 +195,32 @@
                return output;
        }

-       public void failedNotEnoughBlocks(Segment segment) {
+       public void gotBlocks(Segment segment) {
                // TODO Auto-generated method stub

        }

-       public void gotBlocks(Segment segment) {
+       public void decoded(Segment segment, Bucket output) {
                // TODO Auto-generated method stub

        }

-       public void decoded(Segment segment, Bucket output) {
+       public void internalBucketError(Segment segment, IOException e) {
                // TODO Auto-generated method stub

        }

-       public void internalBucketError(Segment segment, IOException e) {
+       /**
+        * The segment fetch failed.
+        * @param segment The segment that failed.
+        * @param minFetched The minimum number of successful blocks for a 
successful fetch.
+        * @param successfulBlocks The number of blocks successfully fetched.
+        * @param failedBlocks The number of blocks that failed because they got
+        * non-fatal errors on every try, and ran out of retries.
+        * @param fatalErrors The number of blocks that got fatal errors.
+        */
+       public void failed(Segment segment, int minFetched, int 
successfulBlocks, 
+                       int failedBlocks, int fatalErrors) {
                // TODO Auto-generated method stub

        }

Modified: trunk/freenet/src/freenet/client/SplitInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 16:09:35 UTC 
(rev 7502)
+++ trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 18:55:33 UTC 
(rev 7503)
@@ -1,5 +1,6 @@
 package freenet.client;

+import java.io.IOException;
 import java.util.Vector;

 import freenet.keys.FreenetURI;
@@ -11,7 +12,7 @@
 /**
  * Insert a splitfile.
  */
-public class SplitInserter {
+public class SplitInserter implements RetryTrackerCallback {

        final Bucket origData;
        final long dataLength;
@@ -19,10 +20,25 @@
        final short compressionCodec;
        final short splitfileAlgorithm;
        final InserterContext ctx;
+       final RetryTracker tracker;
+       final int segmentSize;
+       final int checkSegmentSize;
+       final int blockSize;
        SplitfileBlock[] origDataBlocks;
+       InsertSegment encodingSegment;
+       InsertSegment[] segments;
+       final Vector unstartedSegments = new Vector();
+       private boolean allSegmentsFinished = false;
+       private int succeeded;
+       private int failed;
+       private int fatalErrors;
+       private int countCheckBlocks;
+       private SplitfileBlock[] fatalErrorBlocks;
+       private FileInserter inserter;

-       public SplitInserter(Bucket data, ClientMetadata clientMetadata, 
Compressor compressor, short splitfileAlgorithm, InserterContext ctx) {
+       public SplitInserter(Bucket data, ClientMetadata clientMetadata, 
Compressor compressor, short splitfileAlgorithm, InserterContext ctx, 
FileInserter inserter, int blockLength) throws InserterException {
                this.origData = data;
+               this.blockSize = blockLength;
                this.clientMetadata = clientMetadata;
                if(compressor == null)
                        compressionCodec = -1;
@@ -31,110 +47,157 @@
                this.splitfileAlgorithm = splitfileAlgorithm;
                this.ctx = ctx;
                this.dataLength = data.size();
+               segmentSize = 
FECCodec.getCodecMaxSegmentDataBlocks(splitfileAlgorithm);
+               checkSegmentSize = 
FECCodec.getCodecMaxSegmentCheckBlocks(splitfileAlgorithm);
+               try {
+                       splitIntoBlocks();
+               } catch (IOException e) {
+                       throw new 
InserterException(InserterException.BUCKET_ERROR);
+               }
+               tracker = new RetryTracker(ctx.maxInsertBlockRetries, 0, 
ctx.random, ctx.maxSplitInsertThreads, true, this);
+               this.inserter = inserter;
        }

-       InsertSegment encodingSegment;
-       InsertSegment[] segments;
-       final Vector unstartedSegments = new Vector();
-       boolean allSegmentsFinished = false;
-       
        /**
         * Inserts the splitfile.
         * @return The URI of the resulting file.
+        * @throws InserterException If we are not able to insert the splitfile.
         */
-       public FreenetURI run() {
-               // Create the splitfile
-               int segmentSize = 
FECCodec.getCodecMaxSegmentSize(splitfileAlgorithm);
-       
-               splitIntoBlocks();
-               
+       public FreenetURI run() throws InserterException {
+               startInsertingDataBlocks();
                splitIntoSegments(segmentSize);
+               // Backwards, because the last is the shortest
+               for(int i=segments.length-1;i>=0;i--)
+                       countCheckBlocks += encodeSegment(i, 
origDataBlocks.length + checkSegmentSize * i);
+               // Wait for the insertion thread to finish
+               return waitForCompletion();
+       }
+
+       private FreenetURI waitForCompletion() throws InserterException {
+               synchronized(this) {
+                       while(!allSegmentsFinished) {
+                               try {
+                                       wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
+
+               // Did we succeed?

-               // Encode the last segment (which is always shortest)
+               if(fatalErrors > 0) {
+                       throw new 
InserterException(InserterException.FATAL_ERRORS_IN_BLOCKS, 
tracker.getAccumulatedFatalErrorCodes());
+               }

-               encodeSegment(segments.length-1);
+               if(failed > 0) {
+                       throw new 
InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, 
tracker.getAccumulatedNonFatalErrorCodes());
+               }

-               // Then start the insertion thread
+               // Okay, we succeeded... create the manifest

-               startInsertionThread();
+               Metadata metadata = new Metadata(splitfileAlgorithm, 
getDataURIs(), getCheckURIs(), clientMetadata, dataLength, compressionCodec);

-               // Then encode the rest
+               Bucket mbucket;
+               try {
+                       mbucket = BucketTools.makeImmutableBucket(ctx.bf, 
metadata.writeToByteArray());
+               } catch (IOException e) {
+                       throw new 
InserterException(InserterException.BUCKET_ERROR);
+               }
+
+               if(inserter == null)
+                       inserter = new FileInserter(ctx);

-               for(int i=0;i<segments.length-1;i++)
-                       encodeSegment(i);
+               InsertBlock mblock = new InsertBlock(mbucket, clientMetadata, 
FreenetURI.EMPTY_CHK_URI);

-               // Then wait for the insertion thread to finish
-               
-               return waitForCompletion();
+               return inserter.run(mblock, true);
        }

-       private void splitIntoBlocks() {
-               Bucket[] dataBuckets = BucketTools.split(origData, 
NodeCHK.BLOCK_SIZE);
-               origDataBlocks = new SplitfileBlock[dataBuckets.length];
-               for(int i=0;i<origDataBlocks.length;i++) {
-                       origDataBlocks[i] = new BucketWrapper(dataBuckets[i], 
i);
+       private FreenetURI[] getCheckURIs() {
+               // Copy check blocks from each segment into a FreenetURI[].
+               FreenetURI[] uris = new FreenetURI[countCheckBlocks];
+               int x = 0;
+               for(int i=0;i<segments.length;i++) {
+                       FreenetURI[] segURIs = segments[i].getCheckURIs();
+                       System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+                       x += segURIs.length;
                }
+
+               if(uris.length != x)
+                       throw new IllegalStateException("Total is wrong");
+               
+               return uris;
        }

+       private FreenetURI[] getDataURIs() {
+               FreenetURI[] uris = new FreenetURI[origDataBlocks.length];
+               for(int i=0;i<uris.length;i++)
+                       uris[i] = origDataBlocks[i].getURI();
+               return uris;
+       }
+
+       private int encodeSegment(int i, int offset) {
+               encodingSegment = segments[i];
+               return encodingSegment.encode(offset);
+       }
+
        /**
-        * Create the metadata document. Insert it. Return its URI.
+        * Start the insert, by adding all the data blocks.
         */
-       private FreenetURI finalStatus() {
-               // TODO Auto-generated method stub
-               return null;
+       private void startInsertingDataBlocks() {
+               for(int i=0;i<origDataBlocks.length;i++)
+                       tracker.addBlock(origDataBlocks[i]);
        }

        /**
+        * Split the original data into buckets and wrap each one in a BlockInserter.
+        * @throws IOException If there is a bucket error splitting the data.
+        */
+       private void splitIntoBlocks() throws IOException {
+               Bucket[] dataBuckets = BucketTools.split(origData, 
NodeCHK.BLOCK_SIZE, ctx.bf); // NOTE(review): uses NodeCHK.BLOCK_SIZE, not the blockSize field - confirm they agree
+               origDataBlocks = new SplitfileBlock[dataBuckets.length];
+               for(int i=0;i<origDataBlocks.length;i++) {
+                       origDataBlocks[i] = new BlockInserter(dataBuckets[i], 
i);
+               }
+       }
+
+       /**
         * Group the blocks into segments.
         */
        private void splitIntoSegments(int segmentSize) {
                int dataBlocks = origDataBlocks.length;

+               Vector segs = new Vector();
+               
                // First split the data up
                if(dataBlocks < segmentSize || segmentSize == -1) {
                        // Single segment
-                       InsertSegment onlySeg = new 
InsertSegment(splitfileAlgorithm, origDataBlocks);
-                       unstartedSegments.add(new InsertSegment[] { onlySeg });
+                       InsertSegment onlySeg = new 
InsertSegment(splitfileAlgorithm, origDataBlocks, blockSize, ctx.bf);
+                       segs.add(onlySeg);
                } else {
                        int j = 0;
                        for(int i=segmentSize;;i+=segmentSize) {
                                if(i > dataBlocks) i = dataBlocks;
-                               Bucket[] seg = new Bucket[i-j];
+                               SplitfileBlock[] seg = new SplitfileBlock[i-j];
                                System.arraycopy(origDataBlocks, j, seg, 0, 
i-j);
                                unstartedSegments.add(seg);
                                j = i;
+                               segs.add(new InsertSegment(splitfileAlgorithm, 
seg, blockSize, ctx.bf));
                                if(i == dataBlocks) break;
                        }
                }
                segments = (InsertSegment[]) unstartedSegments.toArray(new 
InsertSegment[unstartedSegments.size()]);
        }

-       public static class BucketWrapper implements SplitfileBlock {
-
-               Bucket data;
-               int number;
-               
-               public BucketWrapper(Bucket data, int number) {
-                       this.data = data;
-                       this.number = number;
+       public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] 
failed, SplitfileBlock[] fatalErrors) {
+               synchronized(this) {
+                       allSegmentsFinished = true;
+                       this.succeeded = succeeded.length;
+                       this.failed = failed.length;
+                       this.fatalErrorBlocks = fatalErrors;
+                       this.fatalErrors = fatalErrorBlocks.length;
+                       notify();
                }
-
-               public int getNumber() {
-                       return number;
-               }
-
-               public boolean hasData() {
-                       return data != null;
-               }
-
-               public Bucket getData() {
-                       return data;
-               }
-
-               public void setData(Bucket data) {
-                       this.data = data;
-               }
-
        }

 }

Modified: trunk/freenet/src/freenet/client/SplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitfileBlock.java        2005-11-09 
16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/SplitfileBlock.java        2005-11-09 
18:55:33 UTC (rev 7503)
@@ -1,6 +1,7 @@
 package freenet.client;

 import freenet.client.RetryTracker.Level;
+import freenet.keys.FreenetURI;
 import freenet.support.Bucket;

 /** Simple interface for a splitfile block */
@@ -27,4 +28,19 @@
        final void setLevel(Level l) {
                level = l;
        }
+
+       /** Start the fetch (or insert). The implementation is required to call the
+        * relevant methods on RetryTracker when done. */
+       abstract void start();
+
+       /**
+        * Shut down the fetch (or, presumably, the insert) as soon as reasonably possible.
+        */
+       abstract public void kill();
+
+       /**
+        * Get the URI of the file. For an insert, this is derived during the insert.
+        * For a request, it is fixed in the constructor.
+        */
+       abstract public FreenetURI getURI();
 }

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09 18:55:33 UTC (rev 7503)
@@ -8,7 +8,6 @@
 import com.onionnetworks.fec.FECCode;
 import com.onionnetworks.util.Buffer;

-import freenet.client.Segment.BlockStatus;
 import freenet.support.Bucket;
 import freenet.support.BucketFactory;
 import freenet.support.LRUHashtable;
@@ -174,7 +173,7 @@
                }
        }

-       public void encode(BlockStatus[] dataBlockStatus, BlockStatus[] 
checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
+       public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
                // TODO Auto-generated method stub

        }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-09 18:55:33 UTC (rev 7503)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 144;
+       public static final int buildNumber = 145;

        /** Oldest build of Fred we will talk to */
        public static final int lastGoodBuild = 144;

Modified: trunk/freenet/src/freenet/support/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/BucketTools.java  2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/support/BucketTools.java  2005-11-09 18:55:33 UTC (rev 7503)
@@ -357,4 +357,60 @@
                        os.write(buf, 0, bytes);
                }
        }
+
+       /**
+        * Split the data into a series of Buckets of at most splitSize bytes each.
+        * Every bucket is full except possibly the last, which holds the remainder.
+        *
+        * FIXME This could be made many orders of magnitude more efficient on
+        * time and space if the underlying Bucket happens to be a passed-in
+        * plaintext file!
+        *
+        * Note that this method will allocate a buffer of size splitSize.
+        * NOTE(review): the buckets were described as read-only, but nothing
+        * here marks them so - confirm whether callers rely on that.
+        *
+        * @param origData The original data Bucket.
+        * @param splitSize The number of bytes to put into each bucket. Must be positive.
+        * @param bf The factory used to create the output buckets.
+        * @return The filled buckets, in the order their contents appear in
+        * origData. Empty if origData is empty.
+        * @throws IllegalArgumentException If splitSize is not positive, or the
+        * data would need more than Integer.MAX_VALUE buckets.
+        * @throws IOException If there is an error creating buckets, reading from
+        * the provided bucket, or writing to created buckets.
+        */
+       public static Bucket[] split(Bucket origData, int splitSize, BucketFactory bf) throws IOException {
+               if(splitSize <= 0)
+                       throw new IllegalArgumentException("Invalid splitSize: "+splitSize);
+               long length = origData.size();
+               // Widen before multiplying: the int product overflows for any splitSize > 1.
+               if(length > (long) Integer.MAX_VALUE * splitSize)
+                       throw new IllegalArgumentException("Way too big!");
+               int bucketCount = (int) (length / splitSize);
+               // Count the trailing partial block before sizing the array.
+               if(length % splitSize > 0) bucketCount++;
+               Bucket[] buckets = new Bucket[bucketCount];
+               DataInputStream dis = new DataInputStream(origData.getInputStream());
+               try {
+                       long remainingLength = length;
+                       byte[] buf = new byte[splitSize];
+                       for(int i=0;i<bucketCount;i++) {
+                               int len = (int) Math.min(splitSize, remainingLength);
+                               Bucket bucket = bf.makeBucket(len);
+                               dis.readFully(buf, 0, len);
+                               OutputStream os = bucket.getOutputStream();
+                               try {
+                                       os.write(buf, 0, len);
+                               } finally {
+                                       os.close();
+                               }
+                               buckets[i] = bucket;
+                               remainingLength -= len;
+                       }
+               } finally {
+                       dis.close();
+               }
+               return buckets;
+       }
 }


Reply via email to