Author: toad
Date: 2005-10-29 17:43:23 +0000 (Sat, 29 Oct 2005)
New Revision: 7467

Added:
   trunk/freenet/src/freenet/client/Segment.java
Modified:
   trunk/freenet/src/freenet/client/FetchException.java
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/Metadata.java
   trunk/freenet/src/freenet/client/SplitFetcher.java
Log:
Splitfiles mostly.

Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java        2005-10-29 
15:07:27 UTC (rev 7466)
+++ trunk/freenet/src/freenet/client/FetchException.java        2005-10-29 
17:43:23 UTC (rev 7467)
@@ -1,5 +1,7 @@
 package freenet.client;
 
+import java.io.IOException;
+
 /**
  * Generic exception thrown by a Fetcher. All other exceptions are converted 
to one of
  * these to tell the client.
@@ -29,6 +31,11 @@
                initCause(e);
        }
 
+       public FetchException(int mode, IOException e) {
+               this.mode = mode;
+               initCause(e);
+       }
+
        /** Too many levels of recursion into archives */
        public static final int TOO_DEEP_ARCHIVE_RECURSION = 1;
        /** Don't know what to do with splitfile */

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2005-10-29 
15:07:27 UTC (rev 7466)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2005-10-29 
17:43:23 UTC (rev 7467)
@@ -1,5 +1,6 @@
 package freenet.client;
 
+import freenet.crypt.RandomSource;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.BucketFactory;
 
@@ -15,10 +16,11 @@
        final int maxRecursionLevel;
        final int maxArchiveRestarts;
        final boolean dontEnterImplicitArchives;
+       final RandomSource random;
        
        public FetcherContext(SimpleLowLevelClient client, long curMaxLength, 
                        long curMaxTempLength, int maxRecursionLevel, int 
maxArchiveRestarts,
-                       boolean dontEnterImplicitArchives,
+                       boolean dontEnterImplicitArchives, RandomSource random,
                        ArchiveManager archiveManager, BucketFactory 
bucketFactory) {
                this.client = client;
                this.maxOutputLength = curMaxLength;
@@ -28,6 +30,7 @@
                this.maxRecursionLevel = maxRecursionLevel;
                this.maxArchiveRestarts = maxArchiveRestarts;
                this.dontEnterImplicitArchives = dontEnterImplicitArchives;
+               this.random = random;
        }
 
 }

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-10-29 15:07:27 UTC (rev 7466)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-10-29 17:43:23 UTC (rev 7467)
@@ -1,5 +1,6 @@
 package freenet.client;
 
+import freenet.crypt.RandomSource;
 import freenet.keys.FreenetURI;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.BucketFactory;
@@ -11,14 +12,16 @@
        private final BucketFactory bucketFactory;
        private long curMaxLength;
        private long curMaxTempLength;
+       private final RandomSource random;
        static final int MAX_RECURSION = 10;
        static final int MAX_ARCHIVE_RESTARTS = 2;
        static final boolean DONT_ENTER_IMPLICIT_ARCHIVES = true;
        
-       public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf) {
+       public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
                this.client = client;
                archiveManager = mgr;
                bucketFactory = bf;
+               random = r;
        }
        
        public void setMaxLength(long maxLength) {
@@ -34,7 +37,7 @@
         */
        public FetchResult fetch(FreenetURI uri) throws FetchException {
                FetcherContext context = new FetcherContext(client, 
curMaxLength, curMaxLength, 
-                               MAX_RECURSION, MAX_ARCHIVE_RESTARTS, 
DONT_ENTER_IMPLICIT_ARCHIVES, archiveManager, bucketFactory);
+                               MAX_RECURSION, MAX_ARCHIVE_RESTARTS, 
DONT_ENTER_IMPLICIT_ARCHIVES, random, archiveManager, bucketFactory);
                Fetcher f = new Fetcher(uri, context);
                return f.run();
        }

Modified: trunk/freenet/src/freenet/client/Metadata.java
===================================================================
--- trunk/freenet/src/freenet/client/Metadata.java      2005-10-29 15:07:27 UTC 
(rev 7466)
+++ trunk/freenet/src/freenet/client/Metadata.java      2005-10-29 17:43:23 UTC 
(rev 7467)
@@ -614,4 +614,18 @@
                }
        }
 
+       /**
+        * Get the splitfile type.
+        */
+       public short getSplitfileType() {
+               return splitfileAlgorithm;
+       }
+
+       public FreenetURI[] getSplitfileDataKeys() {
+               return splitfileDataKeys;
+       }
+       
+       public FreenetURI[] getSplitfileCheckKeys() {
+               return splitfileCheckKeys;
+       }
 }

Added: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java       2005-10-29 15:07:27 UTC 
(rev 7466)
+++ trunk/freenet/src/freenet/client/Segment.java       2005-10-29 17:43:23 UTC 
(rev 7467)
@@ -0,0 +1,82 @@
+package freenet.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import freenet.keys.FreenetURI;
+
+/**
+ * A segment, within a splitfile.
+ */
+public class Segment {
+
+       final short splitfileType;
+       final FreenetURI[] dataBlocks;
+       final FreenetURI[] checkBlocks;
+       
+       /**
+        * Create a Segment.
+        * @param splitfileType The type of the splitfile.
+        * @param splitfileDataBlocks The data blocks to fetch.
+        * @param splitfileCheckBlocks The check blocks to fetch.
+        */
+       public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks, 
FreenetURI[] splitfileCheckBlocks) {
+               this.splitfileType = splitfileType;
+               dataBlocks = splitfileDataBlocks;
+               checkBlocks = splitfileCheckBlocks;
+       }
+
+       /**
+        * Is the segment finished? (Either errored, or fetched and decoded.)
+        */
+       public boolean isFinished() {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       /**
+        * If there was an error, throw it now.
+        */
+       public void throwError() throws FetchException {
+               // TODO Auto-generated method stub
+               
+       }
+
+       /**
+        * Return the length of the data, after decoding.
+        * Will throw unless the length is known in advance or decoding has completed.
+        * @return The length of the decoded data, in bytes.
+        */
+       public long decodedLength() {
+               // TODO Auto-generated method stub
+               return 0;
+       }
+
+       /**
+        * Write the decoded data to the given output stream.
+        * Do not write more than the specified number of bytes (unless it is 
negative,
+        * in which case ignore it).
+        * @return The number of bytes written.
+        */
+       public long writeDecodedDataTo(OutputStream os, long truncateLength) 
throws IOException {
+               // TODO Auto-generated method stub
+               return 0;
+       }
+
+       /**
+        * Return true if the Segment has been started, otherwise false.
+        */
+       public boolean isStarted() {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       /**
+        * Start the Segment fetching the data. When it has finished fetching, 
it will
+        * notify the SplitFetcher.
+        */
+       public void start(SplitFetcher fetcher, ArchiveContext actx, 
FetcherContext fctx, long maxTempLength) {
+               // TODO Auto-generated method stub
+               
+       }
+}

Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java  2005-10-29 15:07:27 UTC 
(rev 7466)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java  2005-10-29 17:43:23 UTC 
(rev 7467)
@@ -1,5 +1,12 @@
 package freenet.client;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.onionnetworks.fec.FECCode;
+import com.onionnetworks.fec.FECCodeFactory;
+
+import freenet.keys.FreenetURI;
 import freenet.support.Bucket;
 
 /**
@@ -7,13 +14,189 @@
  */
 public class SplitFetcher {
 
-       public SplitFetcher(Metadata metadata, long maxTempLength, 
ArchiveContext archiveContext, FetcherContext ctx) {
-               // TODO Auto-generated constructor stub
+       // 128/192. Crazy, but it's possible we'd get big erasures.
+       static final int ONION_STD_K = 128;
+       static final int ONION_STD_N = 192;
+       
+       /** The standard onion codec */
+       static FECCode onionStandardCode =
+               
FECCodeFactory.getDefault().createFECCode(ONION_STD_K,ONION_STD_N);
+       
+       /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
+       final short splitfileType;
+       /** The segment length. -1 means not segmented and must get everything 
to decode. */
+       final int blocksPerSegment;
+       /** The segment length in check blocks. */
+       final int checkBlocksPerSegment;
+       /** Total number of segments */
+       final int segmentCount;
+       /** The detailed information on each segment */
+       final Segment[] segments;
+       /** The splitfile data blocks. */
+       final FreenetURI[] splitfileDataBlocks;
+       /** The splitfile check blocks. */
+       final FreenetURI[] splitfileCheckBlocks;
+       /** The archive context */
+       final ArchiveContext actx;
+       /** The fetch context */
+       final FetcherContext fctx;
+       /** Maximum temporary length */
+       final long maxTempLength;
+       /** Have all segments finished? Access synchronized. */
+       private boolean allSegmentsFinished = false;
+       /** Currently fetching segment */
+       private Segment fetchingSegment;
+       /** Array of unstarted segments. Modify synchronized. */
+       private Segment[] unstartedSegments;
+       /** Number of unstarted segments. Ditto. */
+       private int unstartedSegmentsCount;
+       /** Override length. If this is positive, truncate the splitfile to 
this length. */
+       private long overrideLength;
+       
+       public SplitFetcher(Metadata metadata, long maxTempLength, 
ArchiveContext archiveContext, FetcherContext ctx) throws 
MetadataParseException {
+               actx = archiveContext;
+               fctx = ctx;
+               overrideLength = metadata.dataLength;
+               this.maxTempLength = maxTempLength;
+               splitfileType = metadata.getSplitfileType();
+               splitfileDataBlocks = metadata.getSplitfileDataKeys();
+               splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       // Don't need to do much - just fetch everything and 
piece it together.
+                       blocksPerSegment = -1;
+                       checkBlocksPerSegment = -1;
+                       segmentCount = 1;
+               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+                       blocksPerSegment = 128;
+                       checkBlocksPerSegment = 64;
+                       segmentCount = (splitfileDataBlocks.length / 
blocksPerSegment) +
+                               (splitfileDataBlocks.length % blocksPerSegment 
== 0 ? 0 : 1);
+                       // Onion, 128/192.
+                       // Will be segmented.
+               } else throw new MetadataParseException("Unknown splitfile 
format: "+splitfileType);
+               segments = new Segment[segmentCount]; // initially null on all 
entries
+               if(segmentCount == 1) {
+                       segments[0] = new Segment(splitfileType, 
splitfileDataBlocks, splitfileCheckBlocks);
+               } else {
+                       int dataBlocksPtr = 0;
+                       int checkBlocksPtr = 0;
+                       for(int i=0;i<segments.length;i++) {
+                               // Create a segment. Give it its keys.
+                               int copyDataBlocks = 
Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
+                               int copyCheckBlocks = 
Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
+                               FreenetURI[] dataBlocks = new 
FreenetURI[copyDataBlocks];
+                               FreenetURI[] checkBlocks = new 
FreenetURI[copyCheckBlocks];
+                               if(copyDataBlocks > 0)
+                                       System.arraycopy(splitfileDataBlocks, 
dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
+                               if(copyCheckBlocks > 0)
+                                       System.arraycopy(splitfileCheckBlocks, 
checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
+                               dataBlocksPtr += copyDataBlocks;
+                               checkBlocksPtr += copyCheckBlocks;
+                               segments[i] = new Segment(splitfileType, 
dataBlocks, checkBlocks);
+                       }
+               }
+               unstartedSegments = segments;
+               unstartedSegmentsCount = segments.length;
        }
 
-       public Bucket fetch() {
-               // TODO Auto-generated method stub
-               return null;
+       /**
+        * Fetch the splitfile.
+        * Fetch one segment, while decoding the previous one.
+        * Fetch the segments in random order.
+        * When everything has been fetched and decoded, return the full data.
+        * @throws FetchException 
+        */
+       public Bucket fetch() throws FetchException {
+               /*
+                * While(true) {
+                *      Pick a random segment, start it fetching.
+                *      Wait for a segment to finish fetching, a segment to 
finish decoding, or an error.
+                *      If a segment finishes fetching:
+                *              Continue to start another one if there are any 
left
+                *      If a segment finishes decoding:
+                *              If all segments are decoded, assemble all the 
segments and return the data.
+                * 
+                * Segments are expected to automatically start decoding when 
they finish fetching,
+                * but to tell us either way.
+                */
+               while(true) {
+                       synchronized(this) {
+                               if(fetchingSegment == null) {
+                                       // Pick a random segment
+                                       Segment s = chooseUnstartedSegment();
+                                       if(s == null) {
+                                               // All segments have started
+                                       } else {
+                                               start(s); // will keep 
unstartedSegments up to date
+                                       }
+                               }
+                               if(allSegmentsFinished) {
+                                       return finalStatus();
+                               }
+                               try {
+                                       wait(100*1000); // or wait()?
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
        }
 
+       private synchronized void start(Segment start) {
+               start.start(this, actx, fctx, maxTempLength);
+               int j = 0;
+               for(int i=0;i<unstartedSegmentsCount;i++) {
+                       Segment s = unstartedSegments[i];
+                       if(!s.isStarted()) {
+                               unstartedSegments[j] = unstartedSegments[i];
+                               j++;
+                       }
+               }
+               unstartedSegmentsCount = j;
+       }
+
+       private Segment chooseUnstartedSegment() {
+               if(unstartedSegmentsCount == 0) return null;
+               return 
unstartedSegments[fctx.random.nextInt(unstartedSegmentsCount)];
+       }
+
+       /** Return the final status of the fetch. Throws an exception, or 
returns a 
+        * Bucket containing the fetched data.
+        * @throws FetchException If the fetch failed for some reason.
+        */
+       private Bucket finalStatus() throws FetchException {
+               long finalLength = 0;
+               for(int i=0;i<segments.length;i++) {
+                       Segment s = segments[i];
+                       if(!s.isFinished()) throw new 
IllegalStateException("Not all finished");
+                       s.throwError();
+                       // If still here, it succeeded
+                       finalLength += s.decodedLength();
+                       // Healing is done by Segment
+               }
+               long bytesWritten = 0;
+               OutputStream os = null;
+               Bucket output;
+               try {
+                       output = fctx.bucketFactory.makeBucket(finalLength);
+                       os = output.getOutputStream();
+                       for(int i=0;i<segments.length;i++) {
+                               Segment s = segments[i];
+                               long max = (finalLength < 0 ? 0 : (finalLength 
- bytesWritten));
+                               bytesWritten += s.writeDecodedDataTo(os, max);
+                       }
+               } catch (IOException e) {
+                       throw new FetchException(FetchException.BUCKET_ERROR, 
e);
+               } finally {
+                       if(os != null) {
+                               try {
+                                       os.close();
+                               } catch (IOException e) {
+                                       throw new 
FetchException(FetchException.BUCKET_ERROR, e);
+                               }
+                       }
+               }
+               return output;
+       }
+
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to