Author: toad
Date: 2006-01-20 19:46:21 +0000 (Fri, 20 Jan 2006)
New Revision: 7890
Added:
branches/async-client/src/freenet/client/StartableSplitfileBlock.java
branches/async-client/src/freenet/client/async/
branches/async-client/src/freenet/client/async/Client.java
branches/async-client/src/freenet/client/async/ClientGet.java
branches/async-client/src/freenet/client/async/ClientGetState.java
branches/async-client/src/freenet/client/async/ClientRequest.java
branches/async-client/src/freenet/client/async/ClientRequestScheduler.java
branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
branches/async-client/src/freenet/client/async/SendableRequest.java
branches/async-client/src/freenet/client/async/SingleFileFetcher.java
branches/async-client/src/freenet/client/async/SplitFileFetcher.java
branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
Modified:
branches/async-client/src/freenet/client/ArchiveHandler.java
branches/async-client/src/freenet/client/ArchiveStoreContext.java
branches/async-client/src/freenet/client/ClientMetadata.java
branches/async-client/src/freenet/client/FECCodec.java
branches/async-client/src/freenet/client/FetchException.java
branches/async-client/src/freenet/client/FetcherContext.java
branches/async-client/src/freenet/client/InsertSegment.java
branches/async-client/src/freenet/client/Metadata.java
branches/async-client/src/freenet/client/RetryTracker.java
branches/async-client/src/freenet/client/RetryTrackerCallback.java
branches/async-client/src/freenet/client/Segment.java
branches/async-client/src/freenet/client/SplitFetcher.java
branches/async-client/src/freenet/client/SplitInserter.java
branches/async-client/src/freenet/client/SplitfileBlock.java
branches/async-client/src/freenet/client/StdSplitfileBlock.java
branches/async-client/src/freenet/node/Version.java
branches/async-client/src/freenet/node/fcp/ClientGetMessage.java
branches/async-client/src/freenet/node/fcp/ClientPutMessage.java
Log:
Fetches implemented. Still some way to go: puts and some supporting
infrastructure remain to be implemented, along with integration with
HighLevelSimple*, serialization, and offline fetches.
Modified: branches/async-client/src/freenet/client/ArchiveHandler.java
===================================================================
--- branches/async-client/src/freenet/client/ArchiveHandler.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/ArchiveHandler.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -6,7 +6,7 @@
* The public face (to Fetcher, for example) of ArchiveStoreContext.
* Just has methods for fetching stuff.
*/
-interface ArchiveHandler {
+public interface ArchiveHandler {
/**
* Get the metadata for this ZIP manifest, as a Bucket.
@@ -36,4 +36,9 @@
throws ArchiveFailureException, ArchiveRestartException,
MetadataParseException, FetchException;
+ /**
+ * Get the archive type.
+ */
+ public abstract short getArchiveType();
+
}
\ No newline at end of file
Modified: branches/async-client/src/freenet/client/ArchiveStoreContext.java
===================================================================
--- branches/async-client/src/freenet/client/ArchiveStoreContext.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/ArchiveStoreContext.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -12,8 +12,11 @@
* subject to the above.
*
 * Always take the lock on ArchiveStoreContext before the lock on ArchiveManager, NOT the other way around.
+ *
+ * Not normally to be used directly by external packages, but public for
+ * ArchiveManager.extractToCache. FIXME.
*/
-class ArchiveStoreContext implements ArchiveHandler {
+public class ArchiveStoreContext implements ArchiveHandler {
private ArchiveManager manager;
private FreenetURI key;
@@ -65,6 +68,7 @@
// Not in cache
if(fetchContext == null) return null;
+		fetchContext = new FetcherContext(fetchContext, FetcherContext.SET_RETURN_ARCHIVES);
 		Fetcher fetcher = new Fetcher(key, fetchContext, archiveContext);
 		FetchResult result = fetcher.realRun(dm, recursionLevel, key, dontEnterImplicitArchives, fetchContext.localRequestOnly);
 		manager.extractToCache(key, archiveType, result.data, archiveContext, this);
@@ -130,5 +134,9 @@
myItems.remove(item);
}
}
+
+ public short getArchiveType() {
+ return archiveType;
+ }
}
Modified: branches/async-client/src/freenet/client/ClientMetadata.java
===================================================================
--- branches/async-client/src/freenet/client/ClientMetadata.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/ClientMetadata.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -13,7 +13,7 @@
}
/** Create an empty ClientMetadata instance */
- ClientMetadata() {
+ public ClientMetadata() {
mimeType = null;
}
Modified: branches/async-client/src/freenet/client/FECCodec.java
===================================================================
--- branches/async-client/src/freenet/client/FECCodec.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/FECCodec.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -11,7 +11,7 @@
* @author root
*
*/
-abstract class FECCodec {
+public abstract class FECCodec {
/**
 	 * Get a codec where we know both the number of data blocks and the number
Modified: branches/async-client/src/freenet/client/FetchException.java
===================================================================
--- branches/async-client/src/freenet/client/FetchException.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/FetchException.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -40,7 +40,7 @@
}
public FetchException(ArchiveFailureException e) {
- super(getMessage(INVALID_METADATA)+": "+e.getMessage());
+ super(getMessage(ARCHIVE_FAILURE)+": "+e.getMessage());
extraMessage = e.getMessage();
mode = ARCHIVE_FAILURE;
errorCodes = null;
@@ -48,6 +48,14 @@
 		Logger.minor(this, "FetchException("+getMessage(mode)+"): "+e,e);
 	}
+	public FetchException(ArchiveRestartException e) {
+		super(getMessage(ARCHIVE_RESTART)+": "+e.getMessage());
+		extraMessage = e.getMessage();
+		mode = ARCHIVE_RESTART;
+		errorCodes = null;
+		initCause(e);
+		Logger.minor(this, "FetchException("+getMessage(mode)+"): "+e,e);
+	}
+
public FetchException(int mode, Throwable t) {
super(getMessage(mode)+": "+t.getMessage());
extraMessage = t.getMessage();
@@ -126,6 +134,8 @@
return "No default document; give more metastrings in
URI";
case CANCELLED:
return "Cancelled by caller";
+ case ARCHIVE_RESTART:
+ return "Archive restart";
default:
return "Unknown fetch error code: "+mode;
}
@@ -183,6 +193,8 @@
public static final int NOT_ENOUGH_METASTRINGS = 24;
/** Explicitly cancelled */
public static final int CANCELLED = 25;
+ /** Archive restart */
+ public static final int ARCHIVE_RESTART = 26;
/** Is an error fatal i.e. is there no point retrying? */
public boolean isFatal() {
@@ -200,6 +212,7 @@
case FetchException.TOO_MUCH_RECURSION:
case FetchException.UNKNOWN_METADATA:
case FetchException.UNKNOWN_SPLITFILE_METADATA:
+ case FetchException.TOO_BIG:
return true;
// Low level errors, can be retried
@@ -218,7 +231,9 @@
// Fatal, because there are internal retries
return true;
+		// Weird ones
case FetchException.CANCELLED:
+ case FetchException.ARCHIVE_RESTART:
// Fatal
return true;
Modified: branches/async-client/src/freenet/client/FetcherContext.java
===================================================================
--- branches/async-client/src/freenet/client/FetcherContext.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/FetcherContext.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -10,14 +10,15 @@
public class FetcherContext implements Cloneable {
public static final int IDENTICAL_MASK = 0;
- static final int SPLITFILE_DEFAULT_BLOCK_MASK = 1;
- static final int SPLITFILE_DEFAULT_MASK = 2;
- static final int SPLITFILE_USE_LENGTHS_MASK = 3;
+ public static final int SPLITFILE_DEFAULT_BLOCK_MASK = 1;
+ public static final int SPLITFILE_DEFAULT_MASK = 2;
+ public static final int SPLITFILE_USE_LENGTHS_MASK = 3;
+ public static final int SET_RETURN_ARCHIVES = 4;
/** Low-level client to send low-level requests to. */
final SimpleLowLevelClient client;
public long maxOutputLength;
public long maxTempLength;
- final ArchiveManager archiveManager;
+ public final ArchiveManager archiveManager;
public final BucketFactory bucketFactory;
public int maxRecursionLevel;
public int maxArchiveRestarts;
@@ -40,6 +41,9 @@
public final RequestStarterClient starterClient;
public boolean cacheLocalRequests;
private boolean cancelled;
+	/** If true, and we get a ZIP manifest, and we have no meta-strings left, then
+ * return the manifest contents as data. */
+ public boolean returnZIPManifests;
public final boolean isCancelled() {
return cancelled;
@@ -101,6 +105,7 @@
 		this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = ctx.returnZIPManifests;
} else if(maskID == SPLITFILE_DEFAULT_BLOCK_MASK) {
this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
@@ -124,6 +129,7 @@
this.maxCheckBlocksPerSegment = 0;
this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = false;
} else if(maskID == SPLITFILE_DEFAULT_MASK) {
this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
@@ -147,6 +153,7 @@
 		this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
+ this.returnZIPManifests = ctx.returnZIPManifests;
} else if(maskID == SPLITFILE_USE_LENGTHS_MASK) {
this.client = ctx.client;
this.maxOutputLength = ctx.maxOutputLength;
@@ -170,7 +177,33 @@
 		this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
this.starterClient = ctx.starterClient;
this.cacheLocalRequests = ctx.cacheLocalRequests;
- } else throw new IllegalArgumentException();
+ this.returnZIPManifests = ctx.returnZIPManifests;
+		} else if (maskID == SET_RETURN_ARCHIVES) {
+			this.client = ctx.client;
+			this.maxOutputLength = ctx.maxOutputLength;
+			this.maxMetadataSize = ctx.maxMetadataSize;
+			this.maxTempLength = ctx.maxTempLength;
+			this.archiveManager = ctx.archiveManager;
+			this.bucketFactory = ctx.bucketFactory;
+			this.maxRecursionLevel = ctx.maxRecursionLevel;
+			this.maxArchiveRestarts = ctx.maxArchiveRestarts;
+			this.dontEnterImplicitArchives = ctx.dontEnterImplicitArchives;
+			this.random = ctx.random;
+			this.maxSplitfileThreads = ctx.maxSplitfileThreads;
+			this.maxSplitfileBlockRetries = ctx.maxSplitfileBlockRetries;
+			this.maxNonSplitfileRetries = ctx.maxNonSplitfileRetries;
+			this.allowSplitfiles = ctx.allowSplitfiles;
+			this.followRedirects = ctx.followRedirects;
+			this.localRequestOnly = ctx.localRequestOnly;
+			this.splitfileUseLengths = ctx.splitfileUseLengths;
+			this.eventProducer = ctx.eventProducer;
+			this.maxDataBlocksPerSegment = ctx.maxDataBlocksPerSegment;
+			this.maxCheckBlocksPerSegment = ctx.maxCheckBlocksPerSegment;
+			this.starterClient = ctx.starterClient;
+			this.cacheLocalRequests = ctx.cacheLocalRequests;
+			this.returnZIPManifests = true;
+		} else throw new IllegalArgumentException();
}
public void cancel() {
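
The SET_RETURN_ARCHIVES case added above follows the existing mask convention: the copy constructor clones every field from the parent context and overrides only what the mask names, here forcing returnZIPManifests on. ArchiveStoreContext (earlier in this diff) uses it to derive a throwaway context before fetching an archive. A minimal usage sketch, assuming some already-configured FetcherContext ctx:

	// Derive a context that returns ZIP manifests whole; ctx itself is untouched.
	FetcherContext archiveCtx = new FetcherContext(ctx, FetcherContext.SET_RETURN_ARCHIVES);
	// archiveCtx.returnZIPManifests is now true; every other field is copied from ctx.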
Modified: branches/async-client/src/freenet/client/InsertSegment.java
===================================================================
--- branches/async-client/src/freenet/client/InsertSegment.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/InsertSegment.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -12,22 +12,22 @@
public class InsertSegment {
final FECCodec codec;
- final SplitfileBlock[] origDataBlocks;
+ final StartableSplitfileBlock[] origDataBlocks;
final int blockLength;
final BucketFactory bf;
/** Check blocks. Will be created by encode(...). */
- final SplitfileBlock[] checkBlocks;
+ final StartableSplitfileBlock[] checkBlocks;
final boolean getCHKOnly;
// just for debugging
final int segNo;
-	public InsertSegment(short splitfileAlgo, SplitfileBlock[] origDataBlocks, int blockLength, BucketFactory bf, boolean getCHKOnly, int segNo) {
+	public InsertSegment(short splitfileAlgo, StartableSplitfileBlock[] origDataBlocks, int blockLength, BucketFactory bf, boolean getCHKOnly, int segNo) {
 		this.origDataBlocks = origDataBlocks;
 		codec = FECCodec.getCodec(splitfileAlgo, origDataBlocks.length);
 		if(codec != null)
-			checkBlocks = new SplitfileBlock[codec.countCheckBlocks()];
+			checkBlocks = new StartableSplitfileBlock[codec.countCheckBlocks()];
 		else
-			checkBlocks = new SplitfileBlock[0];
+			checkBlocks = new StartableSplitfileBlock[0];
this.blockLength = blockLength;
this.bf = bf;
this.getCHKOnly = getCHKOnly;
Modified: branches/async-client/src/freenet/client/Metadata.java
===================================================================
--- branches/async-client/src/freenet/client/Metadata.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/Metadata.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -19,13 +19,104 @@
/** Metadata parser/writer class. */
-public class Metadata {
+public class Metadata implements Cloneable {
static final long FREENET_METADATA_MAGIC = 0xf053b2842d91482bL;
static final int MAX_SPLITFILE_PARAMS_LENGTH = 32768;
/** Soft limit, to avoid memory DoS */
static final int MAX_SPLITFILE_BLOCKS = 100*1000;
+ // Actual parsed data
+
+ // document type
+ byte documentType;
+ static final byte SIMPLE_REDIRECT = 0;
+ static final byte MULTI_LEVEL_METADATA = 1;
+ static final byte SIMPLE_MANIFEST = 2;
+ static final byte ZIP_MANIFEST = 3;
+ static final byte ZIP_INTERNAL_REDIRECT = 4;
+
+ // 2 bytes of flags
+ /** Is a splitfile */
+ boolean splitfile;
+ /** Is a DBR */
+ boolean dbr;
+ /** No MIME type; on by default as not all doctypes have MIME */
+ boolean noMIME = true;
+ /** Compressed MIME type */
+ boolean compressedMIME;
+ /** Has extra client-metadata */
+ boolean extraMetadata;
+ /** Keys stored in full (otherwise assumed to be CHKs) */
+ boolean fullKeys;
+ /** Non-final splitfile chunks can be non-full */
+ boolean splitUseLengths;
+ static final short FLAGS_SPLITFILE = 1;
+ static final short FLAGS_DBR = 2;
+ static final short FLAGS_NO_MIME = 4;
+ static final short FLAGS_COMPRESSED_MIME = 8;
+ static final short FLAGS_EXTRA_METADATA = 16;
+ static final short FLAGS_FULL_KEYS = 32;
+ static final short FLAGS_SPLIT_USE_LENGTHS = 64;
+ static final short FLAGS_COMPRESSED = 128;
+
+ /** Container archive type */
+ short archiveType;
+ static final short ARCHIVE_ZIP = 0;
+ static final short ARCHIVE_TAR = 1; // FIXME for future use
+
+ /** Compressed splitfile codec */
+ short compressionCodec = -1;
+ static public final short COMPRESS_GZIP = 0;
+ static final short COMPRESS_BZIP2 = 1; // FIXME for future use
+
+ /** The length of the splitfile */
+ long dataLength;
+ /** The decompressed length of the compressed data */
+ long decompressedLength;
+
+ /** The MIME type, as a string */
+ String mimeType;
+
+ /** The compressed MIME type - lookup index for the MIME types table.
+ * Must be between 0 and 32767.
+ */
+ short compressedMIMEValue;
+ boolean hasCompressedMIMEParams;
+ short compressedMIMEParams;
+
+ /** The simple redirect key */
+ FreenetURI simpleRedirectKey;
+
+ short splitfileAlgorithm;
+ static public final short SPLITFILE_NONREDUNDANT = 0;
+ static public final short SPLITFILE_ONION_STANDARD = 1;
+
+ /** Splitfile parameters */
+ byte[] splitfileParams;
+ int splitfileBlocks;
+ int splitfileCheckBlocks;
+ FreenetURI[] splitfileDataKeys;
+ FreenetURI[] splitfileCheckKeys;
+
+ // Manifests
+ int manifestEntryCount;
+ /** Manifest entries by name */
+ HashMap manifestEntries;
+
+ /** ZIP internal redirect: name of file in ZIP */
+ String nameInArchive;
+
+ ClientMetadata clientMetadata;
+
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new Error("Yes it is!");
+ }
+ }
+
/** Parse a block of bytes into a Metadata structure.
* Constructor method because of need to catch impossible exceptions.
* @throws MetadataParseException If the metadata is invalid.
@@ -507,89 +598,6 @@
 		} else throw new IllegalArgumentException("Full keys must be enabled to write non-CHKs");
}
}
- // Actual parsed data
-
- // document type
- byte documentType;
- static final byte SIMPLE_REDIRECT = 0;
- static final byte MULTI_LEVEL_METADATA = 1;
- static final byte SIMPLE_MANIFEST = 2;
- static final byte ZIP_MANIFEST = 3;
- static final byte ZIP_INTERNAL_REDIRECT = 4;
-
- // 2 bytes of flags
- /** Is a splitfile */
- boolean splitfile;
- /** Is a DBR */
- boolean dbr;
- /** No MIME type; on by default as not all doctypes have MIME */
- boolean noMIME = true;
- /** Compressed MIME type */
- boolean compressedMIME;
- /** Has extra client-metadata */
- boolean extraMetadata;
- /** Keys stored in full (otherwise assumed to be CHKs) */
- boolean fullKeys;
- /** Non-final splitfile chunks can be non-full */
- boolean splitUseLengths;
- static final short FLAGS_SPLITFILE = 1;
- static final short FLAGS_DBR = 2;
- static final short FLAGS_NO_MIME = 4;
- static final short FLAGS_COMPRESSED_MIME = 8;
- static final short FLAGS_EXTRA_METADATA = 16;
- static final short FLAGS_FULL_KEYS = 32;
- static final short FLAGS_SPLIT_USE_LENGTHS = 64;
- static final short FLAGS_COMPRESSED = 128;
-
- /** Container archive type */
- short archiveType;
- static final short ARCHIVE_ZIP = 0;
- static final short ARCHIVE_TAR = 1; // FIXME for future use
-
- /** Compressed splitfile codec */
- short compressionCodec = -1;
- static public final short COMPRESS_GZIP = 0;
- static final short COMPRESS_BZIP2 = 1; // FIXME for future use
-
- /** The length of the splitfile */
- long dataLength;
- /** The decompressed length of the compressed data */
- long decompressedLength;
-
- /** The MIME type, as a string */
- String mimeType;
-
- /** The compressed MIME type - lookup index for the MIME types table.
- * Must be between 0 and 32767.
- */
- short compressedMIMEValue;
- boolean hasCompressedMIMEParams;
- short compressedMIMEParams;
-
- /** The simple redirect key */
- FreenetURI simpleRedirectKey;
-
- short splitfileAlgorithm;
- static final short SPLITFILE_NONREDUNDANT = 0;
- static final short SPLITFILE_ONION_STANDARD = 1;
-
- /** Splitfile parameters */
- byte[] splitfileParams;
- int splitfileBlocks;
- int splitfileCheckBlocks;
- FreenetURI[] splitfileDataKeys;
- FreenetURI[] splitfileCheckKeys;
-
- // Manifests
- int manifestEntryCount;
- /** Manifest entries by name */
- HashMap manifestEntries;
-
- /** ZIP internal redirect: name of file in ZIP */
- String nameInArchive;
-
- ClientMetadata clientMetadata;
-
/** Is a manifest? */
public boolean isSimpleManifest() {
return documentType == SIMPLE_MANIFEST;
@@ -799,4 +807,20 @@
public boolean isCompressed() {
return compressionCodec >= 0;
}
+
+ public boolean splitUseLengths() {
+ return splitUseLengths;
+ }
+
+ public short getCompressionCodec() {
+ return compressionCodec;
+ }
+
+ public long dataLength() {
+ return dataLength;
+ }
+
+ public byte[] splitfileParams() {
+ return splitfileParams;
+ }
}
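
For readers tracking the new field block: the two flag bytes decode against the FLAGS_* constants with plain bit tests. The actual parser is outside this hunk; the following is only an illustrative sketch of the decoding the constants imply (hypothetical helper, not part of this commit):

	// The real logic lives in Metadata's parsing constructor, not shown here.
	void decodeFlags(short flags) {
		splitfile = (flags & FLAGS_SPLITFILE) != 0;
		dbr = (flags & FLAGS_DBR) != 0;
		noMIME = (flags & FLAGS_NO_MIME) != 0;
		compressedMIME = (flags & FLAGS_COMPRESSED_MIME) != 0;
		extraMetadata = (flags & FLAGS_EXTRA_METADATA) != 0;
		fullKeys = (flags & FLAGS_FULL_KEYS) != 0;
		splitUseLengths = (flags & FLAGS_SPLIT_USE_LENGTHS) != 0;
	}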
Modified: branches/async-client/src/freenet/client/RetryTracker.java
===================================================================
--- branches/async-client/src/freenet/client/RetryTracker.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/RetryTracker.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -26,16 +26,16 @@
* Return a random block.
* Call synchronized on RetryTracker.
*/
- SplitfileBlock getBlock() {
+ StartableSplitfileBlock getBlock() {
int len = blocks.size();
int x = random.nextInt(len);
-		SplitfileBlock block = (SplitfileBlock) blocks.remove(x);
+		StartableSplitfileBlock block = (StartableSplitfileBlock) blocks.remove(x);
if(blocks.isEmpty())
removeLevel(level);
return block;
}
- void add(SplitfileBlock block) {
+ void add(StartableSplitfileBlock block) {
blocks.add(block);
}
@@ -44,7 +44,7 @@
* Remove self if run out of blocks.
* Call synchronized on RetryTracker.
*/
- void remove(SplitfileBlock block) {
+ void remove(StartableSplitfileBlock block) {
blocks.remove(block);
if(blocks.isEmpty())
removeLevel(level);
@@ -167,7 +167,7 @@
/**
* Add a block at retry level zero.
*/
- public synchronized void addBlock(SplitfileBlock block) {
+ public synchronized void addBlock(StartableSplitfileBlock block) {
if(killed) return;
Level l = makeLevel(0);
l.add(block);
@@ -179,7 +179,7 @@
 	 * Move it out of the running list and back into the relevant list, unless
* we have run out of retries.
*/
- public void nonfatalError(SplitfileBlock block, int reasonCode) {
+	public void nonfatalError(StartableSplitfileBlock block, int reasonCode) {
synchronized(this) {
nonfatalErrors.inc(reasonCode);
runningBlocks.remove(block);
@@ -204,7 +204,7 @@
* Move it into the fatal error list.
 	 * @param reasonCode A client-specific code indicating the type of failure.
*/
- public void fatalError(SplitfileBlock block, int reasonCode) {
+ public void fatalError(StartableSplitfileBlock block, int reasonCode) {
synchronized(this) {
fatalErrors.inc(reasonCode);
runningBlocks.remove(block);
@@ -238,7 +238,7 @@
 		    || (runningBlocks.isEmpty() && levels.isEmpty() && finishOnEmpty)) {
killed = true;
Logger.minor(this, "Finishing");
- SplitfileBlock[] running = runningBlocks();
+ StartableSplitfileBlock[] running = runningBlocks();
for(int i=0;i<running.length;i++) {
running[i].kill();
}
@@ -253,7 +253,7 @@
}
} else {
while(runningBlocks.size() < maxThreads) {
- SplitfileBlock block = getBlock();
+ StartableSplitfileBlock block = getBlock();
if(block == null) break;
Logger.minor(this, "Starting: "+block);
block.start();
@@ -265,7 +265,7 @@
 		callback.finished(succeededBlocks(), failedBlocks(), fatalErrorBlocks());
}
- public void success(SplitfileBlock block) {
+ public void success(StartableSplitfileBlock block) {
synchronized(this) {
if(killed) return;
runningBlocks.remove(block);
@@ -284,7 +284,7 @@
* Get the next block to try. This is a randomly selected block from the
* lowest priority currently available. Move it into the running list.
*/
- public synchronized SplitfileBlock getBlock() {
+ public synchronized StartableSplitfileBlock getBlock() {
if(killed) return null;
Integer iMin = new Integer(curMinLevel);
Level l = (Level) levels.get(iMin);
@@ -309,34 +309,34 @@
/**
* Get all running blocks.
*/
- public synchronized SplitfileBlock[] runningBlocks() {
-		return (SplitfileBlock[])
-			runningBlocks.toArray(new SplitfileBlock[runningBlocks.size()]);
+	public synchronized StartableSplitfileBlock[] runningBlocks() {
+		return (StartableSplitfileBlock[])
+			runningBlocks.toArray(new StartableSplitfileBlock[runningBlocks.size()]);
}
/**
* Get all blocks with fatal errors.
- * SplitfileBlock's are assumed to remember their errors, so we don't.
+ * StartableSplitfileBlocks are assumed to remember their errors, so we don't.
*/
- public synchronized SplitfileBlock[] fatalErrorBlocks() {
-		return (SplitfileBlock[])
-			failedBlocksFatalErrors.toArray(new SplitfileBlock[failedBlocksFatalErrors.size()]);
+	public synchronized StartableSplitfileBlock[] fatalErrorBlocks() {
+		return (StartableSplitfileBlock[])
+			failedBlocksFatalErrors.toArray(new StartableSplitfileBlock[failedBlocksFatalErrors.size()]);
}
/**
* Get all blocks which didn't succeed in the maximum number of tries.
*/
- public synchronized SplitfileBlock[] failedBlocks() {
-		return (SplitfileBlock[])
-			failedBlocksTooManyRetries.toArray(new SplitfileBlock[failedBlocksTooManyRetries.size()]);
+	public synchronized StartableSplitfileBlock[] failedBlocks() {
+		return (StartableSplitfileBlock[])
+			failedBlocksTooManyRetries.toArray(new StartableSplitfileBlock[failedBlocksTooManyRetries.size()]);
}
/**
* Get all successfully downloaded blocks.
*/
- public synchronized SplitfileBlock[] succeededBlocks() {
-		return (SplitfileBlock[])
-			succeededBlocks.toArray(new SplitfileBlock[succeededBlocks.size()]);
+	public synchronized StartableSplitfileBlock[] succeededBlocks() {
+		return (StartableSplitfileBlock[])
+			succeededBlocks.toArray(new StartableSplitfileBlock[succeededBlocks.size()]);
}
public synchronized int succeededBlocksLength() {
@@ -384,7 +384,7 @@
killed = true;
levels.clear();
for(Iterator i=runningBlocks.iterator();i.hasNext();) {
- SplitfileBlock sb = (SplitfileBlock) i.next();
+			StartableSplitfileBlock sb = (StartableSplitfileBlock) i.next();
sb.kill();
}
runningBlocks.clear();
Modified: branches/async-client/src/freenet/client/RetryTrackerCallback.java
===================================================================
--- branches/async-client/src/freenet/client/RetryTrackerCallback.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/RetryTrackerCallback.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -11,7 +11,7 @@
* @param failed The blocks which failed.
* @param fatalErrors The blocks which got fatal errors.
*/
-	void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors);
+	void finished(StartableSplitfileBlock[] succeeded, StartableSplitfileBlock[] failed, StartableSplitfileBlock[] fatalErrors);
/**
* When a block completes etc.
Modified: branches/async-client/src/freenet/client/Segment.java
===================================================================
--- branches/async-client/src/freenet/client/Segment.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/Segment.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -24,7 +24,6 @@
final BlockFetcher[] dataBlockStatus;
final BlockFetcher[] checkBlockStatus;
final int minFetched;
- private Vector blocksNotTried;
final SplitFetcher parentFetcher;
final ArchiveContext archiveContext;
final FetcherContext fetcherContext;
@@ -36,10 +35,6 @@
private boolean finished;
/** Bucket to store the data retrieved, after it has been decoded */
private Bucket decodedData;
- /** Recently completed fetches */
- final LinkedList recentlyCompletedFetches;
- /** Running fetches */
- LinkedList runningFetches;
/** Fetch context for block fetches */
final FetcherContext blockFetchContext;
/** Recursion level */
@@ -76,9 +71,7 @@
decodedData = null;
dataBlockStatus = new BlockFetcher[dataBlocks.length];
checkBlockStatus = new BlockFetcher[checkBlocks.length];
- blocksNotTried = new Vector();
 		Vector firstSet = new Vector(dataBlocks.length+checkBlocks.length);
- blocksNotTried.add(0, firstSet);
for(int i=0;i<dataBlocks.length;i++) {
 			dataBlockStatus[i] = new BlockFetcher(this, tracker, dataBlocks[i], i, fctx.dontEnterImplicitArchives);
 			firstSet.add(dataBlockStatus[i]);
@@ -87,8 +80,6 @@
 			checkBlockStatus[i] = new BlockFetcher(this, tracker, checkBlocks[i], dataBlockStatus.length + i, fctx.dontEnterImplicitArchives);
firstSet.add(checkBlockStatus[i]);
}
- recentlyCompletedFetches = new LinkedList();
- runningFetches = new LinkedList();
// FIXME be a bit more flexible here depending on flags
if(useLengths) {
 			blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
@@ -165,18 +156,9 @@
}
/**
- * How many fetches are running?
- */
- private int runningFetches() {
- synchronized(runningFetches) {
- return runningFetches.size();
- }
- }
-
- /**
* Once we have enough data to decode, tell parent, and decode it.
*/
-	public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors) {
+	public void finished(StartableSplitfileBlock[] succeeded, StartableSplitfileBlock[] failed, StartableSplitfileBlock[] fatalErrors) {
 		Logger.minor(this, "Finished("+succeeded.length+", "+failed.length+", "+fatalErrors.length+")");
parentFetcher.gotBlocks(this);
Modified: branches/async-client/src/freenet/client/SplitFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/SplitFetcher.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/SplitFetcher.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -4,9 +4,6 @@
import java.io.OutputStream;
import java.util.Vector;
-import com.onionnetworks.fec.FECCode;
-import com.onionnetworks.fec.FECCodeFactory;
-
import freenet.client.events.SplitfileProgressEvent;
import freenet.keys.FreenetURI;
import freenet.keys.NodeCHK;
@@ -19,14 +16,6 @@
*/
public class SplitFetcher {
- // 128/192. Crazy, but it's possible we'd get big erasures.
- static final int ONION_STD_K = 128;
- static final int ONION_STD_N = 192;
-
- /** The standard onion codec */
- static FECCode onionStandardCode =
-		FECCodeFactory.getDefault().createFECCode(ONION_STD_K,ONION_STD_N);
-
/** The splitfile type. See the SPLITFILE_ constants on Metadata. */
final short splitfileType;
 	/** The segment length. -1 means not segmented and must get everything to decode. */
Modified: branches/async-client/src/freenet/client/SplitInserter.java
===================================================================
--- branches/async-client/src/freenet/client/SplitInserter.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/SplitInserter.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -30,7 +30,7 @@
final int blockSize;
final boolean isMetadata;
final Bucket returnMetadata;
- SplitfileBlock[] origDataBlocks;
+ StartableSplitfileBlock[] origDataBlocks;
InsertSegment encodingSegment;
InsertSegment[] segments;
private boolean finishedInserting = false;
@@ -39,7 +39,7 @@
private int failed;
private int fatalErrors;
private int countCheckBlocks;
- private SplitfileBlock[] fatalErrorBlocks;
+ private StartableSplitfileBlock[] fatalErrorBlocks;
private FileInserter inserter;
/**
@@ -232,7 +232,7 @@
*/
private void splitIntoBlocks() throws IOException {
 		Bucket[] dataBuckets = BucketTools.split(origData, NodeCHK.BLOCK_SIZE, ctx.bf);
-		origDataBlocks = new SplitfileBlock[dataBuckets.length];
+		origDataBlocks = new StartableSplitfileBlock[dataBuckets.length];
 		for(int i=0;i<origDataBlocks.length;i++) {
 			origDataBlocks[i] = new BlockInserter(dataBuckets[i], i, tracker, ctx, getCHKOnly);
if(origDataBlocks[i].getData() == null)
@@ -259,7 +259,7 @@
int segNo = 0;
for(int i=segmentSize;;i+=segmentSize) {
if(i > dataBlocks) i = dataBlocks;
- SplitfileBlock[] seg = new SplitfileBlock[i-j];
+			StartableSplitfileBlock[] seg = new StartableSplitfileBlock[i-j];
 			System.arraycopy(origDataBlocks, j, seg, 0, i-j);
j = i;
for(int x=0;x<seg.length;x++)
@@ -275,7 +275,7 @@
 		segments = (InsertSegment[]) segs.toArray(new InsertSegment[segs.size()]);
}
-	public void finished(SplitfileBlock[] succeeded, SplitfileBlock[] failed, SplitfileBlock[] fatalErrors) {
+	public void finished(StartableSplitfileBlock[] succeeded, StartableSplitfileBlock[] failed, StartableSplitfileBlock[] fatalErrors) {
synchronized(this) {
finishedInserting = true;
this.succeeded = succeeded.length;
Modified: branches/async-client/src/freenet/client/SplitfileBlock.java
===================================================================
--- branches/async-client/src/freenet/client/SplitfileBlock.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/SplitfileBlock.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -1,11 +1,9 @@
package freenet.client;
-import freenet.client.RetryTracker.Level;
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
-/** Simple interface for a splitfile block */
-public abstract class SplitfileBlock {
+public interface SplitfileBlock {
/** Get block number. [0,k[ = data blocks, [k, n[ = check blocks */
abstract int getNumber();
@@ -19,20 +17,5 @@
/** Set data */
abstract void setData(Bucket data);
-	/** Start the fetch (or insert). Implementation is required to call relevant
- * methods on RetryTracker when done. */
- abstract void start();
- /**
- * Shut down the fetch as soon as reasonably possible.
- */
- abstract public void kill();
-
- /**
- * Get the URI of the file. For an insert, this is derived during
insert.
- * For a request, it is fixed in the constructor.
- */
- abstract public FreenetURI getURI();
-
- abstract public int getRetryCount();
}
Added: branches/async-client/src/freenet/client/StartableSplitfileBlock.java
===================================================================
--- branches/async-client/src/freenet/client/StartableSplitfileBlock.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/StartableSplitfileBlock.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,25 @@
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+
+/** Simple interface for a splitfile block */
+public interface StartableSplitfileBlock extends SplitfileBlock {
+
+	/** Start the fetch (or insert). Implementation is required to call relevant
+ * methods on RetryTracker when done. */
+ abstract void start();
+
+ /**
+ * Shut down the fetch as soon as reasonably possible.
+ */
+ abstract public void kill();
+
+ abstract public int getRetryCount();
+
+ /**
+	 * Get the URI of the file. For an insert, this is derived during insert.
+ * For a request, it is fixed in the constructor.
+ */
+ abstract public FreenetURI getURI();
+
+}
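
The effect of the split: SplitfileBlock is now a passive data holder, and only blocks that actually run (fetches, inserts) implement StartableSplitfileBlock. MinimalSplitfileBlock, added later in this diff, implements just the passive half; a sketch of the distinction (someBucket stands in for any freenet.support.Bucket):

	// Decode-only slot for a splitfile segment: no start()/kill() required.
	SplitfileBlock slot = new MinimalSplitfileBlock(0);
	slot.setData(someBucket);
	// Active blocks (e.g. BlockInserter) implement StartableSplitfileBlock,
	// so the RetryTracker can start() and kill() them.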
Modified: branches/async-client/src/freenet/client/StdSplitfileBlock.java
===================================================================
--- branches/async-client/src/freenet/client/StdSplitfileBlock.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/StdSplitfileBlock.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -3,7 +3,7 @@
import freenet.support.Bucket;
import freenet.support.Logger;
-public abstract class StdSplitfileBlock extends SplitfileBlock implements Runnable {
+public abstract class StdSplitfileBlock implements StartableSplitfileBlock, Runnable {
Bucket fetchedData;
protected final RetryTracker tracker;
Added: branches/async-client/src/freenet/client/async/Client.java
===================================================================
--- branches/async-client/src/freenet/client/async/Client.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/Client.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,17 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * A client process: something that initiates requests and can cancel them.
+ * FCP, Fproxy, and the GlobalPersistentClient implement this somewhere.
+ */
+public interface Client {
+
+ public void onSuccess(FetchResult result, ClientGet state);
+
+ public void onFailure(FetchException e, ClientGet state);
+
+}
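
To make the contract concrete, a trivial Client could look like the following (hypothetical example, not part of this commit):

	public class LoggingClient implements Client {

		public void onSuccess(FetchResult result, ClientGet state) {
			System.out.println("Fetched "+result.asBucket().size()+" bytes");
		}

		public void onFailure(FetchException e, ClientGet state) {
			System.err.println("Fetch failed: "+e.getMessage());
		}
	}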
Added: branches/async-client/src/freenet/client/async/ClientGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGet.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/ClientGet.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,79 @@
+package freenet.client.async;
+
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.keys.FreenetURI;
+
+/**
+ * A high level data request.
+ */
+public class ClientGet extends ClientRequest implements RequestCompletionCallback {
+
+ final Client client;
+ final FreenetURI uri;
+ final FetcherContext ctx;
+ final ArchiveContext actx;
+ final ClientRequestScheduler scheduler;
+ ClientGetState fetchState;
+ private boolean finished;
+ private boolean cancelled;
+ final int priorityClass;
+ private int archiveRestarts;
+
+	public ClientGet(Client client, ClientRequestScheduler sched, FreenetURI uri, FetcherContext ctx, short priorityClass) {
+ super(priorityClass);
+ this.client = client;
+ this.uri = uri;
+ this.ctx = ctx;
+ this.scheduler = sched;
+ this.finished = false;
+ this.actx = new ArchiveContext();
+ this.priorityClass = priorityClass;
+ archiveRestarts = 0;
+ start();
+ }
+
+ private void start() {
+ try {
+			fetchState = new SingleFileFetcher(this, this, new ClientMetadata(), uri, ctx, actx, priorityClass, 0, false, null);
+			fetchState.schedule();
+		} catch (MalformedURLException e) {
+			onFailure(new FetchException(FetchException.INVALID_URI, e), null);
+ } catch (FetchException e) {
+ onFailure(e, null);
+ }
+ }
+
+ public void cancel() {
+ cancelled = true;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public void onSuccess(FetchResult result, ClientGetState state) {
+ finished = true;
+ client.onSuccess(result, this);
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+ if(e.mode == FetchException.ARCHIVE_RESTART) {
+ archiveRestarts++;
+ if(archiveRestarts > ctx.maxArchiveRestarts)
+				e = new FetchException(FetchException.TOO_MANY_ARCHIVE_RESTARTS);
+ else {
+ start();
+ return;
+ }
+ }
+ finished = true;
+ client.onFailure(e, this);
+ }
+
+}
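
Since the constructor calls start() itself, constructing a ClientGet is all a caller does; results come back through the Client callbacks, and ARCHIVE_RESTART failures are retried internally up to ctx.maxArchiveRestarts. A hedged usage sketch (assumes a scheduler, a FetcherContext ctx, and the LoggingClient sketched above):

	FreenetURI uri = new FreenetURI("KSK@gpl.txt"); // may throw MalformedURLException
	ClientGet get = new ClientGet(new LoggingClient(), scheduler, uri, ctx, (short) 0);
	// The fetch is now scheduled; get.cancel() aborts it.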
Added: branches/async-client/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGetState.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/ClientGetState.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+/**
+ * A ClientGetState.
+ * Represents a stage in the fetch process.
+ */
+public abstract class ClientGetState {
+
+ public abstract ClientGet getParent();
+
+ public void schedule() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: branches/async-client/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequest.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/ClientRequest.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,23 @@
+package freenet.client.async;
+
+/** A high level client request. A request (either fetch or put) started
+ * by a Client. Has a suitable context and a URI; is fulfilled only when
+ * we have followed all the redirects etc, or have an error. Can be
+ * retried.
+ */
+public abstract class ClientRequest {
+
+ // FIXME move the priority classes from RequestStarter here
+ private short priorityClass;
+
+ public short getPriorityClass() {
+ return priorityClass;
+ }
+
+ protected ClientRequest(short priorityClass) {
+ this.priorityClass = priorityClass;
+ }
+
+ public abstract void cancel();
+
+}
Added: branches/async-client/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequestScheduler.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/ClientRequestScheduler.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,22 @@
+package freenet.client.async;
+
+/**
+ * Every X seconds, the RequestSender calls the ClientRequestScheduler to
+ * ask for a request to start. A request is then started, in its own
+ * thread. It is removed at that point.
+ */
+public class ClientRequestScheduler {
+
+ public void register(SendableRequest req) {
+ // FIXME
+ }
+
+ public void remove(SendableRequest sr) {
+ // FIXME
+ }
+
+ public void update(SendableRequest sr) {
+ // FIXME
+ }
+
+}
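
The register/remove/update stubs leave the queue structure open. Given that SendableRequest exposes getPriorityClass() and getRetryCount(), one plausible shape, offered purely as a sketch of what might back these stubs, is a per-priority-class list with random choice within a class (raw collections to match the rest of this branch; the number of priority classes is an assumption):

	private final HashMap byPriority = new HashMap(); // Short -> LinkedList of SendableRequest
	private final Random random = new Random();

	public synchronized void register(SendableRequest req) {
		Short key = new Short(req.getPriorityClass());
		LinkedList l = (LinkedList) byPriority.get(key);
		if(l == null) byPriority.put(key, l = new LinkedList());
		l.add(req);
	}

	/** Pick a random request from the best non-empty priority class. */
	public synchronized SendableRequest removeBest() {
		for(short prio = 0; prio < 8; prio++) { // 8 classes: an assumption
			LinkedList l = (LinkedList) byPriority.get(new Short(prio));
			if(l != null && !l.isEmpty())
				return (SendableRequest) l.remove(random.nextInt(l.size()));
		}
		return null;
	}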
Added: branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java
===================================================================
--- branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/MinimalSplitfileBlock.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,32 @@
+package freenet.client.async;
+
+import freenet.client.SplitfileBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class MinimalSplitfileBlock implements SplitfileBlock {
+
+ public final int number;
+ Bucket data;
+
+ public MinimalSplitfileBlock(int n) {
+ this.number = n;
+ }
+
+ public int getNumber() {
+ return number;
+ }
+
+ public boolean hasData() {
+ return data != null;
+ }
+
+ public Bucket getData() {
+ return data;
+ }
+
+ public void setData(Bucket data) {
+ this.data = data;
+ }
+
+}
Added: branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/RequestCompletionCallback.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/RequestCompletionCallback.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * Callback called when part of a get request completes - either with a
+ * Bucket full of data, or with an error.
+ */
+public interface RequestCompletionCallback {
+
+ public void onSuccess(FetchResult result, ClientGetState state);
+
+ public void onFailure(FetchException e, ClientGetState state);
+
+}
Added: branches/async-client/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableRequest.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/SendableRequest.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,25 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * A low-level request which can be sent immediately. These are registered
+ * on the ClientRequestScheduler.
+ */
+public interface SendableRequest {
+
+ public ClientKey getKey();
+
+ public short getPriorityClass();
+
+ public int getRetryCount();
+
+ /** Called when/if the low-level request succeeds. */
+ public void onSuccess(ClientKeyBlock block);
+
+ /** Called when/if the low-level request fails. */
+ public void onFailure(LowLevelPutException e);
+
+}
Added: branches/async-client/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileFetcher.java	2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/SingleFileFetcher.java	2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,457 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ArchiveFailureException;
+import freenet.client.ArchiveRestartException;
+import freenet.client.ArchiveStoreContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.FreenetURI;
+import freenet.keys.KeyDecodeException;
+import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+public class SingleFileFetcher extends ClientGetState implements SendableRequest {
+
+ final ClientGet parent;
+ //final FreenetURI uri;
+ final ClientKey key;
+ final LinkedList metaStrings;
+ final FetcherContext ctx;
+ final RequestCompletionCallback rcb;
+ final ClientMetadata clientMetadata;
+ private Metadata metadata;
+ final int maxRetries;
+ final ArchiveContext actx;
+ /** Archive handler. We can only have one archive handler at a time. */
+ private ArchiveStoreContext ah;
+ private int recursionLevel;
+ /** The URI of the currently-being-processed data, for archives etc. */
+ private FreenetURI thisKey;
+ private int retryCount;
+ private final LinkedList decompressors;
+ private final boolean dontTellClientGet;
+ private Object token;
+
+
+ /** Create a new SingleFileFetcher and register self.
+ * Called when following a redirect, or direct from ClientGet.
+ * @param token
+ * @param dontTellClientGet
+ */
+	public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token) throws FetchException {
+ this.dontTellClientGet = dontTellClientGet;
+ this.token = token;
+ this.parent = get;
+ //this.uri = uri;
+ //this.key = ClientKey.getBaseKey(uri);
+ //metaStrings = uri.listMetaStrings();
+ this.key = key;
+ this.metaStrings = metaStrings;
+ this.ctx = ctx;
+ retryCount = 0;
+ this.rcb = cb;
+ this.clientMetadata = metadata;
+ this.maxRetries = maxRetries;
+ thisKey = key.getURI();
+ this.actx = actx;
+ this.recursionLevel = recursionLevel + 1;
+ if(recursionLevel > ctx.maxRecursionLevel)
+			throw new FetchException(FetchException.TOO_MUCH_RECURSION);
+ this.decompressors = new LinkedList();
+ }
+
+ /** Called by ClientGet. */
+	public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token) throws MalformedURLException, FetchException {
+		this(get, cb, metadata, ClientKey.getBaseKey(uri), uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, token);
+ }
+
+	/** Copy constructor; modifies a few given fields, and does not call schedule(). */
+	public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta, RequestCompletionCallback callback, FetcherContext ctx2) throws FetchException {
+ this.token = fetcher.token;
+ this.dontTellClientGet = fetcher.dontTellClientGet;
+ this.actx = fetcher.actx;
+ this.ah = fetcher.ah;
+ this.clientMetadata = fetcher.clientMetadata;
+ this.ctx = ctx2;
+ this.key = fetcher.key;
+ this.maxRetries = fetcher.maxRetries;
+ this.metadata = newMeta;
+ this.metaStrings = fetcher.metaStrings;
+ this.parent = fetcher.parent;
+ this.rcb = callback;
+ this.retryCount = 0;
+ this.recursionLevel = fetcher.recursionLevel + 1;
+ if(recursionLevel > ctx.maxRecursionLevel)
+			throw new FetchException(FetchException.TOO_MUCH_RECURSION);
+ this.thisKey = fetcher.thisKey;
+ this.decompressors = fetcher.decompressors;
+ }
+
+ public void schedule() {
+ if(!dontTellClientGet)
+ this.parent.fetchState = this;
+ parent.scheduler.register(this);
+ }
+
+ public ClientGet getParent() {
+ return parent;
+ }
+
+ public ClientKey getKey() {
+ return key;
+ }
+
+ public short getPriorityClass() {
+ return parent.getPriorityClass();
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ // Process the completed data. May result in us going to a
+ // splitfile, or another SingleFileFetcher, etc.
+ public void onSuccess(ClientKeyBlock block) {
+ // Extract data
+ Bucket data;
+ try {
+			data = block.decode(ctx.bucketFactory, (int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)));
+		} catch (KeyDecodeException e1) {
+			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()));
+			return;
+		} catch (IOException e) {
+			Logger.error(this, "Could not capture data - disk full?: "+e, e);
+			onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ }
+ if(!block.isMetadata()) {
+ onSuccess(new FetchResult(clientMetadata, data));
+ } else {
+ if(!ctx.followRedirects) {
+				onFailure(new FetchException(FetchException.INVALID_METADATA, "Told me not to follow redirects (splitfile block??)"));
+ return;
+ }
+ if(parent.isCancelled()) {
+				onFailure(new FetchException(FetchException.CANCELLED));
+ return;
+ }
+ if(data.size() > ctx.maxMetadataSize) {
+				onFailure(new FetchException(FetchException.TOO_BIG_METADATA));
+ return;
+ }
+ // Parse metadata
+ try {
+ metadata = Metadata.construct(data);
+ } catch (MetadataParseException e) {
+ onFailure(new FetchException(e));
+ return;
+ } catch (IOException e) {
+ // Bucket error?
+				onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ }
+ try {
+ handleMetadata();
+ } catch (MetadataParseException e) {
+ onFailure(new FetchException(e));
+ return;
+ } catch (FetchException e) {
+ onFailure(e);
+ return;
+ } catch (ArchiveFailureException e) {
+ onFailure(new FetchException(e));
+ } catch (ArchiveRestartException e) {
+ onFailure(new FetchException(e));
+ }
+ }
+ }
+
+ private void onSuccess(FetchResult result) {
+ if(!decompressors.isEmpty()) {
+ Bucket data = result.asBucket();
+ while(!decompressors.isEmpty()) {
+				Compressor c = (Compressor) decompressors.removeLast();
+				try {
+					data = c.decompress(data, ctx.bucketFactory, Math.max(ctx.maxTempLength, ctx.maxOutputLength));
+				} catch (IOException e) {
+					onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+					return;
+				} catch (CompressionOutputSizeException e) {
+					onFailure(new FetchException(FetchException.TOO_BIG, e));
+ return;
+ }
+ }
+ result = new FetchResult(result, data);
+ }
+ rcb.onSuccess(result, this);
+ }
+
+	private void handleMetadata() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ while(true) {
+ if(metadata.isSimpleManifest()) {
+ String name;
+ if(metaStrings.isEmpty())
+ name = null;
+ else
+					name = (String) metaStrings.removeFirst();
+				// Since metadata is a document, we just replace metadata here
+				if(name == null) {
+					metadata = metadata.getDefaultDocument();
+					if(metadata == null)
+						throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+				} else {
+					metadata = metadata.getDocument(name);
+					thisKey = thisKey.pushMetaString(name);
+					if(metadata == null)
+						throw new FetchException(FetchException.NOT_IN_ARCHIVE);
+ }
+ continue; // loop
+ } else if(metadata.isArchiveManifest()) {
+				if(metaStrings.isEmpty() && ctx.returnZIPManifests) {
+ // Just return the archive, whole.
+ metadata.setSimpleRedirect();
+ continue;
+ }
+ // First we need the archive metadata.
+ // Then parse it.
+				// Then we may need to fetch something from inside the archive.
+				ah = (ArchiveStoreContext) ctx.archiveManager.makeHandler(thisKey, metadata.getArchiveType(), false);
+				// ah is set. This means we are currently handling an archive.
+				Bucket metadataBucket;
+				metadataBucket = ah.getMetadata(actx, null, null, recursionLevel+1, true);
+				if(metadataBucket != null) {
+					try {
+						metadata = Metadata.construct(metadataBucket);
+					} catch (IOException e) {
+						// Bucket error?
+						throw new FetchException(FetchException.BUCKET_ERROR, e);
+					}
+				} else {
+					fetchArchive(); // will result in this function being called again
+ return;
+ }
+ continue;
+ } else if(metadata.isArchiveInternalRedirect()) {
+				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+ // Fetch it from the archive
+ if(ah == null)
+					throw new FetchException(FetchException.UNKNOWN_METADATA, "Archive redirect not in an archive");
+				if(metaStrings.isEmpty())
+					throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+				Bucket dataBucket = ah.get((String) metaStrings.removeFirst(), actx, null, null, recursionLevel+1, true);
+ if(dataBucket != null) {
+ // Return the data
+					onSuccess(new FetchResult(this.clientMetadata, dataBucket));
+ return;
+ } else {
+					// Metadata cannot contain pointers to files which don't exist.
+					// We enforce this in ArchiveHandler.
+					// Therefore, the archive needs to be fetched.
+					fetchArchive();
+					// Will call back into this function when it has been fetched.
+ return;
+ }
+ } else if(metadata.isMultiLevelMetadata()) {
+				// Fetch on a second SingleFileFetcher, like with archives.
+				Metadata newMeta = (Metadata) metadata.clone();
+				newMeta.setSimpleRedirect();
+				SingleFileFetcher f = new SingleFileFetcher(this, newMeta, new MultiLevelMetadataCallback(), ctx);
+ f.handleMetadata();
+ return;
+ } else if(metadata.isSingleFileRedirect()) {
+				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+ // FIXME implement implicit archive support
+
+ // Simple redirect
+ // Just create a new SingleFileFetcher
+				// Which will then fetch the target URI, and call the rcb's onSuccess
+ // Hopefully!
+ FreenetURI uri = metadata.getSingleTarget();
+ ClientKey key;
+ try {
+ key = ClientKey.getBaseKey(uri);
+ } catch (MalformedURLException e) {
+					throw new FetchException(FetchException.INVALID_URI, e);
+ }
+				LinkedList newMetaStrings = uri.listMetaStrings();
+
+				// Move any new meta strings to beginning of our list of remaining meta strings
+ while(!newMetaStrings.isEmpty()) {
+ Object o = newMetaStrings.removeLast();
+ metaStrings.addFirst(o);
+ }
+
+				SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, false, null);
+				if(metadata.isCompressed()) {
+					Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+ f.addDecompressor(codec);
+ }
+ f.schedule();
+ // All done! No longer our problem!
+ return;
+ } else if(metadata.isSplitfile()) {
+ // FIXME implicit archive support
+
+				clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+
+ // Splitfile (possibly compressed)
+
+ if(metadata.isCompressed()) {
+					Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+ addDecompressor(codec);
+ }
+
+				SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, parent, ctx,
+						decompressors, clientMetadata, actx, recursionLevel);
+ // SplitFile will now run.
+ // Then it will return data to rcd.
+ // We are now out of the loop. Yay!
+ return;
+ } else {
+				Logger.error(this, "Don't know what to do with metadata: "+metadata);
+				throw new FetchException(FetchException.UNKNOWN_METADATA);
+ }
+ }
+ }
+
+ private void addDecompressor(Compressor codec) {
+ decompressors.addLast(codec);
+ }
+
+	private void fetchArchive() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ // Fetch the archive
+ // How?
+ // Spawn a separate SingleFileFetcher,
+ // which fetches the archive, then calls
+ // our Callback, which unpacks the archive, then
+ // reschedules us.
+ Metadata newMeta = (Metadata) metadata.clone();
+ newMeta.setSimpleRedirect();
+ SingleFileFetcher f;
+		f = new SingleFileFetcher(this, newMeta, new ArchiveFetcherCallback(), new FetcherContext(ctx, FetcherContext.SET_RETURN_ARCHIVES));
+		f.handleMetadata();
+		// When it is done (if successful), the ArchiveCallback will re-call this function.
+		// Which will then discover that the metadata *is* available.
+		// And will also discover that the data is available, and will complete.
+ }
+
+ class ArchiveFetcherCallback implements RequestCompletionCallback {
+
+		public void onSuccess(FetchResult result, ClientGetState state) {
+			parent.fetchState = SingleFileFetcher.this;
+			try {
+				ctx.archiveManager.extractToCache(thisKey, ah.getArchiveType(), result.asBucket(), actx, ah);
+			} catch (ArchiveFailureException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+				return;
+			} catch (ArchiveRestartException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+				return;
+			}
+			try {
+				handleMetadata();
+			} catch (MetadataParseException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			} catch (FetchException e) {
+				SingleFileFetcher.this.onFailure(e);
+			} catch (ArchiveFailureException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			} catch (ArchiveRestartException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			}
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+			// Force fatal as the fetcher is presumed to have made a reasonable effort.
+ SingleFileFetcher.this.onFailure(e, true);
+ }
+
+ }
+
+ class MultiLevelMetadataCallback implements RequestCompletionCallback {
+
+		public void onSuccess(FetchResult result, ClientGetState state) {
+			parent.fetchState = SingleFileFetcher.this;
+			try {
+				metadata = Metadata.construct(result.asBucket());
+			} catch (MetadataParseException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+				return;
+			} catch (IOException e) {
+				// Bucket error?
+				SingleFileFetcher.this.onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+				return;
+			}
+			// Continue processing with the newly fetched metadata.
+			try {
+				handleMetadata();
+			} catch (MetadataParseException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			} catch (FetchException e) {
+				SingleFileFetcher.this.onFailure(e);
+			} catch (ArchiveFailureException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			} catch (ArchiveRestartException e) {
+				SingleFileFetcher.this.onFailure(new FetchException(e));
+			}
+		}
+
+ public void onFailure(FetchException e, ClientGetState state) {
+			// Pass it on; fetcher is assumed to have retried as appropriate already, so this is fatal.
+ SingleFileFetcher.this.onFailure(e, true);
+ }
+
+ }
+
+	private final void onFailure(FetchException e) {
+		onFailure(e, false);
+	}
+
+	// Real onFailure
+	private void onFailure(FetchException e, boolean forceFatal) {
+		if(!(e.isFatal() || forceFatal)) {
+			if(retryCount <= maxRetries) {
+				if(parent.isCancelled()) {
+					onFailure(new FetchException(FetchException.CANCELLED));
+					return;
+				}
+				retryCount++;
+				parent.scheduler.register(this);
+				return;
+			}
+		}
+		// :(
+		rcb.onFailure(e, this);
+	}
+
+	// Translate it, then call the real onFailure
+	public void onFailure(LowLevelGetException e) {
+		switch(e.code) {
+		case LowLevelGetException.DATA_NOT_FOUND:
+			onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
+			return;
+		case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
+			onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
+			return;
+		case LowLevelGetException.DECODE_FAILED:
+			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
+			return;
+		case LowLevelGetException.INTERNAL_ERROR:
+			onFailure(new FetchException(FetchException.INTERNAL_ERROR));
+			return;
+		case LowLevelGetException.REJECTED_OVERLOAD:
+			onFailure(new FetchException(FetchException.REJECTED_OVERLOAD));
+			return;
+		case LowLevelGetException.ROUTE_NOT_FOUND:
+			onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND));
+			return;
+		case LowLevelGetException.TRANSFER_FAILED:
+			onFailure(new FetchException(FetchException.TRANSFER_FAILED));
+			return;
+		case LowLevelGetException.VERIFY_FAILED:
+			onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
+			return;
+		default:
+			Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
+			onFailure(new FetchException(FetchException.INTERNAL_ERROR));
+		}
+	}
+
+ public Object getToken() {
+ return token;
+ }
+
+}
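
The two inner callback classes above show the shape of the new completion contract: a RequestCompletionCallback receives exactly one of onSuccess(FetchResult, ClientGetState) or onFailure(FetchException, ClientGetState) per request. For orientation, a minimal sketch of an implementation; this is not part of the commit, the class name is hypothetical, and it uses only fields and methods visible elsewhere in this diff (result.asBucket(), Bucket.size(), e.mode), assuming the same freenet.client imports:

    // Hypothetical consumer of the completion contract used above.
    class LoggingCallback implements RequestCompletionCallback {

        public void onSuccess(FetchResult result, ClientGetState state) {
            // asBucket()/size() are used the same way in the diff above.
            Logger.minor(this, "Fetched "+result.asBucket().size()+" bytes");
        }

        public void onFailure(FetchException e, ClientGetState state) {
            // e.mode is the FetchException error code, as used in SplitFileFetcherSegment.onFailure().
            Logger.minor(this, "Fetch failed, mode "+e.mode);
        }
    }
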
Added: branches/async-client/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcher.java
2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcher.java
2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,238 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.events.SplitfileProgressEvent;
+import freenet.keys.FreenetURI;
+import freenet.keys.NodeCHK;
+import freenet.support.Bucket;
+import freenet.support.Fields;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+/**
+ * Fetch a splitfile, decompress it if need be, and return it to the RequestCompletionCallback.
+ * Most of the work is done by the segments, and we do not need a thread.
+ */
+public class SplitFileFetcher extends ClientGetState {
+
+ final FetcherContext fetchContext;
+ final ArchiveContext archiveContext;
+ final LinkedList decompressors;
+ final ClientMetadata clientMetadata;
+ final ClientGet parent;
+ final RequestCompletionCallback cb;
+ final int recursionLevel;
+ /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
+ final short splitfileType;
+	/** The segment length. -1 means not segmented and must get everything to decode. */
+ final int blocksPerSegment;
+ /** The segment length in check blocks. */
+ final int checkBlocksPerSegment;
+ /** Total number of segments */
+ final int segmentCount;
+ /** The detailed information on each segment */
+ final SplitFileFetcherSegment[] segments;
+ /** The splitfile data blocks. */
+ final FreenetURI[] splitfileDataBlocks;
+ /** The splitfile check blocks. */
+ final FreenetURI[] splitfileCheckBlocks;
+ /** Maximum temporary length */
+ final long maxTempLength;
+ /** Have all segments finished? Access synchronized. */
+ private boolean allSegmentsFinished = false;
+	/** Override length. If this is positive, truncate the splitfile to this length. */
+ private final long overrideLength;
+ /** Accept non-full splitfile chunks? */
+ private final boolean splitUseLengths;
+ private boolean finished;
+
+	public SplitFileFetcher(Metadata metadata, RequestCompletionCallback rcb, ClientGet parent,
+			FetcherContext newCtx, LinkedList decompressors, ClientMetadata clientMetadata,
+			ArchiveContext actx, int recursionLevel) throws FetchException, MetadataParseException {
+		this.finished = false;
+		this.fetchContext = newCtx;
+		this.archiveContext = actx;
+		this.decompressors = decompressors;
+		this.clientMetadata = clientMetadata;
+		this.cb = rcb;
+		this.recursionLevel = recursionLevel + 1;
+		this.parent = parent;
+		if(parent.isCancelled())
+			throw new FetchException(FetchException.CANCELLED);
+		overrideLength = metadata.dataLength();
+		this.splitfileType = metadata.getSplitfileType();
+		splitfileDataBlocks = metadata.getSplitfileDataKeys();
+		splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+		splitUseLengths = metadata.splitUseLengths();
+		int blockLength = splitUseLengths ? -1 : NodeCHK.BLOCK_SIZE;
+		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+			// Don't need to do much - just fetch everything and piece it together.
+			blocksPerSegment = -1;
+			checkBlocksPerSegment = -1;
+			segmentCount = 1;
+		} else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+			byte[] params = metadata.splitfileParams();
+			if(params == null || params.length < 8)
+				throw new MetadataParseException("No splitfile params");
+			blocksPerSegment = Fields.bytesToInt(params, 0);
+			checkBlocksPerSegment = Fields.bytesToInt(params, 4);
+			if(blocksPerSegment > fetchContext.maxDataBlocksPerSegment
+					|| checkBlocksPerSegment > fetchContext.maxCheckBlocksPerSegment)
+				throw new FetchException(FetchException.TOO_MANY_BLOCKS_PER_SEGMENT, "Too many blocks per segment: "+blocksPerSegment+" data, "+checkBlocksPerSegment+" check");
+			segmentCount = (splitfileDataBlocks.length / blocksPerSegment) +
+				(splitfileDataBlocks.length % blocksPerSegment == 0 ? 0 : 1);
+			// Onion, 128/192.
+			// Will be segmented.
+		} else throw new MetadataParseException("Unknown splitfile format: "+splitfileType);
+		this.maxTempLength = fetchContext.maxTempLength;
+		Logger.minor(this, "Algorithm: "+splitfileType+", blocks per segment: "+blocksPerSegment+", check blocks per segment: "+checkBlocksPerSegment+", segments: "+segmentCount);
+		segments = new SplitFileFetcherSegment[segmentCount]; // initially null on all entries
+		if(segmentCount == 1) {
+			segments[0] = new SplitFileFetcherSegment(splitfileType, splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel);
+		} else {
+			int dataBlocksPtr = 0;
+			int checkBlocksPtr = 0;
+			for(int i=0;i<segments.length;i++) {
+				// Create a segment. Give it its keys.
+				int copyDataBlocks = Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
+				int copyCheckBlocks = Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
+				FreenetURI[] dataBlocks = new FreenetURI[copyDataBlocks];
+				FreenetURI[] checkBlocks = new FreenetURI[copyCheckBlocks];
+				if(copyDataBlocks > 0)
+					System.arraycopy(splitfileDataBlocks, dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
+				if(copyCheckBlocks > 0)
+					System.arraycopy(splitfileCheckBlocks, checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
+				dataBlocksPtr += copyDataBlocks;
+				checkBlocksPtr += copyCheckBlocks;
+				segments[i] = new SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel+1);
+			}
+		}
+	}
+
+	/** Return the final status of the fetch. Throws an exception, or returns a
+	 * Bucket containing the fetched data.
+	 * @throws FetchException If the fetch failed for some reason.
+	 */
+	private Bucket finalStatus() throws FetchException {
+		long finalLength = 0;
+		for(int i=0;i<segments.length;i++) {
+			SplitFileFetcherSegment s = segments[i];
+			if(!s.isFinished()) throw new IllegalStateException("Not all finished");
+			s.throwError();
+			// If still here, it succeeded
+			finalLength += s.decodedLength();
+			// Healing is done by Segment
+		}
+		if(overrideLength > 0 && finalLength > overrideLength)
+			finalLength = overrideLength;
+
+		long bytesWritten = 0;
+		OutputStream os = null;
+		Bucket output;
+		try {
+			output = fetchContext.bucketFactory.makeBucket(finalLength);
+			os = output.getOutputStream();
+			for(int i=0;i<segments.length;i++) {
+				SplitFileFetcherSegment s = segments[i];
+				long max = (finalLength < 0 ? -1 : (finalLength - bytesWritten)); // negative = no truncation
+				bytesWritten += s.writeDecodedDataTo(os, max);
+			}
+		} catch (IOException e) {
+			throw new FetchException(FetchException.BUCKET_ERROR, e);
+		} finally {
+			if(os != null) {
+				try {
+					os.close();
+				} catch (IOException e) {
+					// If it fails to close it may return corrupt data.
+					throw new FetchException(FetchException.BUCKET_ERROR, e);
+				}
+			}
+		}
+		return output;
+	}
+
+	public void segmentFinished(SplitFileFetcherSegment segment) {
+		Logger.minor(this, "Finished segment: "+segment);
+		synchronized(this) {
+			boolean allDone = true;
+			for(int i=0;i<segments.length;i++)
+				if(!segments[i].isFinished()) {
+					Logger.minor(this, "Segment "+segments[i]+" is not finished");
+					allDone = false;
+				}
+			if(allDone) {
+				if(allSegmentsFinished)
+					Logger.error(this, "Was already finished! (segmentFinished("+segment+"))");
+				else {
+					allSegmentsFinished = true;
+					finish();
+				}
+			}
+			notifyAll();
+		}
+	}
+
+	private void finish() {
+		try {
+			synchronized(this) {
+				if(finished) {
+					Logger.error(this, "Was already finished");
+					return;
+				}
+				finished = true;
+			}
+			Bucket data = finalStatus();
+			// Decompress
+			while(!decompressors.isEmpty()) {
+				Compressor c = (Compressor) decompressors.removeLast();
+				try {
+					data = c.decompress(data, fetchContext.bucketFactory, Math.max(fetchContext.maxTempLength, fetchContext.maxOutputLength));
+				} catch (IOException e) {
+					cb.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), this);
+					return;
+				} catch (CompressionOutputSizeException e) {
+					cb.onFailure(new FetchException(FetchException.TOO_BIG, e), this);
+					return;
+				}
+			}
+			cb.onSuccess(new FetchResult(clientMetadata, data), this);
+		} catch (FetchException e) {
+			cb.onFailure(e, this);
+		}
+	}
+
+ public ClientGet getParent() {
+ return parent;
+ }
+
+	public void onProgress() {
+		int totalBlocks = splitfileDataBlocks.length;
+		int fetchedBlocks = 0;
+		int failedBlocks = 0;
+		int fatallyFailedBlocks = 0;
+		int runningBlocks = 0;
+		for(int i=0;i<segments.length;i++) {
+			SplitFileFetcherSegment segment = segments[i];
+			Logger.minor(this, "Segment: "+segment+": fetched="+segment.fetchedBlocks()+", failedBlocks: "+segment.failedBlocks()+
+					", fatally: "+segment.fatallyFailedBlocks()+", running: "+segment.runningBlocks());
+			fetchedBlocks += segment.fetchedBlocks();
+			failedBlocks += segment.failedBlocks();
+			fatallyFailedBlocks += segment.fatallyFailedBlocks();
+			runningBlocks += segment.runningBlocks();
+		}
+		fetchContext.eventProducer.produceEvent(new SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks, fatallyFailedBlocks, runningBlocks));
+	}
+
+}
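
To make the segment arithmetic in the constructor above concrete: with hypothetical numbers, 300 data keys at blocksPerSegment=128 give ceil(300/128) = 3 segments of 128, 128 and 44 blocks. A standalone sketch of exactly that calculation:

    // Standalone illustration of the segmentCount / per-segment copy logic
    // in SplitFileFetcher's constructor above. Numbers are hypothetical.
    public class SegmentArithmetic {
        public static void main(String[] args) {
            int totalDataBlocks = 300, blocksPerSegment = 128;
            int segmentCount = (totalDataBlocks / blocksPerSegment) +
                (totalDataBlocks % blocksPerSegment == 0 ? 0 : 1); // ceil(300/128) = 3
            int dataBlocksPtr = 0;
            for(int i=0;i<segmentCount;i++) {
                int copyDataBlocks = Math.min(totalDataBlocks - dataBlocksPtr, blocksPerSegment);
                System.out.println("Segment "+i+": "+copyDataBlocks+" data blocks");
                dataBlocksPtr += copyDataBlocks;
            }
            // Prints 128, 128, 44.
        }
    }
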
Added: branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-20 19:46:21 UTC (rev 7890)
@@ -0,0 +1,298 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.FECCodec;
+import freenet.client.FailureCodeTracker;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.SplitfileBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
+
+/**
+ * A single segment within a SplitFileFetcher.
+ * This in turn controls a large number of SingleFileFetchers.
+ */
+public class SplitFileFetcherSegment implements RequestCompletionCallback {
+
+ final short splitfileType;
+ final FreenetURI[] dataBlocks;
+ final FreenetURI[] checkBlocks;
+ final SingleFileFetcher[] dataBlockStatus;
+ final SingleFileFetcher[] checkBlockStatus;
+ final MinimalSplitfileBlock[] dataBuckets;
+ final MinimalSplitfileBlock[] checkBuckets;
+ final int minFetched;
+ final SplitFileFetcher parentFetcher;
+ final ArchiveContext archiveContext;
+ final FetcherContext fetcherContext;
+ final long maxBlockLength;
+ final boolean nonFullBlocksAllowed;
+ /** Has the segment finished processing? Irreversible. */
+ private boolean finished;
+ /** Bucket to store the data retrieved, after it has been decoded */
+ private Bucket decodedData;
+ /** Fetch context for block fetches */
+ final FetcherContext blockFetchContext;
+ /** Recursion level */
+ final int recursionLevel;
+ private FetchException failureException;
+ private int fatallyFailedBlocks;
+ private int failedBlocks;
+ private int fetchedBlocks;
+ private FailureCodeTracker errors;
+
+	public SplitFileFetcherSegment(short splitfileType, FreenetURI[] splitfileDataBlocks, FreenetURI[] splitfileCheckBlocks, SplitFileFetcher fetcher, ArchiveContext archiveContext, FetcherContext fetchContext, long maxTempLength, boolean splitUseLengths, int recursionLevel) throws MetadataParseException, FetchException {
+		this.parentFetcher = fetcher;
+		this.archiveContext = archiveContext;
+		this.splitfileType = splitfileType;
+		dataBlocks = splitfileDataBlocks;
+		checkBlocks = splitfileCheckBlocks;
+		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+			minFetched = dataBlocks.length;
+		} else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+			minFetched = dataBlocks.length;
+		} else throw new MetadataParseException("Unknown splitfile type: "+splitfileType);
+		finished = false;
+		decodedData = null;
+		dataBlockStatus = new SingleFileFetcher[dataBlocks.length];
+		checkBlockStatus = new SingleFileFetcher[checkBlocks.length];
+		dataBuckets = new MinimalSplitfileBlock[dataBlocks.length];
+		checkBuckets = new MinimalSplitfileBlock[checkBlocks.length];
+		for(int i=0;i<dataBuckets.length;i++) {
+			dataBuckets[i] = new MinimalSplitfileBlock(i);
+		}
+		for(int i=0;i<checkBuckets.length;i++)
+			checkBuckets[i] = new MinimalSplitfileBlock(i+dataBuckets.length);
+		nonFullBlocksAllowed = splitUseLengths;
+		this.fetcherContext = fetchContext;
+		maxBlockLength = maxTempLength;
+		if(splitUseLengths) {
+			blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
+			this.recursionLevel = recursionLevel + 1;
+		} else {
+			blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
+			this.recursionLevel = 0;
+		}
+
+		try {
+			for(int i=0;i<dataBlocks.length;i++) {
+				dataBlockStatus[i] =
+					new SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i], blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(i));
+				dataBlockStatus[i].schedule();
+			}
+			for(int i=0;i<checkBlocks.length;i++) {
+				checkBlockStatus[i] =
+					new SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i], blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(dataBlocks.length+i));
+				checkBlockStatus[i].schedule();
+			}
+		} catch (MalformedURLException e) {
+			// Invalidates the whole splitfile
+			throw new FetchException(FetchException.INVALID_URI, "Invalid URI in splitfile");
+		}
+	}
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /** Throw a FetchException, if we have one. Else do nothing. */
+ public synchronized void throwError() throws FetchException {
+ if(failureException != null)
+ throw failureException;
+ }
+
+	/** Decoded length? */
+	public long decodedLength() {
+		return decodedData.size();
+	}
+
+	/** Write the decoded segment's data to an OutputStream */
+	public long writeDecodedDataTo(OutputStream os, long truncateLength) throws IOException {
+		long len = decodedData.size();
+		if(truncateLength >= 0 && truncateLength < len)
+			len = truncateLength;
+		BucketTools.copyTo(decodedData, os, len);
+		return len;
+	}
+
+ /** How many blocks have failed due to running out of retries? */
+ public synchronized int failedBlocks() {
+ return failedBlocks;
+ }
+
+ /** How many blocks have been successfully fetched? */
+ public synchronized int fetchedBlocks() {
+ return fetchedBlocks;
+ }
+
+ /** How many blocks have currently running requests? */
+ public int runningBlocks() {
+ // FIXME implement or throw out
+ return 0;
+ }
+
+ /** How many blocks failed permanently due to fatal errors? */
+ public int fatallyFailedBlocks() {
+ return fatallyFailedBlocks;
+ }
+
+	public synchronized void onSuccess(FetchResult result, ClientGetState state) {
+		Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+		int blockNo = token.intValue();
+		if(blockNo < dataBlocks.length) {
+			if(dataBlocks[blockNo] == null) {
+				Logger.error(this, "Block already finished: "+blockNo);
+				return;
+			}
+			dataBlocks[blockNo] = null;
+			dataBuckets[blockNo].setData(result.asBucket());
+		} else if(blockNo < dataBlocks.length + checkBlocks.length) {
+			if(checkBlocks[blockNo-dataBlocks.length] == null) {
+				Logger.error(this, "Block already finished: "+blockNo);
+				return;
+			}
+			checkBlocks[blockNo-dataBlocks.length] = null;
+			checkBuckets[blockNo-dataBlocks.length].setData(result.asBucket());
+		} else
+			Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
+		fetchedBlocks++;
+		if(fetchedBlocks >= minFetched)
+			startDecode();
+	}
+
+ private void startDecode() {
+ Runnable r = new Decoder();
+ Thread t = new Thread(r, "Decoder for "+this);
+ t.setDaemon(true);
+ t.start();
+ }
+
+	class Decoder implements Runnable {
+
+		public void run() {
+
+			// Now decode
+			Logger.minor(this, "Decoding "+this);
+
+			FECCodec codec = FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
+			try {
+				if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+					// FIXME hardcoded block size below.
+					codec.decode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
+					// Now have all the data blocks (not necessarily all the check blocks)
+				}
+
+				decodedData = fetcherContext.bucketFactory.makeBucket(-1);
+				Logger.minor(this, "Copying data from data blocks");
+				OutputStream os = decodedData.getOutputStream();
+				for(int i=0;i<dataBlockStatus.length;i++) {
+					SplitfileBlock status = dataBuckets[i];
+					Bucket data = status.getData();
+					BucketTools.copyTo(data, os, Long.MAX_VALUE);
+				}
+				Logger.minor(this, "Copied data");
+				os.close();
+				// Must set finished BEFORE calling parentFetcher.
+				// Otherwise a race is possible that might result in it not seeing our finishing.
+				finished = true;
+				parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+			} catch (IOException e) {
+				Logger.minor(this, "Caught bucket error?: "+e, e);
+				finished = true;
+				failureException = new FetchException(FetchException.BUCKET_ERROR);
+				parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+				return;
+			}
+
+			// Now heal
+
+			// Encode any check blocks we don't have
+			if(codec != null) {
+				try {
+					codec.encode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
+				} catch (IOException e) {
+					Logger.error(this, "Bucket error while healing: "+e, e);
+				}
+			}
+
+			// Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
+			for(int i=0;i<dataBlockStatus.length;i++) {
+				if(dataBuckets[i].getData() != null) continue;
+				SingleFileFetcher fetcher = dataBlockStatus[i];
+				if(fetcher.getRetryCount() == 0) {
+					// 80% chance of not inserting, if we never tried it
+					if(fetcherContext.random.nextInt(5) != 0) continue;
+				}
+				queueHeal(dataBuckets[i].getData());
+			}
+			for(int i=0;i<checkBlockStatus.length;i++) {
+				if(checkBuckets[i].getData() != null) continue;
+				SingleFileFetcher fetcher = checkBlockStatus[i];
+				if(fetcher.getRetryCount() == 0) {
+					// 80% chance of not inserting, if we never tried it
+					if(fetcherContext.random.nextInt(5) != 0) continue;
+				}
+				queueHeal(checkBuckets[i].getData());
+			}
+
+			for(int i=0;i<dataBlocks.length;i++) {
+				dataBuckets[i] = null;
+				dataBlockStatus[i] = null;
+				dataBlocks[i] = null;
+			}
+			for(int i=0;i<checkBlocks.length;i++) {
+				checkBuckets[i] = null;
+				checkBlockStatus[i] = null;
+				checkBlocks[i] = null;
+			}
+		}
+
+	}
+
+ private void queueHeal(Bucket data) {
+ // TODO Auto-generated method stub
+
+ }
+
+	/** This is after any retries and therefore is either out-of-retries or fatal */
+	public synchronized void onFailure(FetchException e, ClientGetState state) {
+		Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+		int blockNo = token.intValue();
+		if(blockNo < dataBlocks.length) {
+			if(dataBlocks[blockNo] == null) {
+				Logger.error(this, "Block already finished: "+blockNo);
+				return;
+			}
+			dataBlocks[blockNo] = null;
+		} else if(blockNo < dataBlocks.length + checkBlocks.length) {
+			if(checkBlocks[blockNo-dataBlocks.length] == null) {
+				Logger.error(this, "Block already finished: "+blockNo);
+				return;
+			}
+			checkBlocks[blockNo-dataBlocks.length] = null;
+		} else
+			Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
+		// :(
+		Logger.minor(this, "Permanently failed: "+state+" on "+this);
+		if(e.isFatal())
+			fatallyFailedBlocks++;
+		else
+			failedBlocks++;
+		// FIXME this may not be accurate across all the retries?
+		if(e.errorCodes != null)
+			errors.merge(e.errorCodes);
+		else
+			errors.inc(new Integer(e.mode), ((SingleFileFetcher)state).getRetryCount());
+	}
+
+}
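
The segment above identifies finished blocks by the Integer token given to each SingleFileFetcher in the constructor: data blocks carry tokens 0..dataBlocks.length-1 and check blocks carry dataBlocks.length..dataBlocks.length+checkBlocks.length-1, which is why onSuccess()/onFailure() compare blockNo against dataBlocks.length + checkBlocks.length. A standalone sketch of that mapping, with hypothetical segment sizes:

    // Standalone sketch of the token -> block dispatch used by
    // onSuccess()/onFailure() above. Sizes are hypothetical.
    public class TokenDispatch {
        public static void main(String[] args) {
            int dataBlocks = 128, checkBlocks = 64;
            int[] tokens = { 0, 127, 128, 191, 192 };
            for(int i=0;i<tokens.length;i++) {
                int token = tokens[i];
                if(token < dataBlocks)
                    System.out.println(token+" -> data block "+token);
                else if(token < dataBlocks + checkBlocks)
                    System.out.println(token+" -> check block "+(token - dataBlocks));
                else
                    System.out.println(token+" -> unrecognized");
            }
        }
    }
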
Modified: branches/async-client/src/freenet/node/Version.java
===================================================================
--- branches/async-client/src/freenet/node/Version.java
2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/node/Version.java
2006-01-20 19:46:21 UTC (rev 7890)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 364;
+ public static final int buildNumber = 363;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 359;
Modified: branches/async-client/src/freenet/node/fcp/ClientGetMessage.java
===================================================================
--- branches/async-client/src/freenet/node/fcp/ClientGetMessage.java
2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/node/fcp/ClientGetMessage.java
2006-01-20 19:46:21 UTC (rev 7890)
@@ -18,7 +18,7 @@
* Identifier=Request Number One
* Verbosity=0 // no status, just tell us when it's done
* ReturnType=direct // return all at once over the FCP connection
- * MaxSize=100 // maximum size of returned data
+ * MaxSize=100 // maximum size of returned data (all numbers in hex)
* MaxTempSize=1000 // maximum size of intermediary data
* MaxRetries=100 // automatic retry supported as an option
* EndMessage
@@ -55,7 +55,7 @@
verbosity = 0;
else {
try {
-				verbosity = Integer.parseInt(verbosityString, 10);
+				verbosity = Integer.parseInt(verbosityString, 16);
			} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing Verbosity field: "+e.getMessage());
}
@@ -71,7 +71,7 @@
maxSize = Long.MAX_VALUE;
else {
try {
- maxSize = Long.parseLong(maxSizeString, 10);
+ maxSize = Long.parseLong(maxSizeString, 16);
} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing MaxSize field: "+e.getMessage());
}
@@ -82,7 +82,7 @@
maxTempSize = Long.MAX_VALUE;
else {
try {
-				maxTempSize = Long.parseLong(maxTempSizeString, 10);
+				maxTempSize = Long.parseLong(maxTempSizeString, 16);
			} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing MaxSize field: "+e.getMessage());
}
@@ -93,7 +93,7 @@
maxRetries = 0;
else {
try {
-				maxRetries = Integer.parseInt(maxRetriesString, 10);
+				maxRetries = Integer.parseInt(maxRetriesString, 16);
			} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing MaxSize field: "+e.getMessage());
}
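
The substance of the ClientGetMessage changes above is the parse radix: numeric FCP fields are now read base-16, so a client sending MaxSize=100 gets a 256-byte limit rather than 100. A two-line check of the difference:

    // Demonstrates the new hex interpretation of numeric FCP fields.
    public class RadixCheck {
        public static void main(String[] args) {
            System.out.println(Long.parseLong("100", 16)); // 256 - the new interpretation
            System.out.println(Long.parseLong("100", 10)); // 100 - the old interpretation
        }
    }
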
Modified: branches/async-client/src/freenet/node/fcp/ClientPutMessage.java
===================================================================
--- branches/async-client/src/freenet/node/fcp/ClientPutMessage.java
2006-01-20 19:42:14 UTC (rev 7889)
+++ branches/async-client/src/freenet/node/fcp/ClientPutMessage.java
2006-01-20 19:46:21 UTC (rev 7890)
@@ -11,7 +11,7 @@
* ClientPut
* URI=CHK@ // could as easily be an insertable SSK URI
* Metadata.ContentType=text/html
- * DataLength=100 // 100kB
+ * DataLength=19000 // hex for 100kB
* Identifier=Insert-1 // identifier, as always
* Verbosity=0 // just report when complete
* MaxRetries=999999 // lots of retries
@@ -44,7 +44,7 @@
verbosity = 0;
else {
try {
-				verbosity = Integer.parseInt(verbosityString, 10);
+				verbosity = Integer.parseInt(verbosityString, 16);
			} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing Verbosity field: "+e.getMessage());
}
@@ -53,7 +53,7 @@
if(dataLengthString == null)
			throw new MessageInvalidException(ProtocolErrorMessage.MISSING_FIELD, "Need DataLength on a ClientPut");
try {
- dataLength = Long.parseLong(dataLengthString, 10);
+ dataLength = Long.parseLong(dataLengthString, 16);
} catch (NumberFormatException e) {
			throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing DataLength field: "+e.getMessage());
}
@@ -64,7 +64,7 @@
maxRetries = 0;
else {
try {
-				maxRetries = Integer.parseInt(maxRetriesString, 10);
+				maxRetries = Integer.parseInt(maxRetriesString, 16);
			} catch (NumberFormatException e) {
				throw new MessageInvalidException(ProtocolErrorMessage.ERROR_PARSING_NUMBER, "Error parsing MaxSize field: "+e.getMessage());
}
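
The DataLength example in the ClientPutMessage comment can be verified the same way: 0x19000 is 102400 bytes, i.e. 100kB (100*1024):

    // Confirms the "DataLength=19000 // hex for 100kB" example above.
    public class DataLengthCheck {
        public static void main(String[] args) {
            long dataLength = Long.parseLong("19000", 16);
            System.out.println(dataLength);               // 102400
            System.out.println(dataLength == 100 * 1024); // true: 100kB
        }
    }
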