Author: toad
Date: 2006-01-20 19:11:02 +0000 (Fri, 20 Jan 2006)
New Revision: 7887

Added:
   branches/async-client-layer/src/freenet/client/SplitfileBlock.java
   branches/async-client-layer/src/freenet/client/async/
   branches/async-client-layer/src/freenet/client/async/Client.java
   branches/async-client-layer/src/freenet/client/async/ClientGet.java
   branches/async-client-layer/src/freenet/client/async/ClientGetState.java
   branches/async-client-layer/src/freenet/client/async/ClientRequest.java
   branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
   branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
   branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
   branches/async-client-layer/src/freenet/client/async/SendableRequest.java
   branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
   branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
   branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
Add missing files.



Added: branches/async-client-layer/src/freenet/client/SplitfileBlock.java
===================================================================
--- branches/async-client-layer/src/freenet/client/SplitfileBlock.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/SplitfileBlock.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,21 @@
+package freenet.client;
+
+import freenet.support.Bucket;
+
+public interface SplitfileBlock {
+
+       /** Get block number. [0,k[ = data blocks, [k, n[ = check blocks */
+       abstract int getNumber();
+       
+       /** Has data? */
+       abstract boolean hasData();
+       
+       /** Get data */
+       abstract Bucket getData();
+       
+       /** Set data */
+       abstract void setData(Bucket data);
+
+
+}
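
For a concrete reading of the numbering convention in getNumber(): with the
standard onion parameters used later in this branch (128 data blocks and 64
check blocks per segment, i.e. k=128, n=192), the mapping looks like this.
A sketch only, not part of the commit:

    // Illustrative only: block numbers [0,128) are data, [128,192) are check.
    SplitfileBlock block = new MinimalSplitfileBlock(130);
    boolean isCheckBlock = block.getNumber() >= 128; // true for block 130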

Added: branches/async-client-layer/src/freenet/client/async/Client.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/Client.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/Client.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,17 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * A client process: something that initiates requests and can cancel
+ * them. FCP, Fproxy, and the GlobalPersistentClient implement this
+ * interface.
+ */
+public interface Client {
+
+       public void onSuccess(FetchResult result, ClientGet state);
+       
+       public void onFailure(FetchException e, ClientGet state);
+
+}
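
For reference, a minimal Client is just the two callbacks. A hypothetical
example (BlockingClient is not part of this commit, just a sketch of how a
synchronous caller could sit on top of the async layer):

    import freenet.client.FetchException;
    import freenet.client.FetchResult;
    import freenet.client.async.Client;
    import freenet.client.async.ClientGet;

    /** Hypothetical: waits synchronously for an asynchronous fetch. */
    public class BlockingClient implements Client {
            private FetchResult result;
            private FetchException error;
            public synchronized void onSuccess(FetchResult result, ClientGet state) {
                    this.result = result;
                    notifyAll();
            }
            public synchronized void onFailure(FetchException e, ClientGet state) {
                    this.error = e;
                    notifyAll();
            }
            /** Blocks until one of the callbacks above fires. */
            public synchronized FetchResult waitForCompletion() throws FetchException, InterruptedException {
                    while(result == null && error == null)
                            wait();
                    if(error != null)
                            throw error;
                    return result;
            }
    }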

Added: branches/async-client-layer/src/freenet/client/async/ClientGet.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientGet.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientGet.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,79 @@
+package freenet.client.async;
+
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.keys.FreenetURI;
+
+/**
+ * A high level data request.
+ */
+public class ClientGet extends ClientRequest implements RequestCompletionCallback {
+
+       final Client client;
+       final FreenetURI uri;
+       final FetcherContext ctx;
+       final ArchiveContext actx;
+       final ClientRequestScheduler scheduler;
+       ClientGetState fetchState;
+       private boolean finished;
+       private boolean cancelled;
+       final int priorityClass;
+       private int archiveRestarts;
+       
+       public ClientGet(Client client, ClientRequestScheduler sched, FreenetURI uri, FetcherContext ctx, short priorityClass) {
+               super(priorityClass);
+               this.client = client;
+               this.uri = uri;
+               this.ctx = ctx;
+               this.scheduler = sched;
+               this.finished = false;
+               this.actx = new ArchiveContext();
+               this.priorityClass = priorityClass;
+               archiveRestarts = 0;
+               start();
+       }
+       
+       private void start() {
+               try {
+                       fetchState = new SingleFileFetcher(this, this, new ClientMetadata(), uri, ctx, actx, priorityClass, 0, false, null);
+                       fetchState.schedule();
+               } catch (MalformedURLException e) {
+                       onFailure(new FetchException(FetchException.INVALID_URI, e), null);
+               } catch (FetchException e) {
+                       onFailure(e, null);
+               }
+       }
+
+       public void cancel() {
+               cancelled = true;
+       }
+       
+       public boolean isCancelled() {
+               return cancelled;
+       }
+       
+       public void onSuccess(FetchResult result, ClientGetState state) {
+               finished = true;
+               client.onSuccess(result, this);
+       }
+
+       public void onFailure(FetchException e, ClientGetState state) {
+               if(e.mode == FetchException.ARCHIVE_RESTART) {
+                       archiveRestarts++;
+                       if(archiveRestarts > ctx.maxArchiveRestarts)
+                               e = new FetchException(FetchException.TOO_MANY_ARCHIVE_RESTARTS);
+                       else {
+                               start();
+                               return;
+                       }
+               }
+               finished = true;
+               client.onFailure(e, this);
+       }
+       
+}
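
Since the constructor calls start() itself, kicking off a fetch is a single
construction. A usage sketch (the scheduler and context instances are assumed
to come from the surrounding node code, BlockingClient is the hypothetical
class sketched above, and the URI is illustrative):

    BlockingClient client = new BlockingClient();
    FreenetURI uri = new FreenetURI("KSK@sample.txt"); // throws MalformedURLException
    ClientGet get = new ClientGet(client, scheduler, uri, fetcherContext, (short) 0);
    // ... later, if the caller loses interest:
    // get.cancel();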

Added: branches/async-client-layer/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientGetState.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientGetState.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+/**
+ * Represents one stage in the fetch process, e.g. fetching a single
+ * key (SingleFileFetcher) or a splitfile (SplitFileFetcher).
+ */
+public abstract class ClientGetState {
+
+       public abstract ClientGet getParent();
+
+       public void schedule() {
+               // Default: no-op. States that register with the scheduler
+               // (e.g. SingleFileFetcher) override this.
+       }
+       
+}

Added: branches/async-client-layer/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientRequest.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientRequest.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,23 @@
+package freenet.client.async;
+
+/** A high level client request. A request (either fetch or put) started
+ * by a Client. Has a suitable context and a URI; is fulfilled only when
+ * we have followed all the redirects etc, or have an error. Can be 
+ * retried.
+ */
+public abstract class ClientRequest {
+
+       // FIXME move the priority classes from RequestStarter here
+       private short priorityClass;
+       
+       public short getPriorityClass() {
+               return priorityClass;
+       }
+       
+       protected ClientRequest(short priorityClass) {
+               this.priorityClass = priorityClass;
+       }
+       
+       public abstract void cancel();
+       
+}

Added: branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/ClientRequestScheduler.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,22 @@
+package freenet.client.async;
+
+/**
+ * Every X seconds, the RequestSender calls the ClientRequestScheduler to
+ * ask for a request to start. The chosen request is then started in its
+ * own thread, and is removed from the scheduler at that point.
+ */
+public class ClientRequestScheduler {
+
+       public void register(SendableRequest req) {
+               // FIXME
+       }
+       
+       public void remove(SendableRequest sr) {
+               // FIXME
+       }
+       
+       public void update(SendableRequest sr) {
+               // FIXME
+       }
+       
+}
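
The three methods above are still stubs. One plausible backing structure,
sketched here purely as an assumption (this is not the committed design), is
one FIFO queue per priority class, polled periodically by the request starter:

    import java.util.LinkedList;

    /** Hypothetical sketch of a scheduler backing store; not the committed code. */
    public class PriorityClassQueues {
            private final LinkedList[] queues; // one FIFO per priority class

            public PriorityClassQueues(int numClasses) {
                    queues = new LinkedList[numClasses];
                    for(int i = 0; i < queues.length; i++)
                            queues[i] = new LinkedList();
            }
            public synchronized void register(SendableRequest req) {
                    queues[req.getPriorityClass()].addLast(req);
            }
            public synchronized void remove(SendableRequest req) {
                    queues[req.getPriorityClass()].remove(req);
            }
            /** Lowest-numbered (highest-priority) class first; null if idle. */
            public synchronized SendableRequest removeFirst() {
                    for(int i = 0; i < queues.length; i++)
                            if(!queues[i].isEmpty())
                                    return (SendableRequest) queues[i].removeFirst();
                    return null;
            }
    }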

Added: branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/MinimalSplitfileBlock.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,32 @@
+package freenet.client.async;
+
+import freenet.client.SplitfileBlock;
+import freenet.support.Bucket;
+
+public class MinimalSplitfileBlock implements SplitfileBlock {
+
+       public final int number;
+       Bucket data;
+       
+       public MinimalSplitfileBlock(int n) {
+               this.number = n;
+       }
+
+       public int getNumber() {
+               return number;
+       }
+
+       public boolean hasData() {
+               return data != null;
+       }
+
+       public Bucket getData() {
+               return data;
+       }
+
+       public void setData(Bucket data) {
+               this.data = data;
+       }
+
+}

Added: branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/RequestCompletionCallback.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * Callback called when part of a get request completes - either with a 
+ * Bucket full of data, or with an error.
+ */
+public interface RequestCompletionCallback {
+
+       public void onSuccess(FetchResult result, ClientGetState state);
+       
+       public void onFailure(FetchException e, ClientGetState state);
+       
+}

Added: branches/async-client-layer/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SendableRequest.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SendableRequest.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,25 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelGetException;
+
+/**
+ * A low-level request which can be sent immediately. These are registered
+ * on the ClientRequestScheduler.
+ */
+public interface SendableRequest {
+       
+       public ClientKey getKey();
+       
+       public short getPriorityClass();
+       
+       public int getRetryCount();
+       
+       /** Called when/if the low-level request succeeds. */
+       public void onSuccess(ClientKeyBlock block);
+       
+       /** Called when/if the low-level request fails. */
+       public void onFailure(LowLevelGetException e);
+       
+}

Added: branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SingleFileFetcher.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,457 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ArchiveFailureException;
+import freenet.client.ArchiveRestartException;
+import freenet.client.ArchiveStoreContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.FreenetURI;
+import freenet.keys.KeyDecodeException;
+import freenet.node.LowLevelGetException;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+public class SingleFileFetcher extends ClientGetState implements SendableRequest {
+
+       final ClientGet parent;
+       //final FreenetURI uri;
+       final ClientKey key;
+       final LinkedList metaStrings;
+       final FetcherContext ctx;
+       final RequestCompletionCallback rcb;
+       final ClientMetadata clientMetadata;
+       private Metadata metadata;
+       final int maxRetries;
+       final ArchiveContext actx;
+       /** Archive handler. We can only have one archive handler at a time. */
+       private ArchiveStoreContext ah;
+       private int recursionLevel;
+       /** The URI of the currently-being-processed data, for archives etc. */
+       private FreenetURI thisKey;
+       private int retryCount;
+       private final LinkedList decompressors;
+       private final boolean dontTellClientGet;
+       private Object token;
+       
+       
+       /** Create a new SingleFileFetcher and register self.
+        * Called when following a redirect, or direct from ClientGet.
+        * @param token 
+        * @param dontTellClientGet 
+        */
+       public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token) throws FetchException {
+               this.dontTellClientGet = dontTellClientGet;
+               this.token = token;
+               this.parent = get;
+               //this.uri = uri;
+               //this.key = ClientKey.getBaseKey(uri);
+               //metaStrings = uri.listMetaStrings();
+               this.key = key;
+               this.metaStrings = metaStrings;
+               this.ctx = ctx;
+               retryCount = 0;
+               this.rcb = cb;
+               this.clientMetadata = metadata;
+               this.maxRetries = maxRetries;
+               thisKey = key.getURI();
+               this.actx = actx;
+               this.recursionLevel = recursionLevel + 1;
+               if(recursionLevel > ctx.maxRecursionLevel)
+                       throw new FetchException(FetchException.TOO_MUCH_RECURSION);
+               this.decompressors = new LinkedList();
+       }
+
+       /** Called by ClientGet. */ 
+       public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token) throws MalformedURLException, FetchException {
+               this(get, cb, metadata, ClientKey.getBaseKey(uri), uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, token);
+       }
+       
+       /** Copy constructor; modifies a few given fields, doesn't call schedule(). */
+       public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta, RequestCompletionCallback callback, FetcherContext ctx2) throws FetchException {
+               this.token = fetcher.token;
+               this.dontTellClientGet = fetcher.dontTellClientGet;
+               this.actx = fetcher.actx;
+               this.ah = fetcher.ah;
+               this.clientMetadata = fetcher.clientMetadata;
+               this.ctx = ctx2;
+               this.key = fetcher.key;
+               this.maxRetries = fetcher.maxRetries;
+               this.metadata = newMeta;
+               this.metaStrings = fetcher.metaStrings;
+               this.parent = fetcher.parent;
+               this.rcb = callback;
+               this.retryCount = 0;
+               this.recursionLevel = fetcher.recursionLevel + 1;
+               if(recursionLevel > ctx.maxRecursionLevel)
+                       throw new FetchException(FetchException.TOO_MUCH_RECURSION);
+               this.thisKey = fetcher.thisKey;
+               this.decompressors = fetcher.decompressors;
+       }
+
+       public void schedule() {
+               if(!dontTellClientGet)
+                       this.parent.fetchState = this;
+               parent.scheduler.register(this);
+       }
+
+       public ClientGet getParent() {
+               return parent;
+       }
+
+       public ClientKey getKey() {
+               return key;
+       }
+
+       public short getPriorityClass() {
+               return parent.getPriorityClass();
+       }
+
+       public int getRetryCount() {
+               return retryCount;
+       }
+
+       // Process the completed data. May result in us going to a
+       // splitfile, or another SingleFileFetcher, etc.
+       public void onSuccess(ClientKeyBlock block) {
+               // Extract data
+               Bucket data;
+               try {
+                       data = block.decode(ctx.bucketFactory, (int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)));
+               } catch (KeyDecodeException e1) {
+                       onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()));
+                       return;
+               } catch (IOException e) {
+                       Logger.error(this, "Could not capture data - disk full?: "+e, e);
+                       onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+                       return;
+               }
+               if(!block.isMetadata()) {
+                       onSuccess(new FetchResult(clientMetadata, data));
+               } else {
+                       if(!ctx.followRedirects) {
+                               onFailure(new FetchException(FetchException.INVALID_METADATA, "Told me not to follow redirects (splitfile block??)"));
+                               return;
+                       }
+                       if(parent.isCancelled()) {
+                               onFailure(new FetchException(FetchException.CANCELLED));
+                               return;
+                       }
+                       if(data.size() > ctx.maxMetadataSize) {
+                               onFailure(new FetchException(FetchException.TOO_BIG_METADATA));
+                               return;
+                       }
+                       // Parse metadata
+                       try {
+                               metadata = Metadata.construct(data);
+                       } catch (MetadataParseException e) {
+                               onFailure(new FetchException(e));
+                               return;
+                       } catch (IOException e) {
+                               // Bucket error?
+                               onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+                               return;
+                       }
+                       try {
+                               handleMetadata();
+                       } catch (MetadataParseException e) {
+                               onFailure(new FetchException(e));
+                               return;
+                       } catch (FetchException e) {
+                               onFailure(e);
+                               return;
+                       } catch (ArchiveFailureException e) {
+                               onFailure(new FetchException(e));
+                       } catch (ArchiveRestartException e) {
+                               onFailure(new FetchException(e));
+                       }
+               }
+       }
+
+       private void onSuccess(FetchResult result) {
+               if(!decompressors.isEmpty()) {
+                       Bucket data = result.asBucket();
+                       while(!decompressors.isEmpty()) {
+                               Compressor c = (Compressor) decompressors.removeLast();
+                               try {
+                                       data = c.decompress(data, ctx.bucketFactory, Math.max(ctx.maxTempLength, ctx.maxOutputLength));
+                               } catch (IOException e) {
+                                       onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+                                       return;
+                               } catch (CompressionOutputSizeException e) {
+                                       onFailure(new FetchException(FetchException.TOO_BIG, e));
+                                       return;
+                               }
+                       }
+                       result = new FetchResult(result, data);
+               }
+               rcb.onSuccess(result, this);
+       }
+
+       private void handleMetadata() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+               while(true) {
+                       if(metadata.isSimpleManifest()) {
+                               String name;
+                               if(metaStrings.isEmpty())
+                                       name = null;
+                               else
+                                       name = (String) metaStrings.removeFirst();
+                               // Since metadata is a document, we just replace metadata here
+                               if(name == null) {
+                                       metadata = metadata.getDefaultDocument();
+                                       if(metadata == null)
+                                               throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+                               } else {
+                                       metadata = metadata.getDocument(name);
+                                       thisKey = thisKey.pushMetaString(name);
+                                       if(metadata == null)
+                                               throw new FetchException(FetchException.NOT_IN_ARCHIVE);
+                               }
+                               continue; // loop
+                       } else if(metadata.isArchiveManifest()) {
+                               if(metaStrings.isEmpty() && ctx.returnZIPManifests) {
+                                       // Just return the archive, whole.
+                                       metadata.setSimpleRedirect();
+                                       continue;
+                               }
+                               // First we need the archive metadata.
+                               // Then parse it.
+                               // Then we may need to fetch something from inside the archive.
+                               ah = (ArchiveStoreContext) ctx.archiveManager.makeHandler(thisKey, metadata.getArchiveType(), false);
+                               // ah is set. This means we are currently handling an archive.
+                               Bucket metadataBucket;
+                               metadataBucket = ah.getMetadata(actx, null, null, recursionLevel+1, true);
+                               if(metadataBucket != null) {
+                                       try {
+                                               metadata = Metadata.construct(metadataBucket);
+                                       } catch (IOException e) {
+                                               // Bucket error?
+                                               throw new FetchException(FetchException.BUCKET_ERROR, e);
+                                       }
+                               } else {
+                                       fetchArchive(); // will result in this function being called again
+                                       return;
+                               }
+                               continue;
+                       } else if(metadata.isArchiveInternalRedirect()) {
+                               clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+                               // Fetch it from the archive
+                               if(ah == null)
+                                       throw new FetchException(FetchException.UNKNOWN_METADATA, "Archive redirect not in an archive");
+                               if(metaStrings.isEmpty())
+                                       throw new FetchException(FetchException.NOT_ENOUGH_METASTRINGS);
+                               Bucket dataBucket = ah.get((String) metaStrings.removeFirst(), actx, null, null, recursionLevel+1, true);
+                               if(dataBucket != null) {
+                                       // Return the data
+                                       onSuccess(new FetchResult(this.clientMetadata, dataBucket));
+                                       return;
+                               } else {
+                                       // Metadata cannot contain pointers to files which don't exist.
+                                       // We enforce this in ArchiveHandler.
+                                       // Therefore, the archive needs to be fetched.
+                                       fetchArchive();
+                                       // Will call back into this function when it has been fetched.
+                                       return;
+                               }
+                       } else if(metadata.isMultiLevelMetadata()) {
+                               // Fetch on a second SingleFileFetcher, like with archives.
+                               Metadata newMeta = (Metadata) metadata.clone();
+                               newMeta.setSimpleRedirect();
+                               SingleFileFetcher f = new SingleFileFetcher(this, newMeta, new MultiLevelMetadataCallback(), ctx);
+                               f.handleMetadata();
+                               return;
+                       } else if(metadata.isSingleFileRedirect()) {
+                               clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+                               // FIXME implement implicit archive support
+                               
+                               // Simple redirect
+                               // Just create a new SingleFileFetcher
+                               // Which will then fetch the target URI, and call rcb.onSuccess
+                               // Hopefully!
+                               FreenetURI uri = metadata.getSingleTarget();
+                               ClientKey key;
+                               try {
+                                       key = ClientKey.getBaseKey(uri);
+                               } catch (MalformedURLException e) {
+                                       throw new FetchException(FetchException.INVALID_URI, e);
+                               }
+                               LinkedList newMetaStrings = uri.listMetaStrings();
+
+                               // Move any new meta strings to beginning of our list of remaining meta strings
+                               while(!newMetaStrings.isEmpty()) {
+                                       Object o = newMetaStrings.removeLast();
+                                       metaStrings.addFirst(o);
+                               }
+
+                               SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, false, null);
+                               if(metadata.isCompressed()) {
+                                       Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+                                       f.addDecompressor(codec);
+                               }
+                               f.schedule();
+                               // All done! No longer our problem!
+                               return;
+                       } else if(metadata.isSplitfile()) {
+                               // FIXME implicit archive support
+                               
+                               clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
+                               
+                               // Splitfile (possibly compressed)
+                               
+                               if(metadata.isCompressed()) {
+                                       Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
+                                       addDecompressor(codec);
+                               }
+                               
+                               SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, parent, ctx,
+                                               decompressors, clientMetadata, actx, recursionLevel);
+                               // SplitFile will now run.
+                               // Then it will return data to rcb.
+                               // We are now out of the loop. Yay!
+                               return;
+                       } else {
+                               Logger.error(this, "Don't know what to do with metadata: "+metadata);
+                               throw new FetchException(FetchException.UNKNOWN_METADATA);
+                       }
+               }
+       }
+
+       private void addDecompressor(Compressor codec) {
+               decompressors.addLast(codec);
+       }
+
+       private void fetchArchive() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+               // Fetch the archive
+               // How?
+               // Spawn a separate SingleFileFetcher,
+               // which fetches the archive, then calls
+               // our Callback, which unpacks the archive, then
+               // reschedules us.
+               Metadata newMeta = (Metadata) metadata.clone();
+               newMeta.setSimpleRedirect();
+               SingleFileFetcher f;
+               f = new SingleFileFetcher(this, newMeta, new ArchiveFetcherCallback(), new FetcherContext(ctx, FetcherContext.SET_RETURN_ARCHIVES));
+               f.handleMetadata();
+               // When it is done (if successful), the ArchiveCallback will re-call this function.
+               // Which will then discover that the metadata *is* available.
+               // And will also discover that the data is available, and will complete.
+       }
+
+       class ArchiveFetcherCallback implements RequestCompletionCallback {
+
+               public void onSuccess(FetchResult result, ClientGetState state) {
+                       parent.fetchState = SingleFileFetcher.this;
+                       try {
+                               ctx.archiveManager.extractToCache(thisKey, ah.getArchiveType(), result.asBucket(), actx, ah);
+                       } catch (ArchiveFailureException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                               return;
+                       } catch (ArchiveRestartException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                               return;
+                       }
+                       try {
+                               handleMetadata();
+                       } catch (MetadataParseException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       } catch (FetchException e) {
+                               SingleFileFetcher.this.onFailure(e);
+                       } catch (ArchiveFailureException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       } catch (ArchiveRestartException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       }
+               }
+
+               public void onFailure(FetchException e, ClientGetState state) {
+                       // Force fatal as the fetcher is presumed to have made a reasonable effort.
+                       SingleFileFetcher.this.onFailure(e, true);
+               }
+               
+       }
+
+       class MultiLevelMetadataCallback implements RequestCompletionCallback {
+               
+               public void onSuccess(FetchResult result, ClientGetState state) {
+                       parent.fetchState = SingleFileFetcher.this;
+                       try {
+                               metadata = Metadata.construct(result.asBucket());
+                       } catch (MetadataParseException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                               return;
+                       } catch (IOException e) {
+                               // Bucket error?
+                               SingleFileFetcher.this.onFailure(new FetchException(FetchException.BUCKET_ERROR, e));
+                               return;
+                       }
+                       try {
+                               handleMetadata();
+                       } catch (MetadataParseException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       } catch (FetchException e) {
+                               SingleFileFetcher.this.onFailure(e);
+                       } catch (ArchiveFailureException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       } catch (ArchiveRestartException e) {
+                               SingleFileFetcher.this.onFailure(new FetchException(e));
+                       }
+               }
+               
+               public void onFailure(FetchException e, ClientGetState state) {
+                       // Pass it on; fetcher is assumed to have retried as appropriate already, so this is fatal.
+                       SingleFileFetcher.this.onFailure(e, true);
+               }
+               
+       }
+       
+       private final void onFailure(FetchException e) {
+               onFailure(e, false);
+       }
+       
+       // Real onFailure
+       private void onFailure(FetchException e, boolean forceFatal) {
+               if(!(e.isFatal() || forceFatal) ) {
+                       if(retryCount <= maxRetries) {
+                               if(parent.isCancelled()) {
+                                       onFailure(new FetchException(FetchException.CANCELLED));
+                                       return;
+                               }
+                               retryCount++;
+                               parent.scheduler.register(this);
+                               return;
+                       }
+               }
+               // :(
+               rcb.onFailure(e, this);
+       }
+
+       // Translate it, then call the real onFailure
+       public void onFailure(LowLevelGetException e) {
+               switch(e.code) {
+               case LowLevelGetException.DATA_NOT_FOUND:
+                       onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
+                       return;
+               case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
+                       onFailure(new FetchException(FetchException.DATA_NOT_FOUND));
+                       return;
+               case LowLevelGetException.DECODE_FAILED:
+                       onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
+                       return;
+               case LowLevelGetException.INTERNAL_ERROR:
+                       onFailure(new FetchException(FetchException.INTERNAL_ERROR));
+                       return;
+               case LowLevelGetException.REJECTED_OVERLOAD:
+                       onFailure(new FetchException(FetchException.REJECTED_OVERLOAD));
+                       return;
+               case LowLevelGetException.ROUTE_NOT_FOUND:
+                       onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND));
+                       return;
+               case LowLevelGetException.TRANSFER_FAILED:
+                       onFailure(new FetchException(FetchException.TRANSFER_FAILED));
+                       return;
+               case LowLevelGetException.VERIFY_FAILED:
+                       onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR));
+                       return;
+               default:
+                       Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
+                       onFailure(new FetchException(FetchException.INTERNAL_ERROR));
+               }
+       }
+
+       public Object getToken() {
+               return token;
+       }
+       
+}
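
To make the handleMetadata() walk concrete: the meta strings are the path
components left over after the base key, consumed one per manifest level. An
illustrative sketch (the key itself is made up, and the exact split is only
roughly indicated):

    FreenetURI uri = new FreenetURI("KSK@mysite/images/logo.png");
    ClientKey key = ClientKey.getBaseKey(uri);       // roughly KSK@mysite
    LinkedList metaStrings = uri.listMetaStrings();  // roughly ["images", "logo.png"]
    // Each simple-manifest level removes one name via metaStrings.removeFirst();
    // running out too early fails with NOT_ENOUGH_METASTRINGS, and an unknown
    // name fails with NOT_IN_ARCHIVE.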

Added: branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SplitFileFetcher.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,238 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import freenet.client.ArchiveContext;
+import freenet.client.ClientMetadata;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.events.SplitfileProgressEvent;
+import freenet.keys.FreenetURI;
+import freenet.keys.NodeCHK;
+import freenet.support.Bucket;
+import freenet.support.Fields;
+import freenet.support.Logger;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+/**
+ * Fetch a splitfile, decompress it if need be, and return it to the RequestCompletionCallback.
+ * Most of the work is done by the segments, and we do not need a thread.
+ */
+public class SplitFileFetcher extends ClientGetState {
+
+       final FetcherContext fetchContext;
+       final ArchiveContext archiveContext;
+       final LinkedList decompressors;
+       final ClientMetadata clientMetadata;
+       final ClientGet parent;
+       final RequestCompletionCallback cb;
+       final int recursionLevel;
+       /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
+       final short splitfileType;
+       /** The segment length. -1 means not segmented and must get everything to decode. */
+       final int blocksPerSegment;
+       /** The segment length in check blocks. */
+       final int checkBlocksPerSegment;
+       /** Total number of segments */
+       final int segmentCount;
+       /** The detailed information on each segment */
+       final SplitFileFetcherSegment[] segments;
+       /** The splitfile data blocks. */
+       final FreenetURI[] splitfileDataBlocks;
+       /** The splitfile check blocks. */
+       final FreenetURI[] splitfileCheckBlocks;
+       /** Maximum temporary length */
+       final long maxTempLength;
+       /** Have all segments finished? Access synchronized. */
+       private boolean allSegmentsFinished = false;
+       /** Override length. If this is positive, truncate the splitfile to this length. */
+       private final long overrideLength;
+       /** Accept non-full splitfile chunks? */
+       private final boolean splitUseLengths;
+       private boolean finished;
+       
+       public SplitFileFetcher(Metadata metadata, RequestCompletionCallback rcb, ClientGet parent,
+                       FetcherContext newCtx, LinkedList decompressors, ClientMetadata clientMetadata,
+                       ArchiveContext actx, int recursionLevel) throws FetchException, MetadataParseException {
+               this.finished = false;
+               this.fetchContext = newCtx;
+               this.archiveContext = actx;
+               this.decompressors = decompressors;
+               this.clientMetadata = clientMetadata;
+               this.cb = rcb;
+               this.recursionLevel = recursionLevel + 1;
+               this.parent = parent;
+               if(parent.isCancelled())
+                       throw new FetchException(FetchException.CANCELLED);
+               overrideLength = metadata.dataLength();
+               this.splitfileType = metadata.getSplitfileType();
+               splitfileDataBlocks = metadata.getSplitfileDataKeys();
+               splitfileCheckBlocks = metadata.getSplitfileCheckKeys();
+               splitUseLengths = metadata.splitUseLengths();
+               int blockLength = splitUseLengths ? -1 : NodeCHK.BLOCK_SIZE;
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       // Don't need to do much - just fetch everything and piece it together.
+                       blocksPerSegment = -1;
+                       checkBlocksPerSegment = -1;
+                       segmentCount = 1;
+               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+                       byte[] params = metadata.splitfileParams();
+                       if(params == null || params.length < 8)
+                               throw new MetadataParseException("No splitfile params");
+                       blocksPerSegment = Fields.bytesToInt(params, 0);
+                       checkBlocksPerSegment = Fields.bytesToInt(params, 4);
+                       if(blocksPerSegment > fetchContext.maxDataBlocksPerSegment
+                                       || checkBlocksPerSegment > fetchContext.maxCheckBlocksPerSegment)
+                               throw new FetchException(FetchException.TOO_MANY_BLOCKS_PER_SEGMENT, "Too many blocks per segment: "+blocksPerSegment+" data, "+checkBlocksPerSegment+" check");
+                       segmentCount = (splitfileDataBlocks.length / blocksPerSegment) +
+                               (splitfileDataBlocks.length % blocksPerSegment == 0 ? 0 : 1);
+                       // Onion, 128/192.
+                       // Will be segmented.
+               } else throw new MetadataParseException("Unknown splitfile format: "+splitfileType);
+               this.maxTempLength = fetchContext.maxTempLength;
+               Logger.minor(this, "Algorithm: "+splitfileType+", blocks per segment: "+blocksPerSegment+", check blocks per segment: "+checkBlocksPerSegment+", segments: "+segmentCount);
+               segments = new SplitFileFetcherSegment[segmentCount]; // initially null on all entries
+               if(segmentCount == 1) {
+                       segments[0] = new SplitFileFetcherSegment(splitfileType, splitfileDataBlocks, splitfileCheckBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel);
+               } else {
+                       int dataBlocksPtr = 0;
+                       int checkBlocksPtr = 0;
+                       for(int i=0;i<segments.length;i++) {
+                               // Create a segment. Give it its keys.
+                               int copyDataBlocks = Math.min(splitfileDataBlocks.length - dataBlocksPtr, blocksPerSegment);
+                               int copyCheckBlocks = Math.min(splitfileCheckBlocks.length - checkBlocksPtr, checkBlocksPerSegment);
+                               FreenetURI[] dataBlocks = new FreenetURI[copyDataBlocks];
+                               FreenetURI[] checkBlocks = new FreenetURI[copyCheckBlocks];
+                               if(copyDataBlocks > 0)
+                                       System.arraycopy(splitfileDataBlocks, dataBlocksPtr, dataBlocks, 0, copyDataBlocks);
+                               if(copyCheckBlocks > 0)
+                                       System.arraycopy(splitfileCheckBlocks, checkBlocksPtr, checkBlocks, 0, copyCheckBlocks);
+                               dataBlocksPtr += copyDataBlocks;
+                               checkBlocksPtr += copyCheckBlocks;
+                               segments[i] = new SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this, archiveContext, fetchContext, maxTempLength, splitUseLengths, recursionLevel+1);
+                       }
+               }
+       }
+
+       /** Return the final status of the fetch. Throws an exception, or returns a
+        * Bucket containing the fetched data.
+        * @throws FetchException If the fetch failed for some reason.
+        */
+       private Bucket finalStatus() throws FetchException {
+               long finalLength = 0;
+               for(int i=0;i<segments.length;i++) {
+                       SplitFileFetcherSegment s = segments[i];
+                       if(!s.isFinished()) throw new IllegalStateException("Not all finished");
+                       s.throwError();
+                       // If still here, it succeeded
+                       finalLength += s.decodedLength();
+                       // Healing is done by Segment
+               }
+               if(finalLength > overrideLength)
+                       finalLength = overrideLength;
+               
+               long bytesWritten = 0;
+               OutputStream os = null;
+               Bucket output;
+               try {
+                       output = fetchContext.bucketFactory.makeBucket(finalLength);
+                       os = output.getOutputStream();
+                       for(int i=0;i<segments.length;i++) {
+                               SplitFileFetcherSegment s = segments[i];
+                               long max = (finalLength < 0 ? 0 : (finalLength - bytesWritten));
+                               bytesWritten += s.writeDecodedDataTo(os, max);
+                       }
+               } catch (IOException e) {
+                       throw new FetchException(FetchException.BUCKET_ERROR, e);
+               } finally {
+                       if(os != null) {
+                               try {
+                                       os.close();
+                               } catch (IOException e) {
+                                       // If it fails to close it may return corrupt data.
+                                       throw new FetchException(FetchException.BUCKET_ERROR, e);
+                               }
+                       }
+               }
+               return output;
+       }
+
+       public void segmentFinished(SplitFileFetcherSegment segment) {
+               Logger.minor(this, "Finished segment: "+segment);
+               synchronized(this) {
+                       boolean allDone = true;
+                       for(int i=0;i<segments.length;i++)
+                               if(!segments[i].isFinished()) {
+                                       Logger.minor(this, "Segment "+segments[i]+" is not finished");
+                                       allDone = false;
+                               }
+                       if(allDone) {
+                               if(allSegmentsFinished)
+                                       Logger.error(this, "Was already finished! (segmentFinished("+segment+"))");
+                               else {
+                                       allSegmentsFinished = true;
+                                       finish();
+                               }
+                       }
+                       notifyAll();
+               }
+       }
+
+       private void finish() {
+               try {
+                       synchronized(this) {
+                               if(finished) {
+                                       Logger.error(this, "Was already finished");
+                                       return;
+                               }
+                               finished = true;
+                       }
+                       Bucket data = finalStatus();
+                       // Decompress
+                       while(!decompressors.isEmpty()) {
+                               Compressor c = (Compressor) decompressors.removeLast();
+                               try {
+                                       data = c.decompress(data, fetchContext.bucketFactory, Math.max(fetchContext.maxTempLength, fetchContext.maxOutputLength));
+                               } catch (IOException e) {
+                                       cb.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), this);
+                                       return;
+                               } catch (CompressionOutputSizeException e) {
+                                       cb.onFailure(new FetchException(FetchException.TOO_BIG, e), this);
+                                       return;
+                               }
+                       }
+                       cb.onSuccess(new FetchResult(clientMetadata, data), this);
+               } catch (FetchException e) {
+                       cb.onFailure(e, this);
+               }
+       }
+
+       public ClientGet getParent() {
+               return parent;
+       }
+
+       public void onProgress() {
+               int totalBlocks = splitfileDataBlocks.length;
+               int fetchedBlocks = 0;
+               int failedBlocks = 0;
+               int fatallyFailedBlocks = 0;
+               int runningBlocks = 0;
+               for(int i=0;i<segments.length;i++) {
+                       SplitFileFetcherSegment segment = segments[i];
+                       Logger.minor(this, "Segment: "+segment+": fetched="+segment.fetchedBlocks()+", failedBlocks: "+segment.failedBlocks()+
+                                       ", fatally: "+segment.fatallyFailedBlocks()+", running: "+segment.runningBlocks());
+                       fetchedBlocks += segment.fetchedBlocks();
+                       failedBlocks += segment.failedBlocks();
+                       fatallyFailedBlocks += segment.fatallyFailedBlocks();
+                       runningBlocks += segment.runningBlocks();
+               }
+               fetchContext.eventProducer.produceEvent(new SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks, fatallyFailedBlocks, runningBlocks));
+       }
+
+}
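
Checking the segmentation arithmetic in the constructor with concrete numbers
(the values are illustrative; 128 is the standard onion data-segment width
mentioned in the constructor comment):

    int dataBlockCount = 300, blocksPerSegment = 128;
    int segmentCount = (dataBlockCount / blocksPerSegment)
            + (dataBlockCount % blocksPerSegment == 0 ? 0 : 1);
    // segmentCount = 2 + 1 = 3: segments of 128, 128 and 44 data blocks,
    // since each loop iteration copies Math.min(remaining, blocksPerSegment) keys.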

Added: branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java  2006-01-20 19:10:04 UTC (rev 7886)
+++ branches/async-client-layer/src/freenet/client/async/SplitFileFetcherSegment.java  2006-01-20 19:11:02 UTC (rev 7887)
@@ -0,0 +1,298 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+
+import freenet.client.ArchiveContext;
+import freenet.client.FECCodec;
+import freenet.client.FailureCodeTracker;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+import freenet.client.FetcherContext;
+import freenet.client.Metadata;
+import freenet.client.MetadataParseException;
+import freenet.client.SplitfileBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
+
+/**
+ * A single segment within a SplitFileFetcher.
+ * This in turn controls a large number of SingleFileFetchers.
+ */
+public class SplitFileFetcherSegment implements RequestCompletionCallback {
+       
+       final short splitfileType;
+       final FreenetURI[] dataBlocks;
+       final FreenetURI[] checkBlocks;
+       final SingleFileFetcher[] dataBlockStatus;
+       final SingleFileFetcher[] checkBlockStatus;
+       final MinimalSplitfileBlock[] dataBuckets;
+       final MinimalSplitfileBlock[] checkBuckets;
+       final int minFetched;
+       final SplitFileFetcher parentFetcher;
+       final ArchiveContext archiveContext;
+       final FetcherContext fetcherContext;
+       final long maxBlockLength;
+       final boolean nonFullBlocksAllowed;
+       /** Has the segment finished processing? Irreversible. */
+       private boolean finished;
+       /** Bucket to store the data retrieved, after it has been decoded */
+       private Bucket decodedData;
+       /** Fetch context for block fetches */
+       final FetcherContext blockFetchContext;
+       /** Recursion level */
+       final int recursionLevel;
+       private FetchException failureException;
+       private int fatallyFailedBlocks;
+       private int failedBlocks;
+       private int fetchedBlocks;
+       private FailureCodeTracker errors;
+       
+       public SplitFileFetcherSegment(short splitfileType, FreenetURI[] splitfileDataBlocks, FreenetURI[] splitfileCheckBlocks, SplitFileFetcher fetcher, ArchiveContext archiveContext, FetcherContext fetchContext, long maxTempLength, boolean splitUseLengths, int recursionLevel) throws MetadataParseException, FetchException {
+               this.parentFetcher = fetcher;
+               this.archiveContext = archiveContext;
+               this.splitfileType = splitfileType;
+               dataBlocks = splitfileDataBlocks;
+               checkBlocks = splitfileCheckBlocks;
+               if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+                       minFetched = dataBlocks.length;
+               } else if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD) {
+                       minFetched = dataBlocks.length;
+               } else throw new MetadataParseException("Unknown splitfile type: "+splitfileType);
+               finished = false;
+               decodedData = null;
+               dataBlockStatus = new SingleFileFetcher[dataBlocks.length];
+               checkBlockStatus = new SingleFileFetcher[checkBlocks.length];
+               dataBuckets = new MinimalSplitfileBlock[dataBlocks.length];
+               checkBuckets = new MinimalSplitfileBlock[checkBlocks.length];
+               for(int i=0;i<dataBuckets.length;i++) {
+                       dataBuckets[i] = new MinimalSplitfileBlock(i);
+               }
+               for(int i=0;i<checkBuckets.length;i++)
+                       checkBuckets[i] = new MinimalSplitfileBlock(i+dataBuckets.length);
+               nonFullBlocksAllowed = splitUseLengths;
+               this.fetcherContext = fetchContext;
+               maxBlockLength = maxTempLength;
+               if(splitUseLengths) {
+                       blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_USE_LENGTHS_MASK);
+                       this.recursionLevel = recursionLevel + 1;
+               } else {
+                       blockFetchContext = new FetcherContext(fetcherContext, FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
+                       this.recursionLevel = 0;
+               }
+               
+               try {
+                       for(int i=0;i<dataBlocks.length;i++) {
+                               dataBlockStatus[i] =
+                                       new SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i], blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(i));
+                               dataBlockStatus[i].schedule();
+                       }
+                       for(int i=0;i<checkBlocks.length;i++) {
+                               checkBlockStatus[i] =
+                                       new SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i], blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, recursionLevel, true, new Integer(dataBlocks.length+i));
+                               checkBlockStatus[i].schedule();
+                       }
+               } catch (MalformedURLException e) {
+                       // Invalidates the whole splitfile
+                       throw new FetchException(FetchException.INVALID_URI, "Invalid URI in splitfile");
+               }
+       }
+
+       public boolean isFinished() {
+               return finished;
+       }
+
+       /** Throw a FetchException, if we have one. Else do nothing. */
+       public synchronized void throwError() throws FetchException {
+               if(failureException != null)
+                       throw failureException;
+       }
+       
+       /** Length of the decoded data. */
+       public long decodedLength() {
+               return decodedData.size();
+       }
+
+       /** Write the decoded segment's data to an OutputStream */
+       public long writeDecodedDataTo(OutputStream os, long truncateLength) throws IOException {
+               long len = decodedData.size();
+               if(truncateLength >= 0 && truncateLength < len)
+                       len = truncateLength;
+               // Copy exactly len bytes; truncateLength may be negative, meaning
+               // "no limit", so it must not be passed to copyTo directly.
+               BucketTools.copyTo(decodedData, os, len);
+               return len;
+       }
+
+       /** How many blocks have failed due to running out of retries? */
+       public synchronized int failedBlocks() {
+               return failedBlocks;
+       }
+       
+       /** How many blocks have been successfully fetched? */
+       public synchronized int fetchedBlocks() {
+               return fetchedBlocks;
+       }
+
+       /** How many blocks have currently running requests? */ 
+       public int runningBlocks() {
+               // FIXME implement or throw out
+               return 0;
+       }
+
+       /** How many blocks failed permanently due to fatal errors? */
+       public synchronized int fatallyFailedBlocks() {
+               return fatallyFailedBlocks;
+       }
+
+       public synchronized void onSuccess(FetchResult result, ClientGetState state) {
+               Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+               int blockNo = token.intValue();
+               if(blockNo < dataBlocks.length) {
+                       if(dataBlocks[blockNo] == null) {
+                               Logger.error(this, "Block already finished: "+blockNo);
+                               return;
+                       }
+                       dataBlocks[blockNo] = null;
+                       dataBuckets[blockNo].setData(result.asBucket());
+               } else if(blockNo < dataBlocks.length + checkBlocks.length) {
+                       if(checkBlocks[blockNo-dataBlocks.length] == null) {
+                               Logger.error(this, "Block already finished: "+blockNo);
+                               return;
+                       }
+                       checkBlocks[blockNo-dataBlocks.length] = null;
+                       checkBuckets[blockNo-dataBlocks.length].setData(result.asBucket());
+               } else {
+                       Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
+                       return;
+               }
+               fetchedBlocks++;
+               if(fetchedBlocks >= minFetched)
+                       startDecode();
+       }
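+       // FEC decoding is comparatively heavy (CPU plus temp-bucket I/O), so it is
+       // run on its own daemon thread rather than on the callback thread that
+       // delivered the final block.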
+
+       private void startDecode() {
+               Runnable r = new Decoder();
+               Thread t = new Thread(r, "Decoder for "+this);
+               t.setDaemon(true);
+               t.start();
+       }
+       
+       class Decoder implements Runnable {
+
+               public void run() {
+                       
+                       // Now decode
+                       Logger.minor(this, "Decoding "+this);
+                       
+                       FECCodec codec = FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
+                       try {
+                               if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+                                       // FIXME hardcoded block size below.
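+                                       // (32768 is presumably the standard CHK
+                                       // payload size; it should come from the
+                                       // keys layer rather than live here.)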
+                                       codec.decode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
+                                       // Now have all the data blocks (not necessarily all the check blocks)
+                               }
+                               
+                               decodedData = fetcherContext.bucketFactory.makeBucket(-1);
+                               Logger.minor(this, "Copying data from data blocks");
+                               OutputStream os = decodedData.getOutputStream();
+                               for(int i=0;i<dataBlockStatus.length;i++) {
+                                       SplitfileBlock status = dataBuckets[i];
+                                       Bucket data = status.getData();
+                                       BucketTools.copyTo(data, os, Long.MAX_VALUE);
+                               }
+                               Logger.minor(this, "Copied data");
+                               os.close();
+                               // Must set finished BEFORE calling parentFetcher.
+                               // Otherwise a race is possible that might result in it not seeing our finishing.
+                               finished = true;
+                               parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+                       } catch (IOException e) {
+                               Logger.minor(this, "Caught bucket error?: "+e, e);
+                               finished = true;
+                               failureException = new FetchException(FetchException.BUCKET_ERROR);
+                               parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+                               return;
+                       }
+                       
+                       // Now heal
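+                       // (Healing = re-inserting blocks that we had to reconstruct or
+                       // that proved hard to fetch, so that the splitfile becomes
+                       // easier to retrieve for later requesters.)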
+                       
+                       // Encode any check blocks we don't have
+                       if(codec != null) {
+                               try {
+                                       codec.encode(dataBuckets, checkBuckets, 32768, fetcherContext.bucketFactory);
+                               } catch (IOException e) {
+                                       Logger.error(this, "Bucket error while healing: "+e, e);
+                               }
+                       }
+                       
+                       // Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
+                       for(int i=0;i<dataBlockStatus.length;i++) {
+                               // Can't heal a block we have no data for
+                               if(dataBuckets[i].getData() == null) continue;
+                               SingleFileFetcher fetcher = dataBlockStatus[i];
+                               if(fetcher.getRetryCount() == 0) {
+                                       // 80% chance of not inserting, if we never tried it
+                                       if(fetcherContext.random.nextInt(5) != 0) continue;
+                               }
+                               queueHeal(dataBuckets[i].getData());
+                       }
+                       for(int i=0;i<checkBlockStatus.length;i++) {
+                               if(checkBuckets[i].getData() == null) continue;
+                               SingleFileFetcher fetcher = checkBlockStatus[i];
+                               if(fetcher.getRetryCount() == 0) {
+                                       // 80% chance of not inserting, if we never tried it
+                                       if(fetcherContext.random.nextInt(5) != 0) continue;
+                               }
+                               queueHeal(checkBuckets[i].getData());
+                       }
+                       
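+                       // Drop our references so the buckets and fetchers can be
+                       // garbage-collected now that the segment has finished.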
+                       for(int i=0;i<dataBlocks.length;i++) {
+                               dataBuckets[i] = null;
+                               dataBlockStatus[i] = null;
+                               dataBlocks[i] = null;
+                       }
+                       for(int i=0;i<checkBlocks.length;i++) {
+                               checkBuckets[i] = null;
+                               checkBlockStatus[i] = null;
+                               checkBlocks[i] = null;
+                       }
+               }
+
+       }
+
+       private void queueHeal(Bucket data) {
+               // TODO: queue the block for re-insertion ("healing"); not implemented yet.
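+               // A rough sketch of the eventual shape (HealingQueue and queue() are
+               // hypothetical names, not part of this branch):
+               //   HealingQueue hq = fetcherContext.healingQueue;
+               //   if(hq != null) hq.queue(data);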
+       }
+       
+       /** This is after any retries and therefore is either out-of-retries or fatal */
+       public synchronized void onFailure(FetchException e, ClientGetState state) {
+               Integer token = (Integer) ((SingleFileFetcher)state).getToken();
+               int blockNo = token.intValue();
+               if(blockNo < dataBlocks.length) {
+                       if(dataBlocks[blockNo] == null) {
+                               Logger.error(this, "Block already finished: "+blockNo);
+                               return;
+                       }
+                       dataBlocks[blockNo] = null;
+               } else if(blockNo < dataBlocks.length + checkBlocks.length) {
+                       if(checkBlocks[blockNo-dataBlocks.length] == null) {
+                               Logger.error(this, "Block already finished: "+blockNo);
+                               return;
+                       }
+                       checkBlocks[blockNo-dataBlocks.length] = null;
+               } else {
+                       Logger.error(this, "Unrecognized block number: "+blockNo, new Exception("error"));
+                       return;
+               }
+               // :(
+               Logger.minor(this, "Permanently failed: "+state+" on "+this);
+               if(e.isFatal())
+                       fatallyFailedBlocks++;
+               else
+                       failedBlocks++;
+               // FIXME this may not be accurate across all the retries?
+               if(e.errorCodes != null)
+                       errors.merge(e.errorCodes);
+               else
+                       errors.inc(new Integer(e.mode), ((SingleFileFetcher)state).getRetryCount());
+       }
+
+}

