Author: toad
Date: 2005-10-29 19:52:46 +0000 (Sat, 29 Oct 2005)
New Revision: 7469
Modified:
trunk/freenet/src/freenet/client/Fetcher.java
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
trunk/freenet/src/freenet/support/BucketTools.java
Log:
More work on splitfiles.
Modified: trunk/freenet/src/freenet/client/Fetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/Fetcher.java 2005-10-29 18:12:27 UTC
(rev 7468)
+++ trunk/freenet/src/freenet/client/Fetcher.java 2005-10-29 19:52:46 UTC
(rev 7469)
@@ -93,7 +93,7 @@
throw new
FetchException(FetchException.TOO_MUCH_RECURSION);
// Do the fetch
- KeyBlock block = ctx.client.getKey(key);
+ KeyBlock block = ctx.client.getKey(key, ctx.localRequestOnly);
byte[] data;
try {
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java 2005-10-29
18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/client/FetcherContext.java 2005-10-29
19:52:46 UTC (rev 7469)
@@ -16,12 +16,20 @@
final int maxRecursionLevel;
final int maxArchiveRestarts;
final boolean dontEnterImplicitArchives;
+ final int maxSplitfileThreads;
+ final int maxSplitfileBlockRetries;
+ final int maxNonSplitfileRetries;
final RandomSource random;
+ final boolean allowSplitfiles;
+ final boolean followRedirects;
+ final boolean localRequestOnly;
public FetcherContext(SimpleLowLevelClient client, long curMaxLength,
long curMaxTempLength, int maxRecursionLevel, int
maxArchiveRestarts,
- boolean dontEnterImplicitArchives, RandomSource random,
- ArchiveManager archiveManager, BucketFactory
bucketFactory) {
+ boolean dontEnterImplicitArchives, int
maxSplitfileThreads,
+ int maxSplitfileBlockRetries, int
maxNonSplitfileRetries,
+ boolean allowSplitfiles, boolean followRedirects,
boolean localRequestOnly,
+ RandomSource random, ArchiveManager archiveManager,
BucketFactory bucketFactory) {
this.client = client;
this.maxOutputLength = curMaxLength;
this.maxTempLength = curMaxTempLength;
@@ -31,6 +39,12 @@
this.maxArchiveRestarts = maxArchiveRestarts;
this.dontEnterImplicitArchives = dontEnterImplicitArchives;
this.random = random;
+ this.maxSplitfileThreads = maxSplitfileThreads;
+ this.maxSplitfileBlockRetries = maxSplitfileBlockRetries;
+ this.maxNonSplitfileRetries = maxNonSplitfileRetries;
+ this.allowSplitfiles = allowSplitfiles;
+ this.followRedirects = followRedirects;
+ this.localRequestOnly = localRequestOnly;
}
}
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-10-29 18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2005-10-29 19:52:46 UTC (rev 7469)
@@ -16,7 +16,26 @@
static final int MAX_RECURSION = 10;
static final int MAX_ARCHIVE_RESTARTS = 2;
static final boolean DONT_ENTER_IMPLICIT_ARCHIVES = true;
+ /** Number of threads used by a splitfile fetch */
+ static final int SPLITFILE_THREADS = 20;
+ /** Number of retries allowed per block in a splitfile. Must be at
least 1 as
+ * on the first try we just check the datastore.
+ */
+ static final int SPLITFILE_BLOCK_RETRIES = 5;
+ /** Number of retries allowed on non-splitfile fetches. Unlike above,
we always
+ * go to network. */
+ static final int NON_SPLITFILE_RETRIES = 2;
+ /** Whether to fetch splitfiles. Don't turn this off! */
+ static final boolean FETCH_SPLITFILES = true;
+ /** Whether to follow redirects etc. If false, we only fetch a plain
block of data.
+ * Don't turn this off either! */
+ static final boolean FOLLOW_REDIRECTS = true;
+	/** If set, only check the local datastore, don't send an actual request out.
+	 * Don't turn this on unless you only want data that is already cached locally. */
+ static final boolean LOCAL_REQUESTS_ONLY = false;
+
+
public HighLevelSimpleClientImpl(SimpleLowLevelClient client,
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
this.client = client;
archiveManager = mgr;
@@ -37,7 +56,10 @@
*/
public FetchResult fetch(FreenetURI uri) throws FetchException {
FetcherContext context = new FetcherContext(client,
curMaxLength, curMaxTempLength,
- MAX_RECURSION, MAX_ARCHIVE_RESTARTS,
DONT_ENTER_IMPLICIT_ARCHIVES, random, archiveManager, bucketFactory);
+ MAX_RECURSION, MAX_ARCHIVE_RESTARTS,
DONT_ENTER_IMPLICIT_ARCHIVES,
+ SPLITFILE_THREADS, SPLITFILE_BLOCK_RETRIES,
NON_SPLITFILE_RETRIES,
+ FETCH_SPLITFILES, FOLLOW_REDIRECTS,
LOCAL_REQUESTS_ONLY,
+ random, archiveManager, bucketFactory);
Fetcher f = new Fetcher(uri, context);
return f.run();
}
Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java 2005-10-29 18:12:27 UTC
(rev 7468)
+++ trunk/freenet/src/freenet/client/Segment.java 2005-10-29 19:52:46 UTC
(rev 7469)
@@ -1,18 +1,116 @@
package freenet.client;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Vector;
import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
/**
* A segment, within a splitfile.
+ * Self-starting Runnable.
+ *
+ * Does not require locking, because all locking goes through the parent
Segment.
*/
-public class Segment {
+public class Segment implements Runnable {
+	/** Fetch status of a single splitfile block. Self-starting Runnable: one
+	 * instance per block URI; registers its own completion with the parent Segment. */
+	public class BlockStatus implements Runnable {
+		
+		final FreenetURI uri;
+		/** Number of fetch attempts completed so far; doubles as the retry level. */
+		int completedTries;
+		/** Whether the most recently completed attempt succeeded. */
+		private boolean succeeded;
+		
+		public BlockStatus(FreenetURI freenetURI) {
+			uri = freenetURI;
+			completedTries = 0;
+			succeeded = false;
+		}
+		
+		/** @return True if the last completed attempt did not succeed.
+		 * Called by Segment.run() on completed fetches. */
+		public boolean failed() {
+			return !succeeded;
+		}
+		
+		public void startFetch() {
+			synchronized(runningFetches) {
+				runningFetches.add(this);
+				try {
+					Thread t = new Thread(this);
+					t.setDaemon(true);
+					t.start();
+				} catch (Throwable error) {
+					runningFetches.remove(this);
+					Logger.error(this, "Caught "+error);
+				}
+			}
+		}
+		
+		public void run() {
+			// Already added to runningFetches.
+			// But need to make sure we are removed when we exit, and that the
+			// parent Segment is told we completed - otherwise Segment.run()
+			// never sees any completed fetch.
+			try {
+				// Do the fetch
+				Fetcher f = new Fetcher(uri, blockFetchContext);
+				try {
+					f.realRun(new ClientMetadata(), recursionLevel, uri,
+							(!nonFullBlocksAllowed) || fetcherContext.dontEnterImplicitArchives);
+					succeeded = true;
+				} catch (MetadataParseException e) {
+					Logger.error(this, "Block fetch failed: "+e);
+				} catch (FetchException e) {
+					Logger.error(this, "Block fetch failed: "+e);
+				} catch (ArchiveFailureException e) {
+					Logger.error(this, "Block fetch failed: "+e);
+				} catch (ArchiveRestartException e) {
+					Logger.error(this, "Block fetch failed: "+e);
+				}
+			} finally {
+				completedTries++;
+				// LinkedList is not thread-safe; adds are synchronized, so removes must be too.
+				synchronized(runningFetches) {
+					runningFetches.remove(this);
+				}
+				synchronized(Segment.this) {
+					// Report completion (success or failure) and wake the Segment thread.
+					recentlyCompletedFetches.add(this);
+					Segment.this.notify();
+				}
+			}
+		}
+		
+	}
+
final short splitfileType;
final FreenetURI[] dataBlocks;
final FreenetURI[] checkBlocks;
+ final BlockStatus[] dataBlockStatus;
+ final BlockStatus[] checkBlockStatus;
+ final int minFetched;
+ private Vector blocksNotTried;
+ final SplitFetcher parentFetcher;
+ final ArchiveContext archiveContext;
+ final FetcherContext fetcherContext;
+ final long maxBlockLength;
+ final boolean nonFullBlocksAllowed;
+ /** Has the segment started to do something? Irreversible. */
+ private boolean started;
+ /** Has the segment finished processing? Irreversible. */
+ private boolean finished;
+ /** Error code, or -1 */
+ private short fetchError;
+ /** Bucket to store the data retrieved, after it has been decoded */
+ private Bucket decodedData;
+ /** Recently completed fetches */
+ private final LinkedList recentlyCompletedFetches;
+ /** Total number of successfully fetched blocks */
+ private int totalFetched;
+ /** Running fetches */
+ private LinkedList runningFetches;
+ /** Minimum retry level of any BlockStatus; this is the largest integer
n such that
+ * blocksNotTried.get(n-1) is empty. Initially 0.
+ */
+ private int minRetryLevel;
+ /** Maximum retry level. */
+ private final int maxRetryLevel;
+ /** Fetch context for block fetches */
+ private final FetcherContext blockFetchContext;
+ /** Recursion level */
+ private final int recursionLevel;
/**
* Create a Segment.
@@ -20,36 +118,65 @@
* @param splitfileDataBlocks The data blocks to fetch.
* @param splitfileCheckBlocks The check blocks to fetch.
*/
- public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks,
FreenetURI[] splitfileCheckBlocks) {
+ public Segment(short splitfileType, FreenetURI[] splitfileDataBlocks,
FreenetURI[] splitfileCheckBlocks,
+ SplitFetcher fetcher, ArchiveContext actx,
FetcherContext fctx, long maxTempLength, boolean useLengths, int
recursionLevel) throws MetadataParseException {
this.splitfileType = splitfileType;
dataBlocks = splitfileDataBlocks;
checkBlocks = splitfileCheckBlocks;
+ parentFetcher = fetcher;
+ archiveContext = actx;
+ fetcherContext = fctx;
+ maxBlockLength = maxTempLength;
+ nonFullBlocksAllowed = useLengths;
+ started = false;
+ finished = false;
+ fetchError = -1;
+ decodedData = null;
+ dataBlockStatus = new BlockStatus[dataBlocks.length];
+ checkBlockStatus = new BlockStatus[checkBlocks.length];
+ blocksNotTried = new Vector();
+ maxRetryLevel = fetcherContext.maxSplitfileBlockRetries;
+ Vector firstSet = new
Vector(dataBlocks.length+checkBlocks.length);
+ blocksNotTried.add(0, firstSet);
+ for(int i=0;i<dataBlocks.length;i++) {
+ dataBlockStatus[i] = new BlockStatus(dataBlocks[i]);
+ firstSet.add(dataBlockStatus[i]);
+ }
+ for(int i=0;i<checkBlocks.length;i++) {
+ checkBlockStatus[i] = new BlockStatus(checkBlocks[i]);
+ firstSet.add(checkBlockStatus[i]);
+ }
+ recentlyCompletedFetches = new LinkedList();
+ runningFetches = new LinkedList();
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+ minFetched = dataBlocks.length;
+ } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+ minFetched = dataBlocks.length;
+ } else throw new MetadataParseException("Unknown splitfile type
"+splitfileType);
+ minRetryLevel = 0;
+ this.recursionLevel = recursionLevel;
}
/**
* Is the segment finished? (Either error or fetched and decoded)?
*/
public boolean isFinished() {
- // TODO Auto-generated method stub
- return false;
+ return finished;
}
/**
* If there was an error, throw it now.
*/
public void throwError() throws FetchException {
- // TODO Auto-generated method stub
-
+ if(fetchError != -1)
+ throw new FetchException(fetchError);
}
/**
* Return the length of the data, after decoding.
- * Will throw unless known in advance, or
- * @return
*/
public long decodedLength() {
- // TODO Auto-generated method stub
- return 0;
+ return decodedData.size();
}
/**
@@ -57,26 +184,120 @@
* Do not write more than the specified number of bytes (unless it is
negative,
* in which case ignore it).
* @return The number of bytes written.
+ * @throws IOException If there was an error reading from the bucket
the data is
+ * stored in, or writing to the stream provided.
*/
public long writeDecodedDataTo(OutputStream os, long truncateLength)
throws IOException {
- // TODO Auto-generated method stub
- return 0;
+ long len = decodedData.size();
+ if(truncateLength >= 0 && truncateLength < len)
+ len = truncateLength;
+ BucketTools.copyTo(decodedData, os, truncateLength);
+ return len;
}
/**
* Return true if the Segment has been started, otherwise false.
*/
public boolean isStarted() {
- // TODO Auto-generated method stub
- return false;
+ return started;
}
/**
* Start the Segment fetching the data. When it has finished fetching,
it will
* notify the SplitFetcher.
*/
- public void start(SplitFetcher fetcher, ArchiveContext actx,
FetcherContext fctx, long maxTempLength) {
- // TODO Auto-generated method stub
+ public void start() {
+ started = true;
+ Thread t = new Thread(this);
+ t.setDaemon(true);
+ t.start();
+ }
+
+	/**
+	 * Fetch the data.
+	 * Tell the SplitFetcher.
+	 * Decode the data.
+	 * Tell the SplitFetcher.
+	 * If there is an error, tell the SplitFetcher and exit.
+	 */
+	public void run() {
+		// Create a number of fetcher threads.
+		// Wait for any thread to complete (success or failure).
+		// Retry if necessary, up to N times per block.
+		for(int i=0;i<fetcherContext.maxSplitfileThreads;i++) {
+			startFetch(); // ignore return value
+		}
+		
+		while(true) {
+			
+			// Now wait for any thread to complete
+			synchronized(this) {
+				try {
+					wait(10*1000);
+				} catch (InterruptedException e) {
+					// Harmless; just recheck the completed-fetches queue.
+				}
+			}
+			
+			while(true) {
+				BlockStatus block;
+				synchronized(this) {
+					// removeFirst() throws NoSuchElementException on an empty
+					// list rather than returning null, so check first.
+					if(recentlyCompletedFetches.isEmpty()) break;
+					block = (BlockStatus) recentlyCompletedFetches.removeFirst();
+				}
+				if(block.failed()) {
+					// Retry
+					int retryLevel = block.completedTries;
+					if(retryLevel >= maxRetryLevel) {
+						// This block failed permanently.
+						// TODO track permanently failed blocks so we can fail early.
+					} else {
+						// Only level 0 exists initially; create higher retry
+						// levels lazily or get() throws ArrayIndexOutOfBounds.
+						while(blocksNotTried.size() <= retryLevel)
+							blocksNotTried.add(new Vector());
+						Vector levelSet = (Vector) blocksNotTried.get(retryLevel);
+						levelSet.add(block);
+					}
+				} else {
+					// Succeeded
+					totalFetched++;
+				}
+				// Either way, start a new fetch
+				if(!startFetch()) {
+					// Can't start a fetch
+					boolean noneRunning;
+					synchronized(runningFetches) {
+						noneRunning = runningFetches.isEmpty();
+					}
+					if(noneRunning) {
+						// Failed: nothing running and nothing left to try.
+						parentFetcher.failedNotEnoughBlocks();
+						return;
+					}
+				}
+			}
+			
+			if(totalFetched >= minFetched) {
+				// Success! Go to next phase
+				break;
+			}
+		}
+		
+		parentFetcher.gotBlocks(this);
+		
+		// Now decode
+		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+			// TODO put the data together
+		} else {
+			// TODO decode via onion FEC
+		}
+		
+		parentFetcher.decoded(this);
+		
+		// TODO create healing blocks
+	}
+
+	/**
+	 * Start a fetch.
+	 * @return True if we started a fetch, false if there was nothing to start.
+	 */
+	private boolean startFetch() {
+		// Don't need to synchronize as these are only accessed by main thread
+		if(minRetryLevel >= maxRetryLevel) return false; // nothing to start
+		// The retry-level vectors are created lazily; guard before get().
+		if(minRetryLevel >= blocksNotTried.size()) return false;
+		Vector v = (Vector) blocksNotTried.get(minRetryLevel);
+		if(v.isEmpty()) return false;
+		// Pick a random untried block at the lowest retry level.
+		int idx = fetcherContext.random.nextInt(v.size());
+		BlockStatus b = (BlockStatus) v.remove(idx);
+		if(v.isEmpty()) minRetryLevel++;
+		b.startFetch();
+		// The original fell off the end of a boolean method: compile error.
+		return true;
+	}
}
Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java 2005-10-29 18:12:27 UTC
(rev 7468)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java 2005-10-29 19:52:46 UTC
(rev 7469)
@@ -52,6 +52,8 @@
private int unstartedSegmentsCount;
/** Override length. If this is positive, truncate the splitfile to
this length. */
private long overrideLength;
+ /** Accept non-full splitfile chunks? */
+ private boolean splitUseLengths;
public SplitFetcher(Metadata metadata, long maxTempLength,
ArchiveContext archiveContext, FetcherContext ctx) throws
MetadataParseException {
actx = archiveContext;
@@ -61,6 +63,7 @@
splitfileType = metadata.getSplitfileType();
splitfileDataBlocks = metadata.getSplitfileDataKeys();
splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+ splitUseLengths = metadata.splitUseLengths;
if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
// Don't need to do much - just fetch everything and
piece it together.
blocksPerSegment = -1;
@@ -76,7 +79,7 @@
} else throw new MetadataParseException("Unknown splitfile
format: "+splitfileType);
segments = new Segment[segmentCount]; // initially null on all
entries
if(segmentCount == 1) {
- segments[0] = new Segment(splitfileType,
splitfileDataBlocks, splitfileCheckBlocks);
+ segments[0] = new Segment(splitfileType,
splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, ctx,
maxTempLength, splitUseLengths);
} else {
int dataBlocksPtr = 0;
int checkBlocksPtr = 0;
@@ -92,7 +95,7 @@
System.arraycopy(splitfileCheckBlocks,
checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
dataBlocksPtr += copyDataBlocks;
checkBlocksPtr += copyCheckBlocks;
- segments[i] = new Segment(splitfileType,
dataBlocks, checkBlocks);
+ segments[i] = new Segment(splitfileType,
dataBlocks, checkBlocks, this, archiveContext, ctx, maxTempLength,
splitUseLengths);
}
}
unstartedSegments = segments;
@@ -143,7 +146,7 @@
}
private synchronized void start(Segment start) {
- start.start(this, actx, fctx, maxTempLength);
+ start.start();
int j = 0;
for(int i=0;i<unstartedSegmentsCount;i++) {
Segment s = unstartedSegments[i];
Modified: trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2005-10-29
18:12:27 UTC (rev 7468)
+++ trunk/freenet/src/freenet/node/SimpleLowLevelClient.java 2005-10-29
19:52:46 UTC (rev 7469)
@@ -17,7 +17,7 @@
/**
* Fetch a key. Return null if cannot retrieve it.
*/
- public KeyBlock getKey(ClientKey key);
+ public KeyBlock getKey(ClientKey key, boolean localOnly);
/**
* Insert a key.
Modified: trunk/freenet/src/freenet/support/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/BucketTools.java 2005-10-29 18:12:27 UTC
(rev 7468)
+++ trunk/freenet/src/freenet/support/BucketTools.java 2005-10-29 19:52:46 UTC
(rev 7469)
@@ -343,4 +343,18 @@
throw new Error("No such digest: SHA-256 !!");
}
}
+
+	/** Copy the given quantity of data from the given bucket to the given OutputStream.
+	 * @param truncateLength Number of bytes to copy. If zero, copy nothing.
+	 * @throws IOException If there was an error reading from the bucket or
+	 * writing to the stream, or if the bucket ran out of data early. */
+	public static void copyTo(Bucket decodedData, OutputStream os, long truncateLength) throws IOException {
+		if(truncateLength == 0) return;
+		InputStream is = decodedData.getInputStream();
+		try {
+			byte[] buf = new byte[4096];
+			long moved = 0;
+			while(moved < truncateLength) {
+				// read() may return fewer bytes than requested; only write what
+				// was actually read, or we would emit stale buffer contents.
+				int bytes = is.read(buf, 0, (int) Math.min(buf.length, truncateLength - moved));
+				if(bytes <= 0)
+					throw new IOException("Could not read "+truncateLength+" bytes: got "+moved);
+				os.write(buf, 0, bytes);
+				// Advance the counter - the original never did, looping forever.
+				moved += bytes;
+			}
+		} finally {
+			is.close();
+		}
+	}
 }
}