Author: toad
Date: 2005-10-29 19:52:46 +0000 (Sat, 29 Oct 2005)
New Revision: 7469

Modified:
   trunk/freenet/src/freenet/client/Fetcher.java
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/Segment.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
   trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
   trunk/freenet/src/freenet/support/BucketTools.java
Log:
More work on splitfiles.

Modified: trunk/freenet/src/freenet/client/Fetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/Fetcher.java       2005-10-29 18:12:27 UTC 
(rev 7468)
+++ trunk/freenet/src/freenet/client/Fetcher.java       2005-10-29 19:52:46 UTC 
(rev 7469)
@@ -93,7 +93,7 @@
                        throw new 
FetchException(FetchException.TOO_MUCH_RECURSION);

                // Do the fetch
-               KeyBlock block = ctx.client.getKey(key);
+               KeyBlock block = ctx.client.getKey(key, ctx.localRequestOnly);

                byte[] data;
                try {

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2005-10-29 
18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2005-10-29 
19:52:46 UTC (rev 7469)
@@ -16,12 +16,20 @@
        final int maxRecursionLevel;
        final int maxArchiveRestarts;
        final boolean dontEnterImplicitArchives;
+       final int maxSplitfileThreads;
+       final int maxSplitfileBlockRetries;
+       final int maxNonSplitfileRetries;
        final RandomSource random;
+       final boolean allowSplitfiles;
+       final boolean followRedirects;
+       final boolean localRequestOnly;

        public FetcherContext(SimpleLowLevelClient client, long curMaxLength, 
                        long curMaxTempLength, int maxRecursionLevel, int 
maxArchiveRestarts,
-                       boolean dontEnterImplicitArchives, RandomSource random,
-                       ArchiveManager archiveManager, BucketFactory 
bucketFactory) {
+                       boolean dontEnterImplicitArchives, int 
maxSplitfileThreads,
+                       int maxSplitfileBlockRetries, int 
maxNonSplitfileRetries,
+                       boolean allowSplitfiles, boolean followRedirects, 
boolean localRequestOnly,
+                       RandomSource random, ArchiveManager archiveManager, 
BucketFactory bucketFactory) {
                this.client = client;
                this.maxOutputLength = curMaxLength;
                this.maxTempLength = curMaxTempLength;
@@ -31,6 +39,12 @@
                this.maxArchiveRestarts = maxArchiveRestarts;
                this.dontEnterImplicitArchives = dontEnterImplicitArchives;
                this.random = random;
+               this.maxSplitfileThreads = maxSplitfileThreads;
+               this.maxSplitfileBlockRetries = maxSplitfileBlockRetries;
+               this.maxNonSplitfileRetries = maxNonSplitfileRetries;
+               this.allowSplitfiles = allowSplitfiles;
+               this.followRedirects = followRedirects;
+               this.localRequestOnly = localRequestOnly;
        }

 }

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-10-29 18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-10-29 19:52:46 UTC (rev 7469)
@@ -16,7 +16,26 @@
        static final int MAX_RECURSION = 10;
        static final int MAX_ARCHIVE_RESTARTS = 2;
        static final boolean DONT_ENTER_IMPLICIT_ARCHIVES = true;
+       /** Number of threads used by a splitfile fetch */
+       static final int SPLITFILE_THREADS = 20;
+       /** Number of retries allowed per block in a splitfile. Must be at 
least 1 as 
+        * on the first try we just check the datastore.
+        */
+       static final int SPLITFILE_BLOCK_RETRIES = 5;
+       /** Number of retries allowed on non-splitfile fetches. Unlike above, 
we always
+        * go to network. */
+       static final int NON_SPLITFILE_RETRIES = 2;
+       /** Whether to fetch splitfiles. Don't turn this off! */
+       static final boolean FETCH_SPLITFILES = true;
+       /** Whether to follow redirects etc. If false, we only fetch a plain 
block of data. 
+        * Don't turn this off either! */
+       static final boolean FOLLOW_REDIRECTS = true;
+       /** If set, only check the local datastore, don't send an actual 
request out.
+        * Don't turn this on! */
+       static final boolean LOCAL_REQUESTS_ONLY = false;

+       
+       
        public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
                this.client = client;
                archiveManager = mgr;
@@ -37,7 +56,10 @@
         */
        public FetchResult fetch(FreenetURI uri) throws FetchException {
                FetcherContext context = new FetcherContext(client, 
curMaxLength, curMaxTempLength, 
-                               MAX_RECURSION, MAX_ARCHIVE_RESTARTS, 
DONT_ENTER_IMPLICIT_ARCHIVES, random, archiveManager, bucketFactory);
+                               MAX_RECURSION, MAX_ARCHIVE_RESTARTS, 
DONT_ENTER_IMPLICIT_ARCHIVES, 
+                               SPLITFILE_THREADS, SPLITFILE_BLOCK_RETRIES, 
NON_SPLITFILE_RETRIES,
+                               FETCH_SPLITFILES, FOLLOW_REDIRECTS, 
LOCAL_REQUESTS_ONLY,
+                               random, archiveManager, bucketFactory);
                Fetcher f = new Fetcher(uri, context);
                return f.run();
        }

Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-10-29 18:12:27 UTC 
(rev 7468)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-10-29 19:52:46 UTC 
(rev 7469)
@@ -1,18 +1,116 @@
 package freenet.client;

 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Vector;

 import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;

 /**
  * A segment, within a splitfile.
+ * Self-starting Runnable.
+ * 
+ * Does not require locking, because all locking goes through the parent 
Segment.
  */
-public class Segment {
+public class Segment implements Runnable {

+       /**
+        * Status of a single block (data or check) of this segment. Each fetch
+        * attempt runs on its own daemon thread.
+        *
+        * Locking: the completion record (completedTries, lastTryFailed and the
+        * recentlyCompletedFetches queue) is only written while holding the
+        * enclosing Segment's monitor, which is the monitor Segment.run() waits
+        * on. This class only modifies runningFetches while holding that list's
+        * own monitor. Where both are needed the Segment monitor is taken first.
+        */
+       public class BlockStatus implements Runnable {
+
+               final FreenetURI uri;
+               /** Number of fetch attempts that have finished, successfully or not. */
+               int completedTries;
+               /** Whether the most recently finished attempt failed. */
+               private boolean lastTryFailed;
+
+               public BlockStatus(FreenetURI freenetURI) {
+                       uri = freenetURI;
+                       completedTries = 0;
+                       lastTryFailed = false;
+               }
+
+               /**
+                * @return True if the most recently finished fetch attempt failed.
+                * Meaningful once this block has been taken off
+                * recentlyCompletedFetches under the Segment's monitor.
+                */
+               public boolean failed() {
+                       return lastTryFailed;
+               }
+
+               /**
+                * Register this block as running and start a daemon thread to fetch it.
+                * If the thread cannot be started the block is unregistered again and
+                * the error is logged.
+                */
+               public void startFetch() {
+                       synchronized(runningFetches) {
+                               runningFetches.add(this);
+                               try {
+                                       Thread t = new Thread(this);
+                                       t.setDaemon(true);
+                                       t.start();
+                               } catch (Throwable error) {
+                                       runningFetches.remove(this);
+                                       Logger.error(this, "Caught "+error);
+                               }
+                       }
+               }
+
+               /**
+                * Make one attempt to fetch the block, then record the outcome, queue
+                * this block on recentlyCompletedFetches and wake the Segment thread.
+                */
+               public void run() {
+                       // Stays false unless realRun() returns normally, so unchecked
+                       // exceptions are recorded as failures too.
+                       boolean succeeded = false;
+                       try {
+                               // Do the fetch
+                               Fetcher f = new Fetcher(uri, blockFetchContext);
+                               // TODO nothing is done with the fetched data yet
+                               f.realRun(new ClientMetadata(), recursionLevel, uri,
+                                               (!nonFullBlocksAllowed) || fetcherContext.dontEnterImplicitArchives);
+                               succeeded = true;
+                       } catch (MetadataParseException e) {
+                               logFailure(e);
+                       } catch (FetchException e) {
+                               logFailure(e);
+                       } catch (ArchiveFailureException e) {
+                               logFailure(e);
+                       } catch (ArchiveRestartException e) {
+                               logFailure(e);
+                       } finally {
+                               // startFetch() added us to runningFetches; always undo that and
+                               // always report completion, however we exit. Both updates happen
+                               // under the Segment monitor so the Segment thread never sees
+                               // "not running" without also seeing the queued result.
+                               synchronized(Segment.this) {
+                                       completedTries++;
+                                       lastTryFailed = !succeeded;
+                                       recentlyCompletedFetches.addLast(this);
+                                       synchronized(runningFetches) {
+                                               runningFetches.remove(this);
+                                       }
+                                       Segment.this.notify();
+                               }
+                       }
+               }
+
+               /** Log a failed fetch attempt for this block. */
+               private void logFailure(Exception e) {
+                       Logger.error(this, "Fetch of "+uri+" failed: "+e);
+               }
+
+       }
+
        final short splitfileType;
        final FreenetURI[] dataBlocks;
        final FreenetURI[] checkBlocks;
+       final BlockStatus[] dataBlockStatus;
+       final BlockStatus[] checkBlockStatus;
+       final int minFetched;
+       private Vector blocksNotTried;
+       final SplitFetcher parentFetcher;
+       final ArchiveContext archiveContext;
+       final FetcherContext fetcherContext;
+       final long maxBlockLength;
+       final boolean nonFullBlocksAllowed;
+       /** Has the segment started to do something? Irreversible. */
+       private boolean started;
+       /** Has the segment finished processing? Irreversible. */
+       private boolean finished;
+       /** Error code, or -1 */
+       private short fetchError;
+       /** Bucket to store the data retrieved, after it has been decoded */
+       private Bucket decodedData;
+       /** Recently completed fetches */
+       private final LinkedList recentlyCompletedFetches;
+       /** Total number of successfully fetched blocks */
+       private int totalFetched;
+       /** Running fetches */
+       private LinkedList runningFetches;
+       /** Minimum retry level of any BlockStatus; this is the largest integer 
n such that
+        * blocksNotTried.get(n-1) is empty. Initially 0.
+        */
+       private int minRetryLevel;
+       /** Maximum retry level. */
+       private final int maxRetryLevel;
+       /** Fetch context for block fetches */
+       private final FetcherContext blockFetchContext;
+       /** Recursion level */
+       private final int recursionLevel;

        /**
         * Create a Segment.
@@ -20,36 +118,65 @@
         * @param splitfileDataBlocks The data blocks to fetch.
         * @param splitfileCheckBlocks The check blocks to fetch.
         */
-       public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks, 
FreenetURI[] splitfileCheckBlocks) {
+       public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks, 
FreenetURI[] splitfileCheckBlocks,
+                       SplitFetcher fetcher, ArchiveContext actx, 
FetcherContext fctx, long maxTempLength, boolean useLengths, int 
recursionLevel) throws MetadataParseException {
                this.splitfileType = splitfileType;
                dataBlocks = splitfileDataBlocks;
                checkBlocks = splitfileCheckBlocks;
+               parentFetcher = fetcher;
+               archiveContext = actx;
+               fetcherContext = fctx;
+               maxBlockLength = maxTempLength;
+               nonFullBlocksAllowed = useLengths;
+               started = false;
+               finished = false;
+               fetchError = -1;
+               decodedData = null;
+               dataBlockStatus = new BlockStatus[dataBlocks.length];
+               checkBlockStatus = new BlockStatus[checkBlocks.length];
+               blocksNotTried = new Vector();
+               maxRetryLevel = fetcherContext.maxSplitfileBlockRetries;
+               Vector firstSet = new 
Vector(dataBlocks.length+checkBlocks.length);
+               blocksNotTried.add(0, firstSet);
+               for(int i=0;i<dataBlocks.length;i++) {
+                       dataBlockStatus[i] = new BlockStatus(dataBlocks[i]);
+                       firstSet.add(dataBlockStatus[i]);
+               }
+               for(int i=0;i<checkBlocks.length;i++) {
+                       checkBlockStatus[i] = new BlockStatus(checkBlocks[i]);
+                       firstSet.add(checkBlockStatus[i]);
+               }
+               recentlyCompletedFetches = new LinkedList();
+               runningFetches = new LinkedList();
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       minFetched = dataBlocks.length;
+               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+                       minFetched = dataBlocks.length;
+               } else throw new MetadataParseException("Unknown splitfile type 
"+splitfileType);
+               minRetryLevel = 0;
+               this.recursionLevel = recursionLevel;
        }

        /**
         * Is the segment finished? (Either error or fetched and decoded)?
         */
        public boolean isFinished() {
-               // TODO Auto-generated method stub
-               return false;
+               return finished;
        }

        /**
         * If there was an error, throw it now.
         */
        public void throwError() throws FetchException {
-               // TODO Auto-generated method stub
-               
+               if(fetchError != -1)
+                       throw new FetchException(fetchError);
        }

        /**
         * Return the length of the data, after decoding.
-        * Will throw unless known in advance, or  
-        * @return
         */
        public long decodedLength() {
-               // TODO Auto-generated method stub
-               return 0;
+               return decodedData.size();
        }

        /**
@@ -57,26 +184,120 @@
         * Do not write more than the specified number of bytes (unless it is 
negative,
         * in which case ignore it).
         * @return The number of bytes written.
+        * @throws IOException If there was an error reading from the bucket 
the data is
+        * stored in, or writing to the stream provided.
         */
        public long writeDecodedDataTo(OutputStream os, long truncateLength) throws IOException {
-               // TODO Auto-generated method stub
-               return 0;
+               long len = decodedData.size();
+               if(truncateLength >= 0 && truncateLength < len)
+                       len = truncateLength;
+               // Copy the clamped length, not truncateLength itself: that may be
+               // negative (meaning "no limit") or larger than the bucket.
+               BucketTools.copyTo(decodedData, os, len);
+               return len;
        }

        /**
         * Return true if the Segment has been started, otherwise false.
         */
        public boolean isStarted() {
-               // TODO Auto-generated method stub
-               return false;
+               return started;
        }

        /**
         * Start the Segment fetching the data. When it has finished fetching, 
it will
         * notify the SplitFetcher.
         */
-       public void start(SplitFetcher fetcher, ArchiveContext actx, 
FetcherContext fctx, long maxTempLength) {
-               // TODO Auto-generated method stub
+       public void start() {
+               started = true;
+               Thread t = new Thread(this);
+               t.setDaemon(true);
+               t.start();
+       }
+
+       /**
+        * Fetch the data.
+        * Tell the SplitFetcher.
+        * Decode the data.
+        * Tell the SplitFetcher.
+        * If there is an error, tell the SplitFetcher and exit.
+        */
+       public void run() {
+               // Create a number of fetcher threads.
+               // Wait for any thread to complete (success or failure).
+               // Retry if necessary, up to N times per block.
+
+               for(int i=0;i<fetcherContext.maxSplitfileThreads;i++) {
+                       startFetch(); // ignore return value
+               }
+
+               while(true) {
+
+                       // Now wait for any thread to complete
+                       synchronized(this) {
+                               // Don't sleep if results are already queued
+                               if(recentlyCompletedFetches.isEmpty()) {
+                                       try {
+                                               wait(10*1000);
+                                       } catch (InterruptedException e) {
+                                               // Nothing to do: the queue is re-checked below either way
+                                       }
+                               }
+                       }
+
+                       while(true) {
+                               BlockStatus block;
+                               synchronized(this) {
+                                       // removeFirst() throws NoSuchElementException on an empty
+                                       // list, it never returns null, so test for empty first.
+                                       if(recentlyCompletedFetches.isEmpty()) break;
+                                       block = (BlockStatus) recentlyCompletedFetches.removeFirst();
+                               }
+                               if(block.failed()) {
+                                       // Retry
+                                       int retryLevel = block.completedTries;
+                                       if(retryLevel >= maxRetryLevel) {
+                                               // This block failed
+                                       } else {
+                                               // Only level 0 is created by the constructor
+                                               while(blocksNotTried.size() <= retryLevel)
+                                                       blocksNotTried.add(new Vector());
+                                               Vector levelSet = (Vector) blocksNotTried.get(retryLevel);
+                                               levelSet.add(block);
+                                               // Make sure startFetch() considers this level again
+                                               if(retryLevel < minRetryLevel)
+                                                       minRetryLevel = retryLevel;
+                                       }
+                               } else {
+                                       // Succeeded
+                                       totalFetched++;
+                               }
+                               // Either way, try to start a new fetch in its place
+                               startFetch();
+                       }
+
+                       if(totalFetched >= minFetched) {
+                               // Success! Go to next phase
+                               break;
+                       }
+
+                       // Not enough blocks yet. We have failed only if nothing is
+                       // running, nothing is waiting to be processed and nothing is
+                       // left to start. Checked only after the success test above, so
+                       // that a completed segment is never reported as a failure.
+                       boolean idle;
+                       synchronized(this) {
+                               synchronized(runningFetches) {
+                                       idle = recentlyCompletedFetches.isEmpty() && runningFetches.isEmpty();
+                               }
+                       }
+                       if(idle && !startFetch()) {
+                               // Failed
+                               parentFetcher.failedNotEnoughBlocks();
+                               return;
+                       }
+               }
+
+               parentFetcher.gotBlocks(this);
+
+               // Now decode
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       // TODO put the data together
+               } else {
+                       // TODO decode via onion
+               }
+
+               parentFetcher.decoded(this);
+
+               // TODO create healing blocks
+       }
+
+       /**
+        * Start a fetch of one randomly chosen block from the lowest retry level
+        * that still has blocks waiting.
+        * @return True if we started a fetch, false if there was nothing to start.
+        */
+       private boolean startFetch() {
+               // Don't need to synchronize as these are only accessed by main thread.
+               // Scan from level 0 rather than from minRetryLevel: a level may not
+               // exist yet, may be temporarily empty while its blocks are running,
+               // or may have been refilled after minRetryLevel moved past it.
+               for(int level=0;level<maxRetryLevel && level<blocksNotTried.size();level++) {
+                       Vector v = (Vector) blocksNotTried.get(level);
+                       if(v.isEmpty()) continue; // nextInt(0) would throw
+                       int idx = fetcherContext.random.nextInt(v.size());
+                       BlockStatus b = (BlockStatus) v.remove(idx);
+                       minRetryLevel = v.isEmpty() ? level+1 : level;
+                       b.startFetch();
+                       return true;
+               }
+               return false; // nothing to start
+       }
 }

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-10-29 18:12:27 UTC 
(rev 7468)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-10-29 19:52:46 UTC 
(rev 7469)
@@ -52,6 +52,8 @@
        private int unstartedSegmentsCount;
        /** Override length. If this is positive, truncate the splitfile to 
this length. */
        private long overrideLength;
+       /** Accept non-full splitfile chunks? */
+       private boolean splitUseLengths;

        public SplitFetcher(Metadata metadata, long maxTempLength, 
ArchiveContext archiveContext, FetcherContext ctx) throws 
MetadataParseException {
                actx = archiveContext;
@@ -61,6 +63,7 @@
                splitfileType = metadata.getSplitfileType();
                splitfileDataBlocks = metadata.getSplitfileDataKeys();
                splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+               splitUseLengths = metadata.splitUseLengths;
                if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
                        // Don't need to do much - just fetch everything and 
piece it together.
                        blocksPerSegment = -1;
@@ -76,7 +79,7 @@
                } else throw new MetadataParseException("Unknown splitfile 
format: "+splitfileType);
                segments = new Segment[segmentCount]; // initially null on all 
entries
                if(segmentCount == 1) {
-                       segments[0] = new Segment(splitfileType, 
splitfileDataBlocks, splitfileCheckBlocks);
+                       segments[0] = new Segment(splitfileType, 
splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, ctx, 
maxTempLength, splitUseLengths);
                } else {
                        int dataBlocksPtr = 0;
                        int checkBlocksPtr = 0;
@@ -92,7 +95,7 @@
                                        System.arraycopy(splitfileCheckBlocks, 
checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
                                dataBlocksPtr += copyDataBlocks;
                                checkBlocksPtr += copyCheckBlocks;
-                               segments[i] = new Segment(splitfileType, 
dataBlocks, checkBlocks);
+                               segments[i] = new Segment(splitfileType, 
dataBlocks, checkBlocks, this, archiveContext, ctx, maxTempLength, 
splitUseLengths);
                        }
                }
                unstartedSegments = segments;
@@ -143,7 +146,7 @@
        }

        private synchronized void start(Segment start) {
-               start.start(this, actx, fctx, maxTempLength);
+               start.start();
                int j = 0;
                for(int i=0;i<unstartedSegmentsCount;i++) {
                        Segment s = unstartedSegments[i];

Modified: trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleLowLevelClient.java    2005-10-29 
18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/node/SimpleLowLevelClient.java    2005-10-29 
19:52:46 UTC (rev 7469)
@@ -17,7 +17,7 @@
     /**
      * Fetch a key. Return null if cannot retrieve it.
      */
-    public KeyBlock getKey(ClientKey key);
+    public KeyBlock getKey(ClientKey key, boolean localOnly);

     /**
      * Insert a key.

Modified: trunk/freenet/src/freenet/support/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/BucketTools.java  2005-10-29 18:12:27 UTC 
(rev 7468)
+++ trunk/freenet/src/freenet/support/BucketTools.java  2005-10-29 19:52:46 UTC 
(rev 7469)
@@ -343,4 +343,18 @@
                        throw new Error("No such digest: SHA-256 !!");
                }
        }
+
+       /**
+        * Copy data from the given bucket to the given OutputStream.
+        * @param decodedData The bucket to read from.
+        * @param os The stream to write to. It is not closed or flushed here.
+        * @param truncateLength The number of bytes to copy. If negative, copy
+        * everything the bucket's stream provides.
+        * @throws IOException If there was an error reading from the bucket or
+        * writing to the stream, or if the bucket ran out of data before
+        * truncateLength bytes had been copied.
+        */
+       public static void copyTo(Bucket decodedData, OutputStream os, long truncateLength) throws IOException {
+               if(truncateLength == 0) return;
+               if(truncateLength < 0) truncateLength = Long.MAX_VALUE;
+               InputStream is = decodedData.getInputStream();
+               try {
+                       byte[] buf = new byte[4096];
+                       long moved = 0;
+                       while(moved < truncateLength) {
+                               // Take the minimum as a long and cast afterwards: casting the
+                               // remainder to int first overflows for large lengths.
+                               int bytes = (int) Math.min(buf.length, truncateLength - moved);
+                               // read() may return fewer bytes than asked for, or -1 at EOF
+                               bytes = is.read(buf, 0, bytes);
+                               if(bytes <= 0) {
+                                       // Copying everything: end of stream is the normal exit
+                                       if(truncateLength == Long.MAX_VALUE) break;
+                                       throw new IOException("Could not move required quantity of data: moved "+moved+" of "+truncateLength);
+                               }
+                               os.write(buf, 0, bytes);
+                               moved += bytes;
+                       }
+               } finally {
+                       is.close();
+               }
+       }
 }


Reply via email to