Author: toad
Date: 2005-11-09 18:55:33 +0000 (Wed, 09 Nov 2005)
New Revision: 7503
Added:
trunk/freenet/src/freenet/client/BlockFetcher.java
trunk/freenet/src/freenet/client/BlockInserter.java
trunk/freenet/src/freenet/client/FailureCodeTracker.java
trunk/freenet/src/freenet/client/RetryTrackerCallback.java
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/FileInserter.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/InsertSegment.java
trunk/freenet/src/freenet/client/InserterContext.java
trunk/freenet/src/freenet/client/InserterException.java
trunk/freenet/src/freenet/client/Metadata.java
trunk/freenet/src/freenet/client/RetryTracker.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
trunk/freenet/src/freenet/client/SplitInserter.java
trunk/freenet/src/freenet/client/SplitfileBlock.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/node/Version.java
trunk/freenet/src/freenet/support/BucketTools.java
Log:
145:
Lots more work on splitfiles.
Neither splitfile insert nor splitfile request will work at the moment.
Added: trunk/freenet/src/freenet/client/BlockFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockFetcher.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/BlockFetcher.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -0,0 +1,162 @@
+/**
+ *
+ */
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+
+public class BlockFetcher extends SplitfileBlock implements Runnable {
+
+ private final Segment segment;
+ private final RetryTracker tracker;
+ /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the check blocks */
+ final int index;
+ final FreenetURI uri;
+ final boolean dontEnterImplicitArchives;
+ int completedTries;
+ Bucket fetchedData;
+ boolean actuallyFetched;
+
+ public BlockFetcher(Segment segment, RetryTracker tracker, FreenetURI freenetURI, int index, boolean dontEnterImplicitArchives) {
+ this.segment = segment;
+ this.tracker = tracker;
+ uri = freenetURI;
+ completedTries = 0;
+ fetchedData = null;
+ this.index = index;
+ actuallyFetched = false;
+ this.dontEnterImplicitArchives = dontEnterImplicitArchives;
+ }
+
+ public void start() {
+ if(fetchedData != null) {
+ throw new IllegalStateException("Already have data");
+ }
+ try {
+ Thread t = new Thread(this);
+ t.setDaemon(true);
+ t.start();
+ } catch (Throwable error) {
+ tracker.fatalError(this, FetchException.INTERNAL_ERROR);
+ Logger.error(this, "Caught "+error+" creating thread for "+this);
+ }
+ }
+
+ public void run() {
+ // Already added to runningFetches.
+ // But need to make sure we are removed when we exit.
+ try {
+ // Do the fetch
+ Fetcher f = new Fetcher(uri, this.segment.blockFetchContext);
+ try {
+ FetchResult fr = f.realRun(new ClientMetadata(), this.segment.recursionLevel, uri,
+ (!this.segment.nonFullBlocksAllowed) || dontEnterImplicitArchives);
+ actuallyFetched = true;
+ fetchedData = fr.data;
+ } catch (MetadataParseException e) {
+ fatalError(e, FetchException.INVALID_METADATA);
+ } catch (FetchException e) {
+ int code = e.getMode();
+ switch(code) {
+ case FetchException.ARCHIVE_FAILURE:
+ case FetchException.BLOCK_DECODE_ERROR:
+ case FetchException.HAS_MORE_METASTRINGS:
+ case FetchException.INVALID_METADATA:
+ case FetchException.NOT_IN_ARCHIVE:
+ case FetchException.TOO_DEEP_ARCHIVE_RECURSION:
+ case FetchException.TOO_MANY_ARCHIVE_RESTARTS:
+ case FetchException.TOO_MANY_METADATA_LEVELS:
+ case FetchException.TOO_MANY_REDIRECTS:
+ case FetchException.TOO_MUCH_RECURSION:
+ case FetchException.UNKNOWN_METADATA:
+ case FetchException.UNKNOWN_SPLITFILE_METADATA:
+ // Fatal, probably an error on insert
+ fatalError(e, code);
+ return;
+
+ case FetchException.DATA_NOT_FOUND:
+ case FetchException.ROUTE_NOT_FOUND:
+ case FetchException.REJECTED_OVERLOAD:
+ case FetchException.TRANSFER_FAILED:
+ // Non-fatal
+ nonfatalError(e, code);
+ break;
+
+ case FetchException.BUCKET_ERROR:
+ case FetchException.INTERNAL_ERROR:
+ // Maybe fatal
+ nonfatalError(e, code);
+ }
+ } catch (ArchiveFailureException e) {
+ fatalError(e, FetchException.ARCHIVE_FAILURE);
+ } catch (ArchiveRestartException e) {
+ Logger.error(this, "Got an
ArchiveRestartException in a splitfile - WTF?");
+ fatalError(e, FetchException.ARCHIVE_FAILURE);
+ }
+ } finally {
+ completedTries++;
+ // Add before removing from runningFetches, to avoid race
+ synchronized(this.segment.recentlyCompletedFetches) {
+ this.segment.recentlyCompletedFetches.add(this);
+ }
+ synchronized(this.segment.runningFetches) {
+ this.segment.runningFetches.remove(this);
+ }
+ synchronized(this.segment) {
+ this.segment.notify();
+ }
+ }
+ }
+
+ private void fatalError(Exception e, int code) {
+ Logger.normal(this, "Giving up on block: "+this+": "+e);
+ completedTries = -1;
+ tracker.fatalError(this, code);
+ }
+
+ private void nonfatalError(Exception e, int code) {
+ Logger.minor(this, "Non-fatal error on "+this+": "+e);
+ tracker.nonfatalError(this, code);
+ }
+
+ public boolean succeeded() {
+ return fetchedData != null;
+ }
+
+ /**
+ * Queue a healing block for insert.
+ * Will be implemented using the download manager.
+ * FIXME: implement!
+ */
+ public void queueHeal() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public int getNumber() {
+ return index;
+ }
+
+ public boolean hasData() {
+ return fetchedData != null;
+ }
+
+ public Bucket getData() {
+ return fetchedData;
+ }
+
+ public void setData(Bucket data) {
+ actuallyFetched = false;
+ fetchedData = data;
+ }
+
+ public void kill() {
+ // Do nothing, for now
+ }
+
+ public FreenetURI getURI() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
\ No newline at end of file
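The ordering in the finally block above is load-bearing: a finished fetch adds itself to recentlyCompletedFetches before removing itself from runningFetches, so the Segment, woken by notify(), can never observe a block that is in neither list. A minimal sketch of that handshake, with hypothetical names (Task, completed, running, coordinator) standing in for the real fields:

    import java.util.LinkedList;

    public class Task implements Runnable {
        // Stand-ins for Segment.recentlyCompletedFetches / Segment.runningFetches.
        static final LinkedList completed = new LinkedList();
        static final LinkedList running = new LinkedList();
        // Stand-in for the Segment monitor that the coordinator waits on.
        static final Object coordinator = new Object();

        public void run() {
            try {
                // ... perform the fetch ...
            } finally {
                // Add to completed *before* removing from running: a coordinator
                // that sees us gone from running is guaranteed to find us here.
                synchronized(completed) { completed.add(this); }
                synchronized(running) { running.remove(this); }
                synchronized(coordinator) { coordinator.notify(); }
            }
        }
    }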
Added: trunk/freenet/src/freenet/client/BlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/BlockInserter.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/BlockInserter.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -0,0 +1,56 @@
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+/**
+ * Inserts a single splitfile block.
+ */
+public class BlockInserter extends SplitfileBlock {
+
+ private Bucket data;
+ private final int num;
+
+ /**
+ * Create a BlockInserter.
+ * @param bucket The data to insert, or null if it will be filled in later.
+ * @param num The block number in the splitfile.
+ */
+ public BlockInserter(Bucket bucket, int num) {
+ this.data = bucket;
+ this.num = num;
+ }
+
+ int getNumber() {
+ return num;
+ }
+
+ boolean hasData() {
+ return data != null;
+ }
+
+ Bucket getData() {
+ return data;
+ }
+
+ synchronized void setData(Bucket data) {
+ if(this.data != null) throw new IllegalArgumentException("Cannot set data when already have data");
+ this.data = data;
+ }
+
+ void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void kill() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public FreenetURI getURI() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/FECCodec.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -2,7 +2,6 @@
import java.io.IOException;
-import freenet.client.Segment.BlockStatus;
import freenet.support.BucketFactory;
/**
@@ -14,13 +13,21 @@
*/
abstract class FECCodec {
- public static int getCodecMaxSegmentSize(short splitfileType) {
+ public static int getCodecMaxSegmentDataBlocks(short splitfileType) {
if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
return -1;
if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
return 128;
throw new IllegalArgumentException();
}
+
+ public static int getCodecMaxSegmentCheckBlocks(short splitfileType) {
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
+ return -1;
+ if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
+ return 64;
+ throw new IllegalArgumentException();
+ }
/**
* Get a codec where we know both the number of data blocks and the number
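For the standard onion codec the limits above are 128 data blocks and 64 check blocks per segment, so the segment count for a file follows directly. A quick illustrative calculation (the numbers are examples, not from this commit):

    // Segment count under the standard onion codec limits.
    int maxDataPerSegment = 128;  // getCodecMaxSegmentDataBlocks(SPLITFILE_ONION_STANDARD)
    int dataBlocks = 300;         // e.g. a ~9.4MB file split into 32kB blocks
    int segments = (dataBlocks + maxDataPerSegment - 1) / maxDataPerSegment; // = 3
    // Segments get 128, 128 and 44 data blocks; the last segment is the
    // shortest, which is why SplitInserter encodes segments backwards.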
Added: trunk/freenet/src/freenet/client/FailureCodeTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/FailureCodeTracker.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/FailureCodeTracker.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -0,0 +1,25 @@
+package freenet.client;
+
+import java.util.HashMap;
+
+/**
+ * Essentially a map of integer to incrementable integer.
+ * FIXME maybe move this to support, give it a better name?
+ */
+public class FailureCodeTracker {
+
+ public class Item {
+ int x;
+ }
+
+ final HashMap map = new HashMap();
+
+ public synchronized void inc(int k) {
+ Integer key = new Integer(k);
+ Item i = (Item) map.get(key);
+ if(i == null)
+ map.put(key, i = new Item());
+ i.x++;
+ }
+
+}
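A short usage sketch: each distinct failure code gets one mutable Item, so repeated increments reuse the same entry instead of re-inserting boxed Integers. The read-out loop assumes package-level access to the map field and is illustrative only:

    FailureCodeTracker errors = new FailureCodeTracker();
    errors.inc(FetchException.DATA_NOT_FOUND);
    errors.inc(FetchException.DATA_NOT_FOUND);
    errors.inc(FetchException.ROUTE_NOT_FOUND);

    java.util.Iterator it = errors.map.entrySet().iterator();
    while(it.hasNext()) {
        java.util.Map.Entry e = (java.util.Map.Entry) it.next();
        FailureCodeTracker.Item item = (FailureCodeTracker.Item) e.getValue();
        System.out.println("code " + e.getKey() + ": " + item.x + " occurrences");
    }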
Modified: trunk/freenet/src/freenet/client/FileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/FileInserter.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/FileInserter.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -97,7 +97,7 @@
}
// Too big, encode to a splitfile
- SplitInserter splitInsert = new SplitInserter(data, block.clientMetadata, bestCodec, ctx.splitfileAlgorithm, ctx);
+ SplitInserter splitInsert = new SplitInserter(data, block.clientMetadata, bestCodec, ctx.splitfileAlgorithm, ctx, this, NodeCHK.BLOCK_SIZE);
return splitInsert.run();
}
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -33,9 +33,10 @@
/** If set, only check the local datastore, don't send an actual request out.
* Don't turn this off either. */
static final boolean LOCAL_REQUESTS_ONLY = false;
+ static final int SPLITFILE_INSERT_THREADS = 40;
+ static final int SPLITFILE_INSERT_RETRIES = 10;
-
public HighLevelSimpleClientImpl(SimpleLowLevelClient client, ArchiveManager mgr, BucketFactory bf, RandomSource r) {
this.client = client;
archiveManager = mgr;
@@ -65,7 +66,7 @@
}
public FreenetURI insert(InsertBlock insert) throws InserterException {
- InserterContext context = new InserterContext(client, bucketFactory, random);
+ InserterContext context = new InserterContext(client, bucketFactory, random, SPLITFILE_INSERT_RETRIES, SPLITFILE_INSERT_THREADS);
FileInserter i = new FileInserter(context);
return i.run(insert, false);
}
Modified: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/InsertSegment.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -1,16 +1,49 @@
package freenet.client;
+import freenet.keys.FreenetURI;
+import freenet.support.BucketFactory;
+
/**
* Segment of a splitfile, for insertion purposes.
*/
public class InsertSegment {
- final short splitfileAlgorithm;
+ final FECCodec codec;
final SplitfileBlock[] origDataBlocks;
+ final int blockLength;
+ final BucketFactory bf;
+ /** Check blocks. Will be created by encode(...). */
+ final SplitfileBlock[] checkBlocks;
- public InsertSegment(short splitfileAlgorithm, SplitfileBlock[] origDataBlocks) {
- this.splitfileAlgorithm = splitfileAlgorithm;
+ public InsertSegment(short splitfileAlgo, SplitfileBlock[] origDataBlocks, int blockLength, BucketFactory bf) {
this.origDataBlocks = origDataBlocks;
+ codec = FECCodec.getCodec(splitfileAlgo, origDataBlocks.length);
+ checkBlocks = new SplitfileBlock[codec.countCheckBlocks()];
+ this.blockLength = blockLength;
+ this.bf = bf;
}
+ /**
+ * Get the check block URIs.
+ * Don't call before encode()! Don't call before all blocks have been inserted either.
+ */
+ public FreenetURI[] getCheckURIs() {
+ FreenetURI[] uris = new FreenetURI[checkBlocks.length];
+ for(int i=0;i<uris.length;i++)
+ uris[i] = checkBlocks[i].getURI();
+ return uris;
+ }
+
+ /**
+ * Encode the data blocks into check blocks.
+ * @return The number of check blocks generated.
+ */
+ public int encode(int offset) {
+ if(codec == null) return 0; // no FEC
+ for(int i=0;i<checkBlocks.length;i++)
+ checkBlocks[i] = new BlockInserter(null, offset + i);
+ codec.encode(origDataBlocks, checkBlocks, blockLength, bf);
+ return checkBlocks.length;
+ }
+
}
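encode(offset) numbers its check blocks starting at the given offset, so callers must hand each segment a distinct offset to keep block numbers unique across the whole splitfile (SplitInserter passes origDataBlocks.length + checkSegmentSize * i). A sketch of the resulting numbering, with example counts:

    // Global block numbering across a 3-segment splitfile (example counts):
    int dataBlocks = 300;       // data blocks are numbered 0..299
    int checkSegmentSize = 64;  // check block numbers reserved per segment
    for(int seg = 0; seg < 3; seg++) {
        int offset = dataBlocks + checkSegmentSize * seg;
        // Segment seg's check blocks are numbered offset, offset+1, ...
        System.out.println("segment " + seg + " check blocks start at " + offset);
    }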
Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/InserterContext.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -13,13 +13,18 @@
final boolean dontCompress;
final RandomSource random;
final short splitfileAlgorithm;
+ final int maxInsertBlockRetries;
+ final int maxSplitInsertThreads;
- public InserterContext(SimpleLowLevelClient client, BucketFactory bf, RandomSource random) {
+ public InserterContext(SimpleLowLevelClient client, BucketFactory bf, RandomSource random,
+ int maxRetries, int maxThreads) {
this.client = client;
this.bf = bf;
this.random = random;
dontCompress = false;
splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
+ this.maxInsertBlockRetries = maxRetries;
+ this.maxSplitInsertThreads = maxThreads;
}
}
Modified: trunk/freenet/src/freenet/client/InserterException.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterException.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/InserterException.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -6,6 +6,8 @@
private static final long serialVersionUID = -1106716067841151962L;
final int mode;
+ /** For collections of errors (e.g. accumulated splitfile block failures) */
+ final FailureCodeTracker errorCodes;
/** Get the failure mode. */
public int getMode() {
@@ -14,13 +16,20 @@
public InserterException(int m) {
mode = m;
+ errorCodes = null;
}
public InserterException(int mode, IOException e) {
this.mode = mode;
+ errorCodes = null;
initCause(e);
}
+ public InserterException(int mode, FailureCodeTracker errorCodes) {
+ this.mode = mode;
+ this.errorCodes = errorCodes;
+ }
+
/** Caller supplied a URI we cannot use */
public static final int INVALID_URI = 1;
/** Failed to read from or write to a bucket; a kind of internal error */
@@ -31,4 +40,8 @@
public static final int REJECTED_OVERLOAD = 4;
/** Couldn't find enough nodes to send the data to */
public static final int ROUTE_NOT_FOUND = 5;
+ /** There were fatal errors in a splitfile insert. */
+ public static final int FATAL_ERRORS_IN_BLOCKS = 6;
+ /** Could not insert a splitfile because a block failed too many times */
+ public static final int TOO_MANY_RETRIES_IN_BLOCKS = 7;
}
Modified: trunk/freenet/src/freenet/client/Metadata.java
===================================================================
--- trunk/freenet/src/freenet/client/Metadata.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/Metadata.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -293,6 +293,20 @@
throw new IllegalArgumentException();
}
+ public Metadata(short algo, FreenetURI[] dataURIs, FreenetURI[] checkURIs, ClientMetadata cm, long dataLength, short compressionAlgo) {
+ documentType = SIMPLE_REDIRECT;
+ splitfile = true;
+ splitfileAlgorithm = algo;
+ this.dataLength = dataLength;
+ this.compressionCodec = compressionAlgo;
+ splitfileBlocks = dataURIs.length;
+ splitfileCheckBlocks = checkURIs.length;
+ splitfileDataKeys = dataURIs;
+ splitfileCheckKeys = checkURIs;
+ clientMetadata = cm;
+ setMIMEType(cm.getMIMEType());
+ }
+
/**
* Set the MIME type to a string. Compresses it if possible for transit.
*/
Modified: trunk/freenet/src/freenet/client/RetryTracker.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTracker.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/RetryTracker.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -1,5 +1,6 @@
package freenet.client;
+import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
@@ -51,6 +52,8 @@
}
}
+ final FailureCodeTracker fatalErrors;
+ final FailureCodeTracker nonfatalErrors;
final HashMap levels;
final RandomSource random;
final int maxLevel;
@@ -60,9 +63,30 @@
final HashSet succeededBlocks;
private int curMaxLevel;
private int curMinLevel;
-
- public RetryTracker(int maxLevel, RandomSource random) {
+ /** Maximum number of concurrently running threads */
+ final int maxThreads;
+ /** After we have successes on this many blocks, we should terminate,
+ * even if there are threads running and blocks queued. */
+ final int targetSuccesses;
+ final boolean killOnFatalError;
+ private boolean finishOnEmpty;
+ private final RetryTrackerCallback callback;
+
+ /**
+ * Create a RetryTracker.
+ * @param maxLevel The maximum number of tries for each block.
+ * @param random The random number source.
+ * @param maxThreads The maximum number of threads to use.
+ * @param killOnFatalError Whether to terminate the tracker when a fatal
+ * error occurs on a single block.
+ * @param cb The callback to call .finish(...) when we no longer have
+ * anything to do *and* the client has set the finish on empty flag.
+ */
+ public RetryTracker(int maxLevel, int targetSuccesses, RandomSource random, int maxThreads, boolean killOnFatalError, RetryTrackerCallback cb) {
levels = new HashMap();
+ fatalErrors = new FailureCodeTracker();
+ nonfatalErrors = new FailureCodeTracker();
+ this.targetSuccesses = targetSuccesses;
this.maxLevel = maxLevel;
this.random = random;
curMaxLevel = curMinLevel = 0;
@@ -70,8 +94,21 @@
failedBlocksFatalErrors = new HashSet();
runningBlocks = new HashSet();
succeededBlocks = new HashSet();
+ this.maxThreads = maxThreads;
+ this.killOnFatalError = killOnFatalError;
+ this.finishOnEmpty = false;
+ this.callback = cb;
}
+ /**
+ * Set the finish-on-empty flag to true.
+ * This means that when there are no longer any blocks to process, and there
+ * are none running, the tracker will call the client's finish(...) method.
+ */
+ public synchronized void setFinishOnEmpty() {
+ finishOnEmpty = true;
+ }
+
/** Remove a level */
private synchronized void removeLevel(int level) {
Integer x = new Integer(level);
@@ -126,6 +163,7 @@
public synchronized void addBlock(SplitfileBlock block) {
Level l = makeLevel(0);
l.add(block);
+ maybeStart(true);
}
/**
@@ -133,7 +171,8 @@
* Move it out of the running list and back into the relevant list, unless
* we have run out of retries.
*/
- public synchronized void nonfatalError(SplitfileBlock block) {
+ public synchronized void nonfatalError(SplitfileBlock block, int reasonCode) {
+ nonfatalErrors.inc(reasonCode);
runningBlocks.remove(block);
Level l = block.getLevel();
if(l == null) throw new IllegalArgumentException();
@@ -147,24 +186,57 @@
Level newLevel = makeLevel(levelNumber);
newLevel.add(block);
}
+ maybeStart(false);
}
/**
* A block got a fatal error and should not be retried.
* Move it into the fatal error list.
+ * @param reasonCode A client-specific code indicating the type of failure.
*/
- public synchronized void fatalError(SplitfileBlock block) {
+ public synchronized void fatalError(SplitfileBlock block, int reasonCode) {
+ fatalErrors.inc(reasonCode);
runningBlocks.remove(block);
Level l = block.getLevel();
if(l == null) throw new IllegalArgumentException();
if(l.tracker != this) throw new IllegalArgumentException("Belongs to wrong tracker");
l.remove(block);
failedBlocksFatalErrors.add(block);
+ maybeStart(false);
}
+ /**
+ * If we can start some blocks, start some blocks.
+ * Otherwise if we are finished, call the callback's finish method.
+ */
+ public synchronized void maybeStart(boolean cantCallFinished) {
+ // targetSuccesses == 0 means there is no early-success target
+ if((targetSuccesses > 0 && succeededBlocks.size() >= targetSuccesses)
+ || (runningBlocks.isEmpty() && levels.isEmpty() && finishOnEmpty)) {
+ SplitfileBlock[] running = runningBlocks();
+ for(int i=0;i<running.length;i++) {
+ running[i].kill();
+ }
+ if(!cantCallFinished)
+ callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks());
+ else {
+ Runnable r = new Runnable() { public void run() { callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks()); } };
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.start();
+ }
+ } else {
+ while(runningBlocks.size() < maxThreads) {
+ SplitfileBlock block = getBlock();
+ if(block == null) break; // nothing left to start
+ block.start();
+ runningBlocks.add(block);
+ }
+ }
+ }
+
public synchronized void success(SplitfileBlock block) {
runningBlocks.remove(block);
succeededBlocks.add(block);
+ maybeStart(false);
}
/**
@@ -188,12 +260,20 @@
* Get all blocks with fatal errors.
* SplitfileBlock's are assumed to remember their errors, so we don't.
*/
- public synchronized SplitfileBlock[] errorBlocks() {
+ public synchronized SplitfileBlock[] fatalErrorBlocks() {
return (SplitfileBlock[]) failedBlocksFatalErrors.toArray(new SplitfileBlock[failedBlocksFatalErrors.size()]);
}
/**
+ * Get all blocks which didn't succeed in the maximum number of tries.
+ */
+ public synchronized SplitfileBlock[] failedBlocks() {
+ return (SplitfileBlock[]) failedBlocksTooManyRetries.toArray(new SplitfileBlock[failedBlocksTooManyRetries.size()]);
+ }
+
+ /**
* Get all successfully downloaded blocks.
*/
public synchronized SplitfileBlock[] succeededBlocks() {
@@ -229,4 +309,12 @@
public synchronized boolean moreBlocks() {
return !levels.isEmpty();
}
+
+ public FailureCodeTracker getAccumulatedFatalErrorCodes() {
+ return fatalErrors;
+ }
+
+ public FailureCodeTracker getAccumulatedNonFatalErrorCodes() {
+ return nonfatalErrors;
+ }
}
Added: trunk/freenet/src/freenet/client/RetryTrackerCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/RetryTrackerCallback.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/RetryTrackerCallback.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -0,0 +1,16 @@
+package freenet.client;
+
+/**
+ * Object passed to RetryTracker. This is called when RetryTracker finishes.
+ */
+public interface RetryTrackerCallback {
+
+ /**
+ * Notify the caller that we have finished.
+ * @param succeeded The blocks which succeeded.
+ * @param failed The blocks which failed.
+ * @param fatalErrors The blocks which got fatal errors.
+ */
+ void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors);
+
+}
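Putting the two classes together: a client adds its blocks (the tracker starts them as thread slots free up, up to maxThreads), then sets finish-on-empty; once nothing is queued or running, the tracker delivers a single finished(...) call. A minimal sketch, assuming it lives in freenet.client (import for RandomSource omitted); CountingClient and the argument values are hypothetical:

    public class CountingClient implements RetryTrackerCallback {

        public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed,
                SplitfileBlock[] fatalErrors) {
            System.out.println(succeeded.length + " succeeded, " + failed.length
                    + " ran out of retries, " + fatalErrors.length + " fatal");
        }

        void fetchAll(SplitfileBlock[] blocks, RandomSource random) {
            // 10 tries per block, want all of them, 40 threads, don't kill on fatal error.
            RetryTracker tracker =
                new RetryTracker(10, blocks.length, random, 40, false, this);
            for(int i = 0; i < blocks.length; i++)
                tracker.addBlock(blocks[i]);
            tracker.setFinishOnEmpty(); // empty queue now means "call finished(...)"
        }
    }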
Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/Segment.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -9,7 +9,6 @@
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
import freenet.support.BucketTools;
-import freenet.support.Logger;
/**
* A segment, within a splitfile.
@@ -17,152 +16,13 @@
*
* Does not require locking, because all locking goes through the parent Segment.
*/
-public class Segment implements Runnable {
+public class Segment implements RetryTrackerCallback {
- public class BlockStatus extends SplitfileBlock implements Runnable {
-
- /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the check blocks */
- final int index;
- final FreenetURI uri;
- int completedTries;
- Bucket fetchedData;
- boolean actuallyFetched;
-
- public BlockStatus(FreenetURI freenetURI, int index) {
- uri = freenetURI;
- completedTries = 0;
- fetchedData = null;
- this.index = index;
- actuallyFetched = false;
- }
-
- public void startFetch() {
- if(fetchedData != null) {
- throw new IllegalStateException("Already have
data");
- }
- synchronized(runningFetches) {
- runningFetches.add(this);
- try {
- Thread t = new Thread(this);
- t.setDaemon(true);
- t.start();
- } catch (Throwable error) {
- runningFetches.remove(this);
- Logger.error(this, "Caught "+error);
- }
- }
- }
-
- public void run() {
- // Already added to runningFetches.
- // But need to make sure we are removed when we exit.
- try {
- // Do the fetch
- Fetcher f = new Fetcher(uri, blockFetchContext);
- try {
- FetchResult fr = f.realRun(new ClientMetadata(), recursionLevel, uri,
- (!nonFullBlocksAllowed) || fetcherContext.dontEnterImplicitArchives);
- actuallyFetched = true;
- fetchedData = fr.data;
- } catch (MetadataParseException e) {
- fatalError(e);
- } catch (FetchException e) {
- int code = e.getMode();
- switch(code) {
- case FetchException.ARCHIVE_FAILURE:
- case FetchException.BLOCK_DECODE_ERROR:
- case FetchException.HAS_MORE_METASTRINGS:
- case FetchException.INVALID_METADATA:
- case FetchException.NOT_IN_ARCHIVE:
- case FetchException.TOO_DEEP_ARCHIVE_RECURSION:
- case FetchException.TOO_MANY_ARCHIVE_RESTARTS:
- case FetchException.TOO_MANY_METADATA_LEVELS:
- case FetchException.TOO_MANY_REDIRECTS:
- case FetchException.TOO_MUCH_RECURSION:
- case FetchException.UNKNOWN_METADATA:
- case FetchException.UNKNOWN_SPLITFILE_METADATA:
- // Fatal, probably an error on insert
- fatalError(e);
- return;
-
- case FetchException.DATA_NOT_FOUND:
- case FetchException.ROUTE_NOT_FOUND:
- case FetchException.REJECTED_OVERLOAD:
- case FetchException.TRANSFER_FAILED:
- // Non-fatal
- nonfatalError(e);
-
- case FetchException.BUCKET_ERROR:
- case FetchException.INTERNAL_ERROR:
- // Maybe fatal
- nonfatalError(e);
- }
- } catch (ArchiveFailureException e) {
- fatalError(e);
- } catch (ArchiveRestartException e) {
- fatalError(e);
- }
- } finally {
- completedTries++;
- // Add before removing from runningFetches, to avoid race
- synchronized(recentlyCompletedFetches) {
- recentlyCompletedFetches.add(this);
- }
- synchronized(runningFetches) {
- runningFetches.remove(this);
- }
- synchronized(Segment.this) {
- Segment.this.notify();
- }
- }
- }
-
- private void fatalError(Exception e) {
- Logger.normal(this, "Giving up on block: "+this+": "+e);
- completedTries = -1;
- }
-
- private void nonfatalError(Exception e) {
- Logger.minor(this, "Non-fatal error on "+this+": "+e);
- }
-
- public boolean succeeded() {
- return fetchedData != null;
- }
-
- /**
- * Queue a healing block for insert.
- * Will be implemented using the download manager.
- * FIXME: implement!
- */
- public void queueHeal() {
- // TODO Auto-generated method stub
-
- }
-
- public int getNumber() {
- return index;
- }
-
- public boolean hasData() {
- return fetchedData != null;
- }
-
- public Bucket getData() {
- return fetchedData;
- }
-
- public void setData(Bucket data) {
- actuallyFetched = false;
- fetchedData = data;
- }
- }
-
final short splitfileType;
final FreenetURI[] dataBlocks;
final FreenetURI[] checkBlocks;
- final BlockStatus[] dataBlockStatus;
- final BlockStatus[] checkBlockStatus;
+ final BlockFetcher[] dataBlockStatus;
+ final BlockFetcher[] checkBlockStatus;
final int minFetched;
private Vector blocksNotTried;
final SplitFetcher parentFetcher;
@@ -179,25 +39,15 @@
/** Bucket to store the data retrieved, after it has been decoded */
private Bucket decodedData;
/** Recently completed fetches */
- private final LinkedList recentlyCompletedFetches;
- /** Total number of successfully fetched blocks */
- private int totalFetched;
+ final LinkedList recentlyCompletedFetches;
/** Running fetches */
- private LinkedList runningFetches;
- /** Minimum retry level of any BlockStatus; this is the largest integer n such that
- * blocksNotTried.get(n-1) is empty. Initially 0.
- */
- private int minRetryLevel;
- /** Maximum retry level. */
- private final int maxRetryLevel;
+ LinkedList runningFetches;
/** Fetch context for block fetches */
- private final FetcherContext blockFetchContext;
+ final FetcherContext blockFetchContext;
/** Recursion level */
- private final int recursionLevel;
- /** Number of blocks which got fatal errors */
- private int fatalErrorCount;
+ final int recursionLevel;
/** Retry tracker */
- private RetryTracker tracker;
+ private final RetryTracker tracker;
/**
* Create a Segment.
@@ -210,6 +60,13 @@
this.splitfileType = splitfileType;
dataBlocks = splitfileDataBlocks;
checkBlocks = splitfileCheckBlocks;
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+ minFetched = dataBlocks.length;
+ } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+ minFetched = dataBlocks.length;
+ } else throw new MetadataParseException("Unknown splitfile
type"+splitfileType);
+ tracker = new RetryTracker(fctx.maxSplitfileBlockRetries, splitfileDataBlocks.length, fctx.random, fctx.maxSplitfileThreads, false, this);
+ // Don't add blocks to the tracker yet, because we don't want to start fetching yet.
parentFetcher = fetcher;
archiveContext = actx;
fetcherContext = fctx;
@@ -219,28 +76,21 @@
finished = false;
fetchError = -1;
decodedData = null;
- dataBlockStatus = new BlockStatus[dataBlocks.length];
- checkBlockStatus = new BlockStatus[checkBlocks.length];
+ dataBlockStatus = new BlockFetcher[dataBlocks.length];
+ checkBlockStatus = new BlockFetcher[checkBlocks.length];
blocksNotTried = new Vector();
- maxRetryLevel = fetcherContext.maxSplitfileBlockRetries;
Vector firstSet = new Vector(dataBlocks.length+checkBlocks.length);
blocksNotTried.add(0, firstSet);
for(int i=0;i<dataBlocks.length;i++) {
- dataBlockStatus[i] = new BlockStatus(dataBlocks[i], i);
+ dataBlockStatus[i] = new BlockFetcher(this, tracker, dataBlocks[i], i, fctx.dontEnterImplicitArchives);
firstSet.add(dataBlockStatus[i]);
}
for(int i=0;i<checkBlocks.length;i++) {
- checkBlockStatus[i] = new BlockStatus(checkBlocks[i], dataBlockStatus.length + i);
+ checkBlockStatus[i] = new BlockFetcher(this, tracker, checkBlocks[i], dataBlockStatus.length + i, fctx.dontEnterImplicitArchives);
firstSet.add(checkBlockStatus[i]);
}
recentlyCompletedFetches = new LinkedList();
runningFetches = new LinkedList();
- if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
- minFetched = dataBlocks.length;
- } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
- minFetched = dataBlocks.length;
- } else throw new MetadataParseException("Unknown splitfile type
"+splitfileType);
- minRetryLevel = 0;
this.recursionLevel = recursionLevel;
// FIXME be a bit more flexible here depending on flags
blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
@@ -293,80 +143,46 @@
/**
* Start the Segment fetching the data. When it has finished fetching, it will
- * notify the SplitFetcher.
+ * notify the SplitFetcher. Note that we must not start fetching until this
+ * method is called, because of the requirement to not fetch all segments
+ * simultaneously.
*/
public void start() {
started = true;
- Thread t = new Thread(this);
- t.setDaemon(true);
- t.start();
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ tracker.addBlock(dataBlockStatus[i]);
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ tracker.addBlock(checkBlockStatus[i]);
+ }
+ tracker.setFinishOnEmpty();
}
/**
- * Fetch the data.
- * Tell the SplitFetcher.
- * Decode the data.
- * Tell the SplitFetcher.
- * If there is an error, tell the SplitFetcher and exit.
+ * How many fetches are running?
*/
- public void run() {
- // Create a number of fetcher threads.
- // Wait for any thread to complete (success or failure).
- // Retry if necessary, up to N times per block.
-
- for(int i=0;i<fetcherContext.maxSplitfileThreads;i++) {
- startFetch(); // ignore return value
+ private int runningFetches() {
+ synchronized(runningFetches) {
+ return runningFetches.size();
}
+ }
+
+ /**
+ * Once we have enough data to decode, tell parent, and decode it.
+ */
+ public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors) {
- while(true) {
-
- // Now wait for any thread to complete
- synchronized(this) {
- try {
- wait(10*1000);
- } catch (InterruptedException e) {
- // Ignore
- }
- }
-
- while(true) {
- BlockStatus block;
- synchronized(this) {
- block = (BlockStatus) recentlyCompletedFetches.removeFirst();
- }
- if(block == null) break;
- if(!block.succeeded()) {
- // Retry
- int retryLevel = block.completedTries;
- if(retryLevel == maxRetryLevel || retryLevel == -1) {
- if(retryLevel == -1)
- fatalErrorCount++;
- // This block failed
- } else {
- Vector levelSet = (Vector) blocksNotTried.get(retryLevel);
- levelSet.add(block);
- }
- } else {
- // Succeeded
- totalFetched++;
- }
- // Either way, start a new fetch
- if(!startFetch()) {
- // Can't start a fetch
- if(runningFetches() == 0) {
- // Failed
- parentFetcher.failedNotEnoughBlocks(this);
- return;
- }
- }
- }
-
- if(totalFetched >= minFetched) {
- // Success! Go to next phase
- break;
- }
+ if(succeeded.length >= minFetched)
+ successfulFetch();
+ else {
+ parentFetcher.failed(this, minFetched, succeeded.length, failed.length, fatalErrors.length);
}
-
+ }
+
+ /**
+ * Successful fetch, do the decode, tell the parent, etc.
+ */
+ private void successfulFetch() {
parentFetcher.gotBlocks(this);
// Now decode
@@ -382,7 +198,7 @@
Bucket output = fetcherContext.bucketFactory.makeBucket(-1);
OutputStream os = output.getOutputStream();
for(int i=0;i<dataBlockStatus.length;i++) {
- BlockStatus status = dataBlockStatus[i];
+ BlockFetcher status = dataBlockStatus[i];
Bucket data = status.fetchedData;
BucketTools.copyTo(data, os, Long.MAX_VALUE);
fetcherContext.bucketFactory.freeBucket(data);
@@ -402,7 +218,7 @@
// Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
for(int i=0;i<dataBlockStatus.length;i++) {
- BlockStatus block = dataBlockStatus[i];
+ BlockFetcher block = dataBlockStatus[i];
if(block.actuallyFetched) continue;
if(block.completedTries == 0) {
// 80% chance of not inserting, if we never tried it
@@ -411,33 +227,4 @@
block.queueHeal();
}
}
-
- /**
- * How many fetches are running?
- */
- private int runningFetches() {
- synchronized(runningFetches) {
- return runningFetches.size();
- }
- }
-
- /**
- * Start a fetch.
- * @return True if we started a fetch, false if there was nothing to start.
- */
- private boolean startFetch() {
- if(minRetryLevel == maxRetryLevel) return false; // nothing to start
- // Don't need to synchronize as these are only accessed by main thread
- while(true) {
- if(minRetryLevel >= blocksNotTried.size())
- return false;
- Vector v = (Vector) blocksNotTried.get(minRetryLevel);
- int len = v.size();
- int idx = fetcherContext.random.nextInt(len);
- BlockStatus b = (BlockStatus) v.remove(idx);
- if(v.isEmpty()) minRetryLevel++;
- b.startFetch();
- return true;
- }
- }
}
Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -195,22 +195,32 @@
return output;
}
- public void failedNotEnoughBlocks(Segment segment) {
+ public void gotBlocks(Segment segment) {
// TODO Auto-generated method stub
}
- public void gotBlocks(Segment segment) {
+ public void decoded(Segment segment, Bucket output) {
// TODO Auto-generated method stub
}
- public void decoded(Segment segment, Bucket output) {
+ public void internalBucketError(Segment segment, IOException e) {
// TODO Auto-generated method stub
}
- public void internalBucketError(Segment segment, IOException e) {
+ /**
+ * The segment fetch failed.
+ * @param segment The segment that failed.
+ * @param minFetched The minimum number of successful blocks for a successful fetch.
+ * @param successfulBlocks The number of blocks successfully fetched.
+ * @param failedBlocks The number of blocks that failed because they got
+ * non-fatal errors on every try, and ran out of retries.
+ * @param fatalErrors The number of blocks that got fatal errors.
+ */
+ public void failed(Segment segment, int minFetched, int successfulBlocks,
+ int failedBlocks, int fatalErrors) {
// TODO Auto-generated method stub
}
Modified: trunk/freenet/src/freenet/client/SplitInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitInserter.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/SplitInserter.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -1,5 +1,6 @@
package freenet.client;
+import java.io.IOException;
import java.util.Vector;
import freenet.keys.FreenetURI;
@@ -11,7 +12,7 @@
/**
* Insert a splitfile.
*/
-public class SplitInserter {
+public class SplitInserter implements RetryTrackerCallback {
final Bucket origData;
final long dataLength;
@@ -19,10 +20,25 @@
final short compressionCodec;
final short splitfileAlgorithm;
final InserterContext ctx;
+ final RetryTracker tracker;
+ final int segmentSize;
+ final int checkSegmentSize;
+ final int blockSize;
SplitfileBlock[] origDataBlocks;
+ InsertSegment encodingSegment;
+ InsertSegment[] segments;
+ final Vector unstartedSegments = new Vector();
+ private boolean allSegmentsFinished = false;
+ private int succeeded;
+ private int failed;
+ private int fatalErrors;
+ private int countCheckBlocks;
+ private SplitfileBlock[] fatalErrorBlocks;
+ private FileInserter inserter;
- public SplitInserter(Bucket data, ClientMetadata clientMetadata, Compressor compressor, short splitfileAlgorithm, InserterContext ctx) {
+ public SplitInserter(Bucket data, ClientMetadata clientMetadata, Compressor compressor, short splitfileAlgorithm, InserterContext ctx, FileInserter inserter, int blockLength) throws InserterException {
this.origData = data;
+ this.blockSize = blockLength;
this.clientMetadata = clientMetadata;
if(compressor == null)
compressionCodec = -1;
@@ -31,110 +47,157 @@
this.splitfileAlgorithm = splitfileAlgorithm;
this.ctx = ctx;
this.dataLength = data.size();
+ segmentSize = FECCodec.getCodecMaxSegmentDataBlocks(splitfileAlgorithm);
+ checkSegmentSize = FECCodec.getCodecMaxSegmentCheckBlocks(splitfileAlgorithm);
+ try {
+ splitIntoBlocks();
+ } catch (IOException e) {
+ throw new InserterException(InserterException.BUCKET_ERROR);
+ }
+ tracker = new RetryTracker(ctx.maxInsertBlockRetries, 0, ctx.random, ctx.maxSplitInsertThreads, true, this);
+ this.inserter = inserter;
}
- InsertSegment encodingSegment;
- InsertSegment[] segments;
- final Vector unstartedSegments = new Vector();
- boolean allSegmentsFinished = false;
-
/**
* Inserts the splitfile.
* @return The URI of the resulting file.
+ * @throws InserterException If we are not able to insert the splitfile.
*/
- public FreenetURI run() {
- // Create the splitfile
- int segmentSize = FECCodec.getCodecMaxSegmentSize(splitfileAlgorithm);
-
- splitIntoBlocks();
-
+ public FreenetURI run() throws InserterException {
+ startInsertingDataBlocks();
splitIntoSegments(segmentSize);
+ // Backwards, because the last is the shortest
+ for(int i=segments.length-1;i>=0;i--)
+ countCheckBlocks += encodeSegment(i, origDataBlocks.length + checkSegmentSize * i);
+ // Wait for the insertion thread to finish
+ return waitForCompletion();
+ }
+
+ private FreenetURI waitForCompletion() throws InserterException {
+ synchronized(this) {
+ while(!allSegmentsFinished) {
+ try {
+ wait(10*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+ // Did we succeed?
- // Encode the last segment (which is always shortest)
+ if(fatalErrors > 0) {
+ throw new InserterException(InserterException.FATAL_ERRORS_IN_BLOCKS, tracker.getAccumulatedFatalErrorCodes());
+ }
- encodeSegment(segments.length-1);
+ if(failed > 0) {
+ throw new InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, tracker.getAccumulatedNonFatalErrorCodes());
+ }
- // Then start the insertion thread
+ // Okay, we succeeded... create the manifest
- startInsertionThread();
+ Metadata metadata = new Metadata(splitfileAlgorithm, getDataURIs(), getCheckURIs(), clientMetadata, dataLength, compressionCodec);
- // Then encode the rest
+ Bucket mbucket;
+ try {
+ mbucket = BucketTools.makeImmutableBucket(ctx.bf, metadata.writeToByteArray());
+ } catch (IOException e) {
+ throw new InserterException(InserterException.BUCKET_ERROR);
+ }
+
+ if(inserter == null)
+ inserter = new FileInserter(ctx);
- for(int i=0;i<segments.length-1;i++)
- encodeSegment(i);
+ InsertBlock mblock = new InsertBlock(mbucket, clientMetadata, FreenetURI.EMPTY_CHK_URI);
- // Then wait for the insertion thread to finish
-
- return waitForCompletion();
+ return inserter.run(mblock, true);
}
- private void splitIntoBlocks() {
- Bucket[] dataBuckets = BucketTools.split(origData, NodeCHK.BLOCK_SIZE);
- origDataBlocks = new SplitfileBlock[dataBuckets.length];
- for(int i=0;i<origDataBlocks.length;i++) {
- origDataBlocks[i] = new BucketWrapper(dataBuckets[i], i);
+ private FreenetURI[] getCheckURIs() {
+ // Copy check blocks from each segment into a FreenetURI[].
+ FreenetURI[] uris = new FreenetURI[countCheckBlocks];
+ int x = 0;
+ for(int i=0;i<segments.length;i++) {
+ FreenetURI[] segURIs = segments[i].getCheckURIs();
+ System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+ x += segURIs.length;
}
+
+ if(uris.length != x)
+ throw new IllegalStateException("Total is wrong");
+
+ return uris;
}
+ private FreenetURI[] getDataURIs() {
+ FreenetURI[] uris = new FreenetURI[origDataBlocks.length];
+ for(int i=0;i<uris.length;i++)
+ uris[i] = origDataBlocks[i].getURI();
+ return uris;
+ }
+
+ private int encodeSegment(int i, int offset) {
+ encodingSegment = segments[i];
+ return encodingSegment.encode(offset);
+ }
+
/**
- * Create the metadata document. Insert it. Return its URI.
+ * Start the insert, by adding all the data blocks.
*/
- private FreenetURI finalStatus() {
- // TODO Auto-generated method stub
- return null;
+ private void startInsertingDataBlocks() {
+ for(int i=0;i<origDataBlocks.length;i++)
+ tracker.addBlock(origDataBlocks[i]);
}
/**
+ * Split the data into blocks for insertion.
+ * @throws IOException If there is a bucket error splitting the file.
+ */
+ private void splitIntoBlocks() throws IOException {
+ Bucket[] dataBuckets = BucketTools.split(origData, NodeCHK.BLOCK_SIZE, ctx.bf);
+ origDataBlocks = new SplitfileBlock[dataBuckets.length];
+ for(int i=0;i<origDataBlocks.length;i++) {
+ origDataBlocks[i] = new BlockInserter(dataBuckets[i], i);
+ }
+ }
+
+ /**
* Group the blocks into segments.
*/
private void splitIntoSegments(int segmentSize) {
int dataBlocks = origDataBlocks.length;
+ Vector segs = new Vector();
+
// First split the data up
if(dataBlocks < segmentSize || segmentSize == -1) {
// Single segment
- InsertSegment onlySeg = new InsertSegment(splitfileAlgorithm, origDataBlocks);
- unstartedSegments.add(new InsertSegment[] { onlySeg });
+ InsertSegment onlySeg = new InsertSegment(splitfileAlgorithm, origDataBlocks, blockSize, ctx.bf);
+ segs.add(onlySeg);
} else {
int j = 0;
for(int i=segmentSize;;i+=segmentSize) {
if(i > dataBlocks) i = dataBlocks;
- Bucket[] seg = new Bucket[i-j];
+ SplitfileBlock[] seg = new SplitfileBlock[i-j];
System.arraycopy(origDataBlocks, j, seg, 0, i-j);
- unstartedSegments.add(seg);
j = i;
+ segs.add(new InsertSegment(splitfileAlgorithm, seg, blockSize, ctx.bf));
if(i == dataBlocks) break;
}
}
- segments = (InsertSegment[]) unstartedSegments.toArray(new InsertSegment[unstartedSegments.size()]);
+ segments = (InsertSegment[]) segs.toArray(new InsertSegment[segs.size()]);
}
- public static class BucketWrapper implements SplitfileBlock {
-
- Bucket data;
- int number;
-
- public BucketWrapper(Bucket data, int number) {
- this.data = data;
- this.number = number;
+ public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors) {
+ synchronized(this) {
+ allSegmentsFinished = true;
+ this.succeeded = succeeded.length;
+ this.failed = failed.length;
+ this.fatalErrorBlocks = fatalErrors;
+ this.fatalErrors = fatalErrorBlocks.length;
+ notify();
}
-
- public int getNumber() {
- return number;
- }
-
- public boolean hasData() {
- return data != null;
- }
-
- public Bucket getData() {
- return data;
- }
-
- public void setData(Bucket data) {
- this.data = data;
- }
-
}
}
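From the caller's perspective the whole splitfile insert is synchronous: run() blocks in waitForCompletion() until the tracker's finished(...) callback fires, then either returns the URI of the inserted metadata or throws with the accumulated per-code failure counts. A hedged usage sketch; data, clientMeta, compressor and ctx are assumed to exist, and reading e.errorCodes assumes same-package access:

    try {
        SplitInserter si = new SplitInserter(data, clientMeta, compressor,
                Metadata.SPLITFILE_ONION_STANDARD, ctx, null, NodeCHK.BLOCK_SIZE);
        FreenetURI uri = si.run(); // blocks until every segment succeeds or fails
        System.out.println("Inserted splitfile: " + uri);
    } catch (InserterException e) {
        // e.errorCodes is a FailureCodeTracker breaking failures down by reason code.
        System.err.println("Splitfile insert failed, mode " + e.getMode());
    }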
Modified: trunk/freenet/src/freenet/client/SplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitfileBlock.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/SplitfileBlock.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -1,6 +1,7 @@
package freenet.client;
import freenet.client.RetryTracker.Level;
+import freenet.keys.FreenetURI;
import freenet.support.Bucket;
/** Simple interface for a splitfile block */
@@ -27,4 +28,19 @@
final void setLevel(Level l) {
level = l;
}
+
+ /** Start the fetch (or insert). Implementation is required to call relevant
+ * methods on RetryTracker when done. */
+ abstract void start();
+
+ /**
+ * Shut down the fetch as soon as reasonably possible.
+ */
+ abstract public void kill();
+
+ /**
+ * Get the URI of the file. For an insert, this is derived during insert.
+ * For a request, it is fixed in the constructor.
+ */
+ abstract public FreenetURI getURI();
}
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -8,7 +8,6 @@
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
-import freenet.client.Segment.BlockStatus;
import freenet.support.Bucket;
import freenet.support.BucketFactory;
import freenet.support.LRUHashtable;
@@ -174,7 +173,7 @@
}
}
- public void encode(BlockStatus[] dataBlockStatus, BlockStatus[] checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
+ public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
// TODO Auto-generated method stub
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/node/Version.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 144;
+ public static final int buildNumber = 145;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 144;
Modified: trunk/freenet/src/freenet/support/BucketTools.java
===================================================================
--- trunk/freenet/src/freenet/support/BucketTools.java	2005-11-09 16:09:35 UTC (rev 7502)
+++ trunk/freenet/src/freenet/support/BucketTools.java	2005-11-09 18:55:33 UTC (rev 7503)
@@ -357,4 +357,40 @@
os.write(buf, 0, bytes);
}
}
+
+ /**
+ * Split the data into a series of read-only Bucket's.
+ * @param origData The original data Bucket.
+ * @param splitSize The number of bytes to put into each bucket.
+ *
+ * FIXME This could be made many orders of magnitude more efficient on
+ * time and space if the underlying Bucket happens to be a passed-in
+ * plaintext file!
+ *
+ * Note that this method will allocate a buffer of size splitSize.
+ * @throws IOException If there is an error creating buckets, reading from
+ * the provided bucket, or writing to created buckets.
+ */
+ public static Bucket[] split(Bucket origData, int splitSize, BucketFactory bf) throws IOException {
+ long length = origData.size();
+ if(length > ((long)Integer.MAX_VALUE) * splitSize)
+ throw new IllegalArgumentException("Way too big!");
+ int bucketCount = (int) (length / splitSize);
+ if(length % splitSize > 0) bucketCount++;
+ Bucket[] buckets = new Bucket[bucketCount];
+ InputStream is = origData.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ long remainingLength = length;
+ byte[] buf = new byte[splitSize];
+ for(int i=0;i<bucketCount;i++) {
+ int len = (int) Math.min(splitSize, remainingLength);
+ Bucket bucket = bf.makeBucket(len);
+ dis.readFully(buf, 0, len);
+ OutputStream os = bucket.getOutputStream();
+ os.write(buf, 0, len);
+ os.close();
+ buckets[i] = bucket;
+ remainingLength -= len;
+ }
+ dis.close();
+ return buckets;
+ }
}
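A usage sketch for the new split(...) overload, assuming NodeCHK.BLOCK_SIZE is 32768 bytes and that bf is some available BucketFactory:

    static Bucket[] splitForInsert(Bucket big, BucketFactory bf) throws IOException {
        Bucket[] parts = BucketTools.split(big, 32768, bf);
        // parts.length == ceil(big.size() / 32768.0); every part except
        // possibly the last holds exactly 32768 bytes.
        return parts;
    }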