Author: toad
Date: 2006-01-20 19:11:02 +0000 (Fri, 20 Jan 2006)
New Revision: 7887
Added:
branches/async-client-layer/src/freenet/client/SplitfileBlock.java
branches/async-client-layer/src/freenet/client/async/
branches/async-client-layer/src/freenet/client/async/Client.java
branches/async-client-layer/src/freenet/client/async/ClientGet.java
branches/async-client-layer/src/freenet/client/async/ClientGetState.java
branches/async-client-layer/src/freenet/client/async/ClientRequest.java
branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
branches/async-client-layer/src/freenet/client/async/SendableRequest.java
branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
Add missing files.
Added: branches/async-client-layer/src/freenet/client/SplitfileBlock.java
===================================================================
--- branches/async-client-layer/src/freenet/client/SplitfileBlock.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/SplitfileBlock.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,21 @@
+package freenet.client;
+
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public interface SplitfileBlock {
+
+ /** Get block number. [0,k[ = data blocks, [k, n[ = check blocks */
+ abstract int getNumber();
+
+ /** Has data? */
+ abstract boolean hasData();
+
+ /** Get data */
+ abstract Bucket getData();
+
+ /** Set data */
+ abstract void setData(Bucket data);
+
+
+}
Added: branches/async-client-layer/src/freenet/client/async/Client.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/Client.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/Client.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,17 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * A client process. Something that initiates requests, and can cancel
+ * them. FCP, Fproxy, and the GlobalPersistentClient, implement this
+ * somewhere.
+ */
+public interface Client {
+
+ public void onSuccess(FetchResult result, ClientGet state);
+
+ public void onFailure(FetchException e, ClientGet state);
+
+}
Added: branches/async-client-layer/src/freenet/client/async/ClientGet.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientGet.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientGet.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,79 @@
+package freenet.client.async;
+
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.keys.FreenetURI;
+
+/**
+ * A high level data request.
+ */
+public class ClientGet extends ClientRequest implements
RequestCompletionCallback {
+
+ final Client client;
+ final FreenetURI uri;
+ final FetcherContext ctx;
+ final ArchiveContext actx;
+ final ClientRequestScheduler scheduler;
+ ClientGetState fetchState;
+ private boolean finished;
+ private boolean cancelled;
+ final int priorityClass;
+ private int archiveRestarts;
+
+ public ClientGet(Client client, ClientRequestScheduler sched,
FreenetURI uri, FetcherContext ctx, short priorityClass) {
+ super(priorityClass);
+ this.client = client;
+ this.uri = uri;
+ this.ctx = ctx;
+ this.scheduler = sched;
+ this.finished = false;
+ this.actx = new ArchiveContext();
+ this.priorityClass = priorityClass;
+ archiveRestarts = 0;
+ start();
+ }
+
+ private void start() {
+ try {
+ fetchState = new SingleFileFetcher(this, this, new
ClientMetadata(), uri, ctx, actx, priorityClass, 0, false, null);
+ fetchState.schedule();
+ } catch (MalformedURLException e) {
+ onFailure(new
FetchException(FetchException.INVALID_URI, e), null);
+ } catch (FetchException e) {
+ onFailure(e, null);
+ }
+ }
+
+ public void cancel() {
+ cancelled = true;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public void onSuccess(FetchResult result, ClientGetState state) {
+ finished = true;
+ client.onSuccess(result, this);
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+ if(e.mode == FetchException.ARCHIVE_RESTART) {
+ archiveRestarts++;
+ if(archiveRestarts > ctx.maxArchiveRestarts)
+ e = new
FetchException(FetchException.TOO_MANY_ARCHIVE_RESTARTS);
+ else {
+ start();
+ return;
+ }
+ }
+ finished = true;
+ client.onFailure(e, this);
+ }
+
+}
Added: branches/async-client-layer/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientGetState.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientGetState.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+/**
+ * A ClientGetState.
+ * Represents a stage in the fetch process.
+ */
+public abstract class ClientGetState {
+
+ public abstract ClientGet getParent();
+
+ public void schedule() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: branches/async-client-layer/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientRequest.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientRequest.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,23 @@
+package freenet.client.async;
+
+/** A high level client request. A request (either fetch or put) started
+ * by a Client. Has a suitable context and a URI; is fulfilled only when
+ * we have followed all the redirects etc, or have an error. Can be
+ * retried.
+ */
+public abstract class ClientRequest {
+
+ // FIXME move the priority classes from RequestStarter here
+ private short priorityClass;
+
+ public short getPriorityClass() {
+ return priorityClass;
+ }
+
+ protected ClientRequest(short priorityClass) {
+ this.priorityClass = priorityClass;
+ }
+
+ public abstract void cancel();
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
---
branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
2006-01-20 19:10:04 UTC (rev 7886)
+++
branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,22 @@
+package freenet.client.async;
+
+/**
+ * Every X seconds, the RequestSender calls the ClientRequestScheduler to
+ * ask for a request to start. A request is then started, in its own
+ * thread. It is removed at that point.
+ */
+public class ClientRequestScheduler {
+
+ public void register(SendableRequest req) {
+ // FIXME
+ }
+
+ public void remove(SendableRequest sr) {
+ // FIXME
+ }
+
+ public void update(SendableRequest sr) {
+ // FIXME
+ }
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
===================================================================
---
branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
2006-01-20 19:10:04 UTC (rev 7886)
+++
branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,32 @@
+package freenet.client.async;
+
+import freenet.client.SplitfileBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class MinimalSplitfileBlock implements SplitfileBlock {
+
+ public final int number;
+ Bucket data;
+
+ public MinimalSplitfileBlock(int n) {
+ this.number = n;
+ }
+
+ public int getNumber() {
+ return number;
+ }
+
+ public boolean hasData() {
+ return data != null;
+ }
+
+ public Bucket getData() {
+ return data;
+ }
+
+ public void setData(Bucket data) {
+ this.data = data;
+ }
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
===================================================================
---
branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
2006-01-20 19:10:04 UTC (rev 7886)
+++
branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * Callback called when part of a get request completes - either with a
+ * Bucket full of data, or with an error.
+ */
+public interface RequestCompletionCallback {
+
+ public void onSuccess(FetchResult result, ClientGetState state);
+
+ public void onFailure(FetchException e, ClientGetState state);
+
+}
Added: branches/async-client-layer/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SendableRequest.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SendableRequest.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,25 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * A low-level request which can be sent immediately. These are registered
+ * on the ClientRequestScheduler.
+ */
+public interface SendableRequest {
+
+ public ClientKey getKey();
+
+ public short getPriorityClass();
+
+ public int getRetryCount();
+
+ /** Called when/if the low-level request succeeds. */
+ public void onSuccess(ClientKeyBlock block);
+
+ /** Called when/if the low-level request fails. */
+ public void onFailure(LowLevelPutException e);
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,457 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ArchiveFailureException;
+import freenet.client.ArchiveRestartException;
+import freenet.client.ArchiveStoreContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.FreenetURI;
+import freenet.keys.KeyDecodeException;
+import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+public class SingleFileFetcher extends ClientGetState implements
SendableRequest {
+
+ final ClientGet parent;
+ //final FreenetURI uri;
+ final ClientKey key;
+ final LinkedList metaStrings;
+ final FetcherContext ctx;
+ final RequestCompletionCallback rcb;
+ final ClientMetadata clientMetadata;
+ private Metadata metadata;
+ final int maxRetries;
+ final ArchiveContext actx;
+ /** Archive handler. We can only have one archive handler at a time. */
+ private ArchiveStoreContext ah;
+ private int recursionLevel;
+ /** The URI of the currently-being-processed data, for archives etc. */
+ private FreenetURI thisKey;
+ private int retryCount;
+ private final LinkedList decompressors;
+ private final boolean dontTellClientGet;
+ private Object token;
+
+
+ /** Create a new SingleFileFetcher and register self.
+ * Called when following a redirect, or direct from ClientGet.
+ * @param token
+ * @param dontTellClientGet
+ */
+ public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb,
ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext
ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean
dontTellClientGet, Object token) throws FetchException {
+ this.dontTellClientGet = dontTellClientGet;
+ this.token = token;
+ this.parent = get;
+ //this.uri = uri;
+ //this.key = ClientKey.getBaseKey(uri);
+ //metaStrings = uri.listMetaStrings();
+ this.key = key;
+ this.metaStrings = metaStrings;
+ this.ctx = ctx;
+ retryCount = 0;
+ this.rcb = cb;
+ this.clientMetadata = metadata;
+ this.maxRetries = maxRetries;
+ thisKey = key.getURI();
+ this.actx = actx;
+ this.recursionLevel = recursionLevel + 1;
+ if(recursionLevel > ctx.maxRecursionLevel)
+ throw new
FetchException(FetchException.TOO_MUCH_RECURSION);
+ this.decompressors = new LinkedList();
+ }
+
+ /** Called by ClientGet. */
+ public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb,
ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext
actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object
token) throws MalformedURLException, FetchException {
+ this(get, cb, metadata, ClientKey.getBaseKey(uri),
uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel,
dontTellClientGet, token);
+ }
+
+ /** Copy constructor, modifies a few given fields, don't call
schedule() */
+ public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta,
RequestCompletionCallback callback, FetcherContext ctx2) throws FetchException {
+ this.token = fetcher.token;
+ this.dontTellClientGet = fetcher.dontTellClientGet;
+ this.actx = fetcher.actx;
+ this.ah = fetcher.ah;
+ this.clientMetadata = fetcher.clientMetadata;
+ this.ctx = ctx2;
+ this.key = fetcher.key;
+ this.maxRetries = fetcher.maxRetries;
+ this.metadata = newMeta;
+ this.metaStrings = fetcher.metaStrings;
+ this.parent = fetcher.parent;
+ this.rcb = callback;
+ this.retryCount = 0;
+ this.recursionLevel = fetcher.recursionLevel + 1;
+ if(recursionLevel > ctx.maxRecursionLevel)
+ throw new
FetchException(FetchException.TOO_MUCH_RECURSION);
+ this.thisKey = fetcher.thisKey;
+ this.decompressors = fetcher.decompressors;
+ }
+
+ public void schedule() {
+ if(!dontTellClientGet)
+ this.parent.fetchState = this;
+ parent.scheduler.register(this);
+ }
+
+ public ClientGet getParent() {
+ return parent;
+ }
+
+ public ClientKey getKey() {
+ return key;
+ }
+
+ public short getPriorityClass() {
+ return parent.getPriorityClass();
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ // Process the completed data. May result in us going to a
+ // splitfile, or another SingleFileFetcher, etc.
+ public void onSuccess(ClientKeyBlock block) {
+ // Extract data
+ Bucket data;
+ try {
+ data = block.decode(ctx.bucketFactory,
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)));
+ } catch (KeyDecodeException e1) {
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()));
+ return;
+ } catch (IOException e) {
+ Logger.error(this, "Could not capture data - disk
full?: "+e, e);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ }
+ if(!block.isMetadata()) {
+ onSuccess(new FetchResult(clientMetadata, data));
+ } else {
+ if(!ctx.followRedirects) {
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Told me not to follow
redirects (splitfile block??)"));
+ return;
+ }
+ if(parent.isCancelled()) {
+ onFailure(new
FetchException(FetchException.CANCELLED));
+ return;
+ }
+ if(data.size() > ctx.maxMetadataSize) {
+ onFailure(new
FetchException(FetchException.TOO_BIG_METADATA));
+ return;
+ }
+ // Parse metadata
+ try {
+ metadata = Metadata.construct(data);
+ } catch (MetadataParseException e) {
+ onFailure(new FetchException(e));
+ return;
+ } catch (IOException e) {
+ // Bucket error?
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ }
+ try {
+ handleMetadata();
+ } catch (MetadataParseException e) {
+ onFailure(new FetchException(e));
+ return;
+ } catch (FetchException e) {
+ onFailure(e);
+ return;
+ } catch (ArchiveFailureException e) {
+ onFailure(new FetchException(e));
+ } catch (ArchiveRestartException e) {
+ onFailure(new FetchException(e));
+ }
+ }
+ }
+
+ private void onSuccess(FetchResult result) {
+ if(!decompressors.isEmpty()) {
+ Bucket data = result.asBucket();
+ while(!decompressors.isEmpty()) {
+ Compressor c = (Compressor)
decompressors.removeLast();
+ try {
+ data = c.decompress(data,
ctx.bucketFactory, Math.max(ctx.maxTempLength, ctx.maxOutputLength));
+ } catch (IOException e) {
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ } catch (CompressionOutputSizeException e) {
+ onFailure(new
FetchException(FetchException.TOO_BIG, e));
+ return;
+ }
+ }
+ result = new FetchResult(result, data);
+ }
+ rcb.onSuccess(result, this);
+ }
+
+ private void handleMetadata() throws FetchException,
MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ while(true) {
+ if(metadata.isSimpleManifest()) {
+ String name;
+ if(metaStrings.isEmpty())
+ name = null;
+ else
+ name = (String)
metaStrings.removeFirst();
+ // Since metadata is a document, we just
replace metadata here
+ if(name == null) {
+ metadata =
metadata.getDefaultDocument();
+ if(metadata == null)
+ throw new
FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+ } else {
+ metadata = metadata.getDocument(name);
+ thisKey = thisKey.pushMetaString(name);
+ if(metadata == null)
+ throw new
FetchException(FetchException.NOT_IN_ARCHIVE);
+ }
+ continue; // loop
+ } else if(metadata.isArchiveManifest()) {
+ if(metaStrings.isEmpty() &&
ctx.returnZIPManifests) {
+ // Just return the archive, whole.
+ metadata.setSimpleRedirect();
+ continue;
+ }
+ // First we need the archive metadata.
+ // Then parse it.
+ // Then we may need to fetch something from
inside the archive.
+ ah = (ArchiveStoreContext)
ctx.archiveManager.makeHandler(thisKey, metadata.getArchiveType(), false);
+ // ah is set. This means we are currently
handling an archive.
+ Bucket metadataBucket;
+ metadataBucket = ah.getMetadata(actx, null,
null, recursionLevel+1, true);
+ if(metadataBucket != null) {
+ try {
+ metadata =
Metadata.construct(metadataBucket);
+ } catch (IOException e) {
+ // Bucket error?
+ throw new
FetchException(FetchException.BUCKET_ERROR, e);
+ }
+ } else {
+ fetchArchive(); // will result in this
function being called again
+ return;
+ }
+ continue;
+ } else if(metadata.isArchiveInternalRedirect()) {
+
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even
splitfiles can have mime types!
+ // Fetch it from the archive
+ if(ah == null)
+ throw new
FetchException(FetchException.UNKNOWN_METADATA, "Archive redirect not in an
archive");
+ if(metaStrings.isEmpty())
+ throw new
FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+ Bucket dataBucket = ah.get((String)
metaStrings.removeFirst(), actx, null, null, recursionLevel+1, true);
+ if(dataBucket != null) {
+ // Return the data
+ onSuccess(new
FetchResult(this.clientMetadata, dataBucket));
+ return;
+ } else {
+ // Metadata cannot contain pointers to
files which don't exist.
+ // We enforce this in ArchiveHandler.
+ // Therefore, the archive needs to be
fetched.
+ fetchArchive();
+ // Will call back into this function
when it has been fetched.
+ return;
+ }
+ } else if(metadata.isMultiLevelMetadata()) {
+ // Fetch on a second SingleFileFetcher, like
with archives.
+ Metadata newMeta = (Metadata) metadata.clone();
+ metadata.setSimpleRedirect();
+ SingleFileFetcher f = new
SingleFileFetcher(this, newMeta, new MultiLevelMetadataCallback(), ctx);
+ f.handleMetadata();
+ return;
+ } else if(metadata.isSingleFileRedirect()) {
+
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even
splitfiles can have mime types!
+ // FIXME implement implicit archive support
+
+ // Simple redirect
+ // Just create a new SingleFileFetcher
+ // Which will then fetch the target URI, and
call the rcd.success
+ // Hopefully!
+ FreenetURI uri = metadata.getSingleTarget();
+ ClientKey key;
+ try {
+ key = ClientKey.getBaseKey(uri);
+ } catch (MalformedURLException e) {
+ throw new
FetchException(FetchException.INVALID_URI, e);
+ }
+ LinkedList newMetaStrings =
uri.listMetaStrings();
+
+ // Move any new meta strings to beginning of
our list of remaining meta strings
+ while(!newMetaStrings.isEmpty()) {
+ Object o = newMetaStrings.removeLast();
+ metaStrings.addFirst(o);
+ }
+
+ SingleFileFetcher f = new
SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx,
maxRetries, recursionLevel, false, null);
+ if(metadata.isCompressed()) {
+ Compressor codec =
Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+ f.addDecompressor(codec);
+ }
+ f.schedule();
+ // All done! No longer our problem!
+ return;
+ } else if(metadata.isSplitfile()) {
+ // FIXME implicit archive support
+
+
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even
splitfiles can have mime types!
+
+ // Splitfile (possibly compressed)
+
+ if(metadata.isCompressed()) {
+ Compressor codec =
Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+ addDecompressor(codec);
+ }
+
+ SplitFileFetcher sf = new
SplitFileFetcher(metadata, rcb, parent, ctx,
+ decompressors, clientMetadata,
actx, recursionLevel);
+ // SplitFile will now run.
+ // Then it will return data to rcd.
+ // We are now out of the loop. Yay!
+ return;
+ } else {
+ Logger.error(this, "Don't know what to do with
metadata: "+metadata);
+ throw new
FetchException(FetchException.UNKNOWN_METADATA);
+ }
+ }
+ }
+
+ private void addDecompressor(Compressor codec) {
+ decompressors.addLast(codec);
+ }
+
+ private void fetchArchive() throws FetchException,
MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ // Fetch the archive
+ // How?
+ // Spawn a separate SingleFileFetcher,
+ // which fetches the archive, then calls
+ // our Callback, which unpacks the archive, then
+ // reschedules us.
+ Metadata newMeta = (Metadata) metadata.clone();
+ newMeta.setSimpleRedirect();
+ SingleFileFetcher f;
+ f = new SingleFileFetcher(this, newMeta, new
ArchiveFetcherCallback(), new FetcherContext(ctx,
FetcherContext.SET_RETURN_ARCHIVES));
+ f.handleMetadata();
+ // When it is done (if successful), the ArchiveCallback will
re-call this function.
+ // Which will then discover that the metadata *is* available.
+ // And will also discover that the data is available, and will
complete.
+ }
+
+ class ArchiveFetcherCallback implements RequestCompletionCallback {
+
+ public void onSuccess(FetchResult result, ClientGetState state)
{
+ parent.fetchState = SingleFileFetcher.this;
+ try {
+ ctx.archiveManager.extractToCache(thisKey,
ah.getArchiveType(), result.asBucket(), actx, ah);
+ } catch (ArchiveFailureException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ } catch (ArchiveRestartException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ }
+ try {
+ handleMetadata();
+ } catch (MetadataParseException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ } catch (FetchException e) {
+ SingleFileFetcher.this.onFailure(e);
+ } catch (ArchiveFailureException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ } catch (ArchiveRestartException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ }
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+ // Force fatal as the fetcher is presumed to have made
a reasonable effort.
+ SingleFileFetcher.this.onFailure(e, true);
+ }
+
+ }
+
+ class MultiLevelMetadataCallback implements RequestCompletionCallback {
+
+ public void onSuccess(FetchResult result, ClientGetState state)
{
+ parent.fetchState = SingleFileFetcher.this;
+ try {
+ metadata =
Metadata.construct(result.asBucket());
+ } catch (MetadataParseException e) {
+ SingleFileFetcher.this.onFailure(new
FetchException(e));
+ return;
+ } catch (IOException e) {
+ // Bucket error?
+ SingleFileFetcher.this.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ return;
+ }
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+ // Pass it on; fetcher is assumed to have retried as
appropriate already, so this is fatal.
+ SingleFileFetcher.this.onFailure(e, true);
+ }
+
+ }
+
+ private final void onFailure(FetchException e) {
+ onFailure(e, false);
+ }
+
+ // Real onFailure
+ private void onFailure(FetchException e, boolean forceFatal) {
+ if(!(e.isFatal() || forceFatal) ) {
+ if(retryCount <= maxRetries) {
+ if(parent.isCancelled()) {
+ onFailure(new
FetchException(FetchException.CANCELLED));
+ return;
+ }
+ retryCount++;
+ parent.scheduler.register(this);
+ return;
+ }
+ }
+ // :(
+ rcb.onFailure(e, this);
+ }
+
+ // Translate it, then call the real onFailure
+ public void onFailure(LowLevelPutException e) {
+ switch(e.code) {
+ case LowLevelGetException.DATA_NOT_FOUND:
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND));
+ case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND));
+ case LowLevelGetException.DECODE_FAILED:
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR));
+ case LowLevelGetException.INTERNAL_ERROR:
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR));
+ case LowLevelGetException.REJECTED_OVERLOAD:
+ onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD));
+ case LowLevelGetException.ROUTE_NOT_FOUND:
+ onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND));
+ case LowLevelGetException.TRANSFER_FAILED:
+ onFailure(new
FetchException(FetchException.TRANSFER_FAILED));
+ case LowLevelGetException.VERIFY_FAILED:
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR));
+ default:
+ Logger.error(this, "Unknown LowLevelGetException code:
"+e.code);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR));
+ }
+ }
+
+ public Object getToken() {
+ return token;
+ }
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,238 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.events.SplitfileProgressEvent;
+import freenet.keys.FreenetURI;
+import freenet.keys.NodeCHK;
+import freenet.support.Bucket;
+import freenet.support.Fields;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+/**
+ * Fetch a splitfile, decompress it if need be, and return it to the
RequestCompletionCallback.
+ * Most of the work is done by the segments, and we do not need a thread.
+ */
+public class SplitFileFetcher extends ClientGetState {
+
+ final FetcherContext fetchContext;
+ final ArchiveContext archiveContext;
+ final LinkedList decompressors;
+ final ClientMetadata clientMetadata;
+ final ClientGet parent;
+ final RequestCompletionCallback cb;
+ final int recursionLevel;
+ /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
+ final short splitfileType;
+ /** The segment length. -1 means not segmented and must get everything
to decode. */
+ final int blocksPerSegment;
+ /** The segment length in check blocks. */
+ final int checkBlocksPerSegment;
+ /** Total number of segments */
+ final int segmentCount;
+ /** The detailed information on each segment */
+ final SplitFileFetcherSegment[] segments;
+ /** The splitfile data blocks. */
+ final FreenetURI[] splitfileDataBlocks;
+ /** The splitfile check blocks. */
+ final FreenetURI[] splitfileCheckBlocks;
+ /** Maximum temporary length */
+ final long maxTempLength;
+ /** Have all segments finished? Access synchronized. */
+ private boolean allSegmentsFinished = false;
+ /** Override length. If this is positive, truncate the splitfile to
this length. */
+ private final long overrideLength;
+ /** Accept non-full splitfile chunks? */
+ private final boolean splitUseLengths;
+ private boolean finished;
+
+ public SplitFileFetcher(Metadata metadata, RequestCompletionCallback
rcb, ClientGet parent,
+ FetcherContext newCtx, LinkedList decompressors,
ClientMetadata clientMetadata,
+ ArchiveContext actx, int recursionLevel) throws
FetchException, MetadataParseException {
+ this.finished = false;
+ this.fetchContext = newCtx;
+ this.archiveContext = actx;
+ this.decompressors = decompressors;
+ this.clientMetadata = clientMetadata;
+ this.cb = rcb;
+ this.recursionLevel = recursionLevel + 1;
+ this.parent = parent;
+ if(parent.isCancelled())
+ throw new FetchException(FetchException.CANCELLED);
+ overrideLength = metadata.dataLength();
+ this.splitfileType = metadata.getSplitfileType();
+ splitfileDataBlocks = metadata.getSplitfileDataKeys();
+ splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+ splitUseLengths = metadata.splitUseLengths();
+ int blockLength = splitUseLengths ? -1 : NodeCHK.BLOCK_SIZE;
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+ // Don't need to do much - just fetch everything and
piece it together.
+ blocksPerSegment = -1;
+ checkBlocksPerSegment = -1;
+ segmentCount = 1;
+ } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+ byte[] params = metadata.splitfileParams();
+ if(params == null || params.length < 8)
+ throw new MetadataParseException("No splitfile
params");
+ blocksPerSegment = Fields.bytesToInt(params, 0);
+ checkBlocksPerSegment = Fields.bytesToInt(params, 4);
+ if(blocksPerSegment >
fetchContext.maxDataBlocksPerSegment
+ || checkBlocksPerSegment >
fetchContext.maxCheckBlocksPerSegment)
+ throw new
FetchException(FetchException.TOO_MANY_BLOCKS_PER_SEGMENT, "Too many blocks per
segment: "+blocksPerSegment+" data, "+checkBlocksPerSegment+" check");
+ segmentCount = (splitfileDataBlocks.length /
blocksPerSegment) +
+ (splitfileDataBlocks.length % blocksPerSegment
== 0 ? 0 : 1);
+ // Onion, 128/192.
+ // Will be segmented.
+ } else throw new MetadataParseException("Unknown splitfile
format: "+splitfileType);
+ this.maxTempLength = fetchContext.maxTempLength;
+ Logger.minor(this, "Algorithm: "+splitfileType+", blocks per
segment: "+blocksPerSegment+", check blocks per segment:
"+checkBlocksPerSegment+", segments: "+segmentCount);
+ segments = new SplitFileFetcherSegment[segmentCount]; //
initially null on all entries
+ if(segmentCount == 1) {
+ segments[0] = new
SplitFileFetcherSegment(splitfileType, splitfileDataBlocks,
splitfileCheckBlocks, this, archiveContext, fetchContext, maxTempLength,
splitUseLengths, recursionLevel);
+ } else {
+ int dataBlocksPtr = 0;
+ int checkBlocksPtr = 0;
+ for(int i=0;i<segments.length;i++) {
+ // Create a segment. Give it its keys.
+ int copyDataBlocks =
Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
+ int copyCheckBlocks =
Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
+ FreenetURI[] dataBlocks = new
FreenetURI[copyDataBlocks];
+ FreenetURI[] checkBlocks = new
FreenetURI[copyCheckBlocks];
+ if(copyDataBlocks > 0)
+ System.arraycopy(splitfileDataBlocks,
dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
+ if(copyCheckBlocks > 0)
+ System.arraycopy(splitfileCheckBlocks,
checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
+ dataBlocksPtr += copyDataBlocks;
+ checkBlocksPtr += copyCheckBlocks;
+ segments[i] = new
SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this,
archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel+1);
+ }
+ }
+ }
+
+ /** Return the final status of the fetch. Throws an exception, or
returns a
+ * Bucket containing the fetched data.
+ * @throws FetchException If the fetch failed for some reason.
+ */
+ private Bucket finalStatus() throws FetchException {
+ long finalLength = 0;
+ for(int i=0;i<segments.length;i++) {
+ SplitFileFetcherSegment s = segments[i];
+ if(!s.isFinished()) throw new
IllegalStateException("Not all finished");
+ s.throwError();
+ // If still here, it succeeded
+ finalLength += s.decodedLength();
+ // Healing is done by Segment
+ }
+ if(finalLength > overrideLength)
+ finalLength = overrideLength;
+
+ long bytesWritten = 0;
+ OutputStream os = null;
+ Bucket output;
+ try {
+ output =
fetchContext.bucketFactory.makeBucket(finalLength);
+ os = output.getOutputStream();
+ for(int i=0;i<segments.length;i++) {
+ SplitFileFetcherSegment s = segments[i];
+ long max = (finalLength < 0 ? 0 : (finalLength
- bytesWritten));
+ bytesWritten += s.writeDecodedDataTo(os, max);
+ }
+ } catch (IOException e) {
+ throw new FetchException(FetchException.BUCKET_ERROR,
e);
+ } finally {
+ if(os != null) {
+ try {
+ os.close();
+ } catch (IOException e) {
+ // If it fails to close it may return
corrupt data.
+ throw new
FetchException(FetchException.BUCKET_ERROR, e);
+ }
+ }
+ }
+ return output;
+ }
+
+ public void segmentFinished(SplitFileFetcherSegment segment) {
+ Logger.minor(this, "Finished segment: "+segment);
+ synchronized(this) {
+ boolean allDone = true;
+ for(int i=0;i<segments.length;i++)
+ if(!segments[i].isFinished()) {
+ Logger.minor(this, "Segment
"+segments[i]+" is not finished");
+ allDone = false;
+ }
+ if(allDone) {
+ if(allSegmentsFinished)
+ Logger.error(this, "Was already
finished! (segmentFinished("+segment+")");
+ else {
+ allSegmentsFinished = true;
+ finish();
+ }
+ }
+ notifyAll();
+ }
+ }
+
+ private void finish() {
+ try {
+ synchronized(this) {
+ if(finished) {
+ Logger.error(this, "Was already
finished");
+ return;
+ }
+ finished = true;
+ }
+ Bucket data = finalStatus();
+ // Decompress
+ while(!decompressors.isEmpty()) {
+ Compressor c = (Compressor)
decompressors.removeLast();
+ try {
+ data = c.decompress(data,
fetchContext.bucketFactory, Math.max(fetchContext.maxTempLength,
fetchContext.maxOutputLength));
+ } catch (IOException e) {
+ cb.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), this);
+ return;
+ } catch (CompressionOutputSizeException e) {
+ cb.onFailure(new
FetchException(FetchException.TOO_BIG, e), this);
+ return;
+ }
+ }
+ cb.onSuccess(new FetchResult(clientMetadata, data),
this);
+ } catch (FetchException e) {
+ cb.onFailure(e, this);
+ }
+ }
+
+ public ClientGet getParent() {
+ return parent;
+ }
+
+ public void onProgress() {
+ int totalBlocks = splitfileDataBlocks.length;
+ int fetchedBlocks = 0;
+ int failedBlocks = 0;
+ int fatallyFailedBlocks = 0;
+ int runningBlocks = 0;
+ for(int i=0;i<segments.length;i++) {
+ SplitFileFetcherSegment segment = segments[i];
+ Logger.minor(this, "Segment: "+segment+":
fetched="+segment.fetchedBlocks()+", failedBlocks: "+segment.failedBlocks()+
+ ", fatally:
"+segment.fatallyFailedBlocks()+", running: "+segment.runningBlocks());
+ fetchedBlocks += segment.fetchedBlocks();
+ failedBlocks += segment.failedBlocks();
+ fatallyFailedBlocks += segment.fatallyFailedBlocks();
+ runningBlocks += segment.runningBlocks();
+ }
+ fetchContext.eventProducer.produceEvent(new
SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks,
fatallyFailedBlocks, runningBlocks));
+ }
+
+}
Added:
branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
---
branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-20 19:10:04 UTC (rev 7886)
+++
branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,298 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.FECCodec;
+import freenet.client.FailureCodeTracker;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.SplitfileBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
+
+/**
+ * A single segment within a SplitFileFetcher.
+ * This in turn controls a large number of SingleFileFetcher's.
+ */
+public class SplitFileFetcherSegment implements RequestCompletionCallback {
+
+ final short splitfileType;
+ final FreenetURI[] dataBlocks;
+ final FreenetURI[] checkBlocks;
+ final SingleFileFetcher[] dataBlockStatus;
+ final SingleFileFetcher[] checkBlockStatus;
+ final MinimalSplitfileBlock[] dataBuckets;
+ final MinimalSplitfileBlock[] checkBuckets;
+ final int minFetched;
+ final SplitFileFetcher parentFetcher;
+ final ArchiveContext archiveContext;
+ final FetcherContext fetcherContext;
+ final long maxBlockLength;
+ final boolean nonFullBlocksAllowed;
+ /** Has the segment finished processing? Irreversible. */
+ private boolean finished;
+ /** Bucket to store the data retrieved, after it has been decoded */
+ private Bucket decodedData;
+ /** Fetch context for block fetches */
+ final FetcherContext blockFetchContext;
+ /** Recursion level */
+ final int recursionLevel;
+ private FetchException failureException;
+ private int fatallyFailedBlocks;
+ private int failedBlocks;
+ private int fetchedBlocks;
+ private FailureCodeTracker errors;
+
+ public SplitFileFetcherSegment(short splitfileType, FreenetURI[]
splitfileDataBlocks, FreenetURI[] splitfileCheckBlocks, SplitFileFetcher
fetcher, ArchiveContext archiveContext, FetcherContext fetchContext, long
maxTempLength, boolean splitUseLengths, int recursionLevel) throws
MetadataParseException, FetchException {
+ this.parentFetcher = fetcher;
+ this.archiveContext = archiveContext;
+ this.splitfileType = splitfileType;
+ dataBlocks = splitfileDataBlocks;
+ checkBlocks = splitfileCheckBlocks;
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+ minFetched = dataBlocks.length;
+ } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+ minFetched = dataBlocks.length;
+ } else throw new MetadataParseException("Unknown splitfile
type"+splitfileType);
+ finished = false;
+ decodedData = null;
+ dataBlockStatus = new SingleFileFetcher[dataBlocks.length];
+ checkBlockStatus = new SingleFileFetcher[checkBlocks.length];
+ dataBuckets = new MinimalSplitfileBlock[dataBlocks.length];
+ checkBuckets = new MinimalSplitfileBlock[checkBlocks.length];
+ for(int i=0;i<dataBuckets.length;i++) {
+ dataBuckets[i] = new MinimalSplitfileBlock(i);
+ }
+ for(int i=0;i<checkBuckets.length;i++)
+ checkBuckets[i] = new
MinimalSplitfileBlock(i+dataBuckets.length);
+ nonFullBlocksAllowed = splitUseLengths;
+ this.fetcherContext = fetchContext;
+ maxBlockLength = maxTempLength;
+ if(splitUseLengths) {
+ blockFetchContext = new FetcherContext(fetcherContext,
FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
+ this.recursionLevel = recursionLevel + 1;
+ } else {
+ blockFetchContext = new FetcherContext(fetcherContext,
FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
+ this.recursionLevel = 0;
+ }
+
+ try {
+ for(int i=0;i<dataBlocks.length;i++) {
+ dataBlockStatus[i] =
+ new
SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i],
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(i));
+ dataBlockStatus[i].schedule();
+ }
+ for(int i=0;i<checkBlocks.length;i++) {
+ checkBlockStatus[i] =
+ new
SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i],
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries,
recursionLevel, true, new Integer(dataBlocks.length+i));
+ checkBlockStatus[i].schedule();
+ }
+ } catch (MalformedURLException e) {
+ // Invalidates the whole splitfile
+ throw new FetchException(FetchException.INVALID_URI,
"Invalid URI in splitfile");
+ }
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /** Throw a FetchException, if we have one. Else do nothing. */
+ public synchronized void throwError() throws FetchException {
+ if(failureException != null)
+ throw failureException;
+ }
+
+ /** Decoded length? */
+ public long decodedLength() {
+ return decodedData.size();
+ }
+
+ /** Write the decoded segment's data to an OutputStream */
+ public long writeDecodedDataTo(OutputStream os, long truncateLength)
throws IOException {
+ long len = decodedData.size();
+ if(truncateLength >= 0 && truncateLength < len)
+ len = truncateLength;
+ BucketTools.copyTo(decodedData, os, Math.min(truncateLength,
decodedData.size()));
+ return len;
+ }
+
+ /** How many blocks have failed due to running out of retries? */
+ public synchronized int failedBlocks() {
+ return failedBlocks;
+ }
+
+ /** How many blocks have been successfully fetched? */
+ public synchronized int fetchedBlocks() {
+ return fetchedBlocks;
+ }
+
+ /** How many blocks have currently running requests? */
+ public int runningBlocks() {
+ // FIXME implement or throw out
+ return 0;
+ }
+
+ /** How many blocks failed permanently due to fatal errors? */
+ public int fatallyFailedBlocks() {
+ return fatallyFailedBlocks;
+ }
+
+ public synchronized void onSuccess(FetchResult result, ClientGetState
state) {
+ Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+ int blockNo = token.intValue();
+ if(blockNo < dataBlocks.length) {
+ if(dataBlocks[blockNo] == null) {
+ Logger.error(this, "Block already finished:
"+blockNo);
+ return;
+ }
+ dataBlocks[blockNo] = null;
+ dataBuckets[blockNo].setData(result.asBucket());
+ } else if(blockNo < checkBlocks.length) {
+ if(checkBlocks[blockNo-dataBlocks.length] == null) {
+ Logger.error(this, "Block already finished:
"+blockNo);
+ return;
+ }
+ checkBlocks[blockNo-dataBlocks.length] = null;
+ checkBuckets[blockNo].setData(result.asBucket());
+ } else
+ Logger.error(this, "Unrecognized block number:
"+blockNo, new Exception("error"));
+ fetchedBlocks++;
+ if(fetchedBlocks >= minFetched)
+ startDecode();
+ }
+
+ private void startDecode() {
+ Runnable r = new Decoder();
+ Thread t = new Thread(r, "Decoder for "+this);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ class Decoder implements Runnable {
+
+ public void run() {
+
+ // Now decode
+ Logger.minor(this, "Decoding "+this);
+
+ FECCodec codec = FECCodec.getCodec(splitfileType,
dataBlocks.length, checkBlocks.length);
+ try {
+ if(splitfileType !=
Metadata.SPLITFILE_NONREDUNDANT) {
+ // FIXME hardcoded block size below.
+ codec.decode(dataBuckets, checkBuckets,
32768, fetcherContext.bucketFactory);
+ // Now have all the data blocks (not
necessarily all the check blocks)
+ }
+
+ decodedData =
fetcherContext.bucketFactory.makeBucket(-1);
+ Logger.minor(this, "Copying data from data
blocks");
+ OutputStream os = decodedData.getOutputStream();
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ SplitfileBlock status = dataBuckets[i];
+ Bucket data = status.getData();
+ BucketTools.copyTo(data, os,
Long.MAX_VALUE);
+ }
+ Logger.minor(this, "Copied data");
+ os.close();
+ // Must set finished BEFORE calling
parentFetcher.
+ // Otherwise a race is possible that might
result in it not seeing our finishing.
+ finished = true;
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ } catch (IOException e) {
+ Logger.minor(this, "Caught bucket error?: "+e,
e);
+ finished = true;
+ failureException = new
FetchException(FetchException.BUCKET_ERROR);
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ return;
+ }
+
+ // Now heal
+
+ // Encode any check blocks we don't have
+ if(codec != null) {
+ try {
+ codec.encode(dataBuckets, checkBuckets,
32768, fetcherContext.bucketFactory);
+ } catch (IOException e) {
+ Logger.error(this, "Bucket error while
healing: "+e, e);
+ }
+ }
+
+ // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ if(dataBuckets[i].getData() != null) continue;
+ SingleFileFetcher fetcher = dataBlockStatus[i];
+ if(fetcher.getRetryCount() == 0) {
+ // 80% chance of not inserting, if we
never tried it
+ if(fetcherContext.random.nextInt(5) ==
0) continue;
+ }
+ queueHeal(dataBuckets[i].getData());
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ if(checkBuckets[i].getData() != null) continue;
+ SingleFileFetcher fetcher = checkBlockStatus[i];
+ if(fetcher.getRetryCount() == 0) {
+ // 80% chance of not inserting, if we
never tried it
+ if(fetcherContext.random.nextInt(5) ==
0) continue;
+ }
+ queueHeal(checkBuckets[i].getData());
+ }
+
+ for(int i=0;i<dataBlocks.length;i++) {
+ dataBuckets[i] = null;
+ dataBlockStatus[i] = null;
+ dataBlocks[i] = null;
+ }
+ for(int i=0;i<checkBlocks.length;i++) {
+ checkBuckets[i] = null;
+ checkBlockStatus[i] = null;
+ checkBlocks[i] = null;
+ }
+ }
+
+ }
+
+ private void queueHeal(Bucket data) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /** This is after any retries and therefore is either out-of-retries or
fatal */
+ public synchronized void onFailure(FetchException e, ClientGetState
state) {
+ Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+ int blockNo = token.intValue();
+ if(blockNo < dataBlocks.length) {
+ if(dataBlocks[blockNo] == null) {
+ Logger.error(this, "Block already finished:
"+blockNo);
+ return;
+ }
+ dataBlocks[blockNo] = null;
+ } else if(blockNo < checkBlocks.length) {
+ if(checkBlocks[blockNo-dataBlocks.length] == null) {
+ Logger.error(this, "Block already finished:
"+blockNo);
+ return;
+ }
+ checkBlocks[blockNo-dataBlocks.length] = null;
+ } else
+ Logger.error(this, "Unrecognized block number:
"+blockNo, new Exception("error"));
+ // :(
+ Logger.minor(this, "Permanently failed: "+state+" on "+this);
+ if(e.isFatal())
+ fatallyFailedBlocks++;
+ else
+ failedBlocks++;
+ // FIXME this may not be accurate across all the retries?
+ if(e.errorCodes != null)
+ errors.merge(e.errorCodes);
+ else
+ errors.inc(new Integer(e.mode),
((SingleFileFetcher)state).getRetryCount());
+ }
+
+}