Author: toad
Date: 2006-01-21 21:18:26 +0000 (Sat, 21 Jan 2006)
New Revision: 7898

Added:
   branches/async-client/src/freenet/client/async/ClientPut.java
   branches/async-client/src/freenet/client/async/ClientPutState.java
   branches/async-client/src/freenet/client/async/GetCompletionCallback.java
   branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
   branches/async-client/src/freenet/client/async/PutCompletionCallback.java
   branches/async-client/src/freenet/client/async/SendableGet.java
   branches/async-client/src/freenet/client/async/SendableInsert.java
   branches/async-client/src/freenet/client/async/SingleBlockInserter.java
   branches/async-client/src/freenet/client/async/SingleFileInserter.java
   branches/async-client/src/freenet/client/async/SplitFileInserter.java
   branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
   branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
   branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
Removed:
   branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
Modified:
   branches/async-client/src/freenet/client/FECCodec.java
   branches/async-client/src/freenet/client/FileInserter.java
   branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
   branches/async-client/src/freenet/client/InsertBlock.java
   branches/async-client/src/freenet/client/InserterContext.java
   branches/async-client/src/freenet/client/InserterException.java
   branches/async-client/src/freenet/client/Metadata.java
   branches/async-client/src/freenet/client/StandardOnionFECCodec.java
   branches/async-client/src/freenet/client/async/Client.java
   branches/async-client/src/freenet/client/async/ClientGet.java
   branches/async-client/src/freenet/client/async/ClientGetState.java
   branches/async-client/src/freenet/client/async/ClientRequest.java
   branches/async-client/src/freenet/client/async/SendableRequest.java
   branches/async-client/src/freenet/client/async/SingleFileFetcher.java
   branches/async-client/src/freenet/client/async/SplitFileFetcher.java
   branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
Async branch: simple inserts 99% done.

Modified: branches/async-client/src/freenet/client/FECCodec.java
===================================================================
--- branches/async-client/src/freenet/client/FECCodec.java      2006-01-21 
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/FECCodec.java      2006-01-21 
21:18:26 UTC (rev 7898)
@@ -2,6 +2,7 @@

 import java.io.IOException;

+import freenet.support.Bucket;
 import freenet.support.BucketFactory;

 /**
@@ -67,6 +68,17 @@
        public abstract void encode(SplitfileBlock[] dataBlocks, 
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory) 
throws IOException;

        /**
+        * Encode all missing *check* blocks.
+        * Requires that all the data blocks be present.
+        * @param dataBlocks The data blocks.
+        * @param checkBlocks The check blocks.
+        * @param blockLength The block length in bytes.
+        * @param bucketFactory The BucketFactory to use to generate buckets.
+        * @throws IOException If there is an error in encoding caused by an 
I/O error (usually involving buckets).
+        */
+       public abstract void encode(Bucket[] dataBlocks, Bucket[] checkBlocks, 
int blockLength, BucketFactory bucketFactory) throws IOException;
+
+       /**
         * How many check blocks?
         */
        public abstract int countCheckBlocks();

Modified: branches/async-client/src/freenet/client/FileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/FileInserter.java  2006-01-21 
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/FileInserter.java  2006-01-21 
21:18:26 UTC (rev 7898)
@@ -65,16 +65,13 @@
                int blockSize;
                int maxSourceDataSize;
                boolean isSSK = false;
-               boolean dontCompress = false;
+               boolean dontCompress = ctx.dontCompress;

                long origSize = data.size();
                if(type.equals("SSK") || type.equals("KSK")) {
                        blockSize = SSKBlock.DATA_LENGTH;
                        isSSK = true;
                        maxSourceDataSize = 
ClientSSKBlock.MAX_DECOMPRESSED_DATA_LENGTH;
-                       if(origSize > maxSourceDataSize)
-                               dontCompress = true;
-                       // If too big to fit in an SSK, don't even try.
                } else if(block.desiredURI.getKeyType().equals("CHK")) {
                        blockSize = CHKBlock.DATA_LENGTH;
                        maxSourceDataSize = 
ClientCHKBlock.MAX_LENGTH_BEFORE_COMPRESSION;

Modified: 
branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,7 +1,6 @@
 package freenet.client;

 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.util.HashMap;

 import freenet.client.events.ClientEventListener;
@@ -9,10 +8,8 @@
 import freenet.client.events.EventLogger;
 import freenet.client.events.SimpleEventProducer;
 import freenet.crypt.RandomSource;
-import freenet.keys.ClientCHK;
 import freenet.keys.ClientKey;
 import freenet.keys.FreenetURI;
-import freenet.keys.InsertableClientSSK;
 import freenet.node.RequestStarterClient;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.Bucket;

Modified: branches/async-client/src/freenet/client/InsertBlock.java
===================================================================
--- branches/async-client/src/freenet/client/InsertBlock.java   2006-01-21 
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InsertBlock.java   2006-01-21 
21:18:26 UTC (rev 7898)
@@ -9,8 +9,8 @@
 public class InsertBlock {

        Bucket data;
-       final FreenetURI desiredURI;
-       final ClientMetadata clientMetadata;
+       public final FreenetURI desiredURI;
+       public final ClientMetadata clientMetadata;

        public InsertBlock(Bucket data, ClientMetadata metadata, FreenetURI 
desiredURI) {
                this.data = data;
@@ -20,7 +20,9 @@
                        clientMetadata = metadata;
                this.desiredURI = desiredURI;
        }
+       
+       public Bucket getData() {
+               return data;
+       }

-
-
 }

Modified: branches/async-client/src/freenet/client/InserterContext.java
===================================================================
--- branches/async-client/src/freenet/client/InserterContext.java       
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InserterContext.java       
2006-01-21 21:18:26 UTC (rev 7898)
@@ -10,16 +10,16 @@
 public class InserterContext {

        final SimpleLowLevelClient client;
-       final BucketFactory bf;
+       public final BucketFactory bf;
        /** If true, don't try to compress the data */
-       final boolean dontCompress;
-       final RandomSource random;
-       final short splitfileAlgorithm;
+       public final boolean dontCompress;
+       public final RandomSource random;
+       public final short splitfileAlgorithm;
        public int maxInsertRetries;
        final int maxSplitInsertThreads;
        final int consecutiveRNFsCountAsSuccess;
-       final int splitfileSegmentDataBlocks;
-       final int splitfileSegmentCheckBlocks;
+       public final int splitfileSegmentDataBlocks;
+       public final int splitfileSegmentCheckBlocks;
        final ClientEventProducer eventProducer;
        final RequestStarterClient starterClient;
        /** Interesting tradeoff, see comments at top of Node.java. */

Modified: branches/async-client/src/freenet/client/InserterException.java
===================================================================
--- branches/async-client/src/freenet/client/InserterException.java     
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/InserterException.java     
2006-01-21 21:18:26 UTC (rev 7898)
@@ -82,6 +82,8 @@
        public static final int ROUTE_REALLY_NOT_FOUND = 8;
        /** Collided with pre-existing content */
        public static final int COLLISION = 9;
+       /** Cancelled by user */
+       public static final int CANCELLED = 10;

        public static String getMessage(int mode) {
                switch(mode) {
@@ -103,6 +105,8 @@
                        return "Insert could not leave the node at all";
                case COLLISION:
                        return "Insert collided with different, pre-existing 
data at the same key";
+               case CANCELLED:
+                       return "Cancelled by user";
                default:
                        return "Unknown error "+mode;
                }

Modified: branches/async-client/src/freenet/client/Metadata.java
===================================================================
--- branches/async-client/src/freenet/client/Metadata.java      2006-01-21 
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/Metadata.java      2006-01-21 
21:18:26 UTC (rev 7898)
@@ -30,7 +30,7 @@

        // document type
        byte documentType;
-       static final byte SIMPLE_REDIRECT = 0;
+       public static final byte SIMPLE_REDIRECT = 0;
        static final byte MULTI_LEVEL_METADATA = 1;
        static final byte SIMPLE_MANIFEST = 2;
        static final byte ZIP_MANIFEST = 3;
@@ -554,7 +554,7 @@
        /**
         * Write the data to a byte array.
         */
-       byte[] writeToByteArray() {
+       public byte[] writeToByteArray() {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos);
                try {

Modified: branches/async-client/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- branches/async-client/src/freenet/client/StandardOnionFECCodec.java 
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/StandardOnionFECCodec.java 
2006-01-21 21:18:26 UTC (rev 7898)
@@ -8,7 +8,6 @@

 import com.onionnetworks.fec.DefaultFECCodeFactory;
 import com.onionnetworks.fec.FECCode;
-import com.onionnetworks.fec.PureCode;
 import com.onionnetworks.util.Buffer;

 import freenet.support.Bucket;
@@ -24,7 +23,7 @@

        public class Encoder implements Runnable {

-               private final SplitfileBlock[] dataBlockStatus, 
checkBlockStatus;
+               private final Bucket[] dataBlockStatus, checkBlockStatus;
                private final int blockLength;
                private final BucketFactory bf;
                private IOException thrownIOE;
@@ -32,7 +31,7 @@
                private Error thrownError;
                private boolean finished;

-               public Encoder(SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+               public Encoder(Bucket[] dataBlockStatus, Bucket[] 
checkBlockStatus, int blockLength, BucketFactory bf) {
                        this.dataBlockStatus = dataBlockStatus;
                        this.checkBlockStatus = checkBlockStatus;
                        this.blockLength = blockLength;
@@ -288,7 +287,7 @@
                }
        }

-       public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+       public void encode(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus, 
int blockLength, BucketFactory bf) throws IOException {
                // Encodes count as decodes.
                synchronized(runningDecodesSync) {
                        while(runningDecodes >= PARALLEL_DECODES) {
@@ -324,12 +323,26 @@
                        }
                }
        }
+       
+       public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] 
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+               Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
+               Bucket[] checkBlocks = new Bucket[checkBlockStatus.length];
+               for(int i=0;i<dataBlocks.length;i++)
+                       dataBlocks[i] = dataBlockStatus[i].getData();
+               for(int i=0;i<checkBlocks.length;i++)
+                       checkBlocks[i] = checkBlockStatus[i].getData();
+               encode(dataBlocks, checkBlocks, blockLength, bf);
+               for(int i=0;i<dataBlocks.length;i++)
+                       dataBlockStatus[i].setData(dataBlocks[i]);
+               for(int i=0;i<checkBlocks.length;i++)
+                       checkBlockStatus[i].setData(checkBlocks[i]);
+       }

        /**
         * Do the actual encode.
         */
-       private void realEncode(SplitfileBlock[] dataBlockStatus,
-                       SplitfileBlock[] checkBlockStatus, int blockLength, 
BucketFactory bf)
+       private void realEncode(Bucket[] dataBlockStatus,
+                       Bucket[] checkBlockStatus, int blockLength, 
BucketFactory bf)
                        throws IOException {
 //             Runtime.getRuntime().gc();
 //             Runtime.getRuntime().runFinalization();
@@ -366,7 +379,7 @@
                                                STRIPE_SIZE);

                        for (int i = 0; i < dataBlockStatus.length; i++) {
-                               buckets[i] = dataBlockStatus[i].getData();
+                               buckets[i] = dataBlockStatus[i];
                                long sz = buckets[i].size();
                                if (sz < blockLength) {
                                        if (i != dataBlockStatus.length - 1)
@@ -382,7 +395,7 @@
                        }

                        for (int i = 0; i < checkBlockStatus.length; i++) {
-                               buckets[i + k] = checkBlockStatus[i].getData();
+                               buckets[i + k] = checkBlockStatus[i];
                                if (buckets[i + k] == null) {
                                        buckets[i + k] = 
bf.makeBucket(blockLength);
                                        writers[i] = buckets[i + 
k].getOutputStream();
@@ -455,7 +468,7 @@
                        Bucket data = buckets[i + k];
                        if (data == null)
                                throw new NullPointerException();
-                       checkBlockStatus[i].setData(data);
+                       checkBlockStatus[i] = data;
                }
        }


Modified: branches/async-client/src/freenet/client/async/Client.java
===================================================================
--- branches/async-client/src/freenet/client/async/Client.java  2006-01-21 
20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/Client.java  2006-01-21 
21:18:26 UTC (rev 7898)
@@ -2,6 +2,7 @@

 import freenet.client.FetchException;
 import freenet.client.FetchResult;
+import freenet.client.InserterException;

 /**
  * A client process. Something that initiates requests, and can cancel
@@ -14,4 +15,8 @@

        public void onFailure(FetchException e, ClientGet state);

+       public void onSuccess(ClientPut state);
+       
+       public void onFailure(InserterException e, ClientPut state);
+       
 }

Modified: branches/async-client/src/freenet/client/async/ClientGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGet.java       
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientGet.java       
2006-01-21 21:18:26 UTC (rev 7898)
@@ -12,17 +12,15 @@
 /**
  * A high level data request.
  */
-public class ClientGet extends ClientRequest implements 
RequestCompletionCallback {
+public class ClientGet extends ClientRequest implements GetCompletionCallback {

        final Client client;
        final FreenetURI uri;
        final FetcherContext ctx;
        final ArchiveContext actx;
        final ClientRequestScheduler scheduler;
-       ClientGetState fetchState;
+       ClientGetState currentState;
        private boolean finished;
-       private boolean cancelled;
-       final int priorityClass;
        private int archiveRestarts;

        public ClientGet(Client client, ClientRequestScheduler sched, 
FreenetURI uri, FetcherContext ctx, short priorityClass) {
@@ -33,15 +31,14 @@
                this.scheduler = sched;
                this.finished = false;
                this.actx = new ArchiveContext();
-               this.priorityClass = priorityClass;
                archiveRestarts = 0;
                start();
        }

        private void start() {
                try {
-                       fetchState = new SingleFileFetcher(this, this, new 
ClientMetadata(), uri, ctx, actx, priorityClass, 0, false, null);
-                       fetchState.schedule();
+                       currentState = new SingleFileFetcher(this, this, new 
ClientMetadata(), uri, ctx, actx, getPriorityClass(), 0, false, null);
+                       currentState.schedule();
                } catch (MalformedURLException e) {
                        onFailure(new 
FetchException(FetchException.INVALID_URI, e), null);
                } catch (FetchException e) {
@@ -49,16 +46,9 @@
                }
        }

-       public void cancel() {
-               cancelled = true;
-       }
-       
-       public boolean isCancelled() {
-               return cancelled;
-       }
-       
        public void onSuccess(FetchResult result, ClientGetState state) {
                finished = true;
+               currentState = null;
                client.onSuccess(result, this);
        }


Modified: branches/async-client/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientGetState.java  
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientGetState.java  
2006-01-21 21:18:26 UTC (rev 7898)
@@ -8,9 +8,6 @@

        public abstract ClientGet getParent();

-       public void schedule() {
-               // TODO Auto-generated method stub
-               
-       }
-       
+       public abstract void schedule();
+
 }

Added: branches/async-client/src/freenet/client/async/ClientPut.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPut.java       
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientPut.java       
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,60 @@
+package freenet.client.async;
+
+import freenet.client.ClientMetadata;
+import freenet.client.InsertBlock;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class ClientPut extends ClientRequest implements PutCompletionCallback {
+
+       final Client client;
+       final Bucket data;
+       final FreenetURI targetURI;
+       final ClientMetadata cm;
+       final InserterContext ctx;
+       final ClientRequestScheduler scheduler;
+       private ClientPutState currentState;
+       private boolean finished;
+       private boolean cancelled;
+
+       public ClientPut(Client client, Bucket data, FreenetURI targetURI, 
ClientMetadata cm, InserterContext ctx,
+                       ClientRequestScheduler scheduler, short priorityClass) {
+               super(priorityClass);
+               this.cm = cm;
+               this.client = client;
+               this.data = data;
+               this.targetURI = targetURI;
+               this.ctx = ctx;
+               this.scheduler = scheduler;
+               this.finished = false;
+               this.cancelled = false;
+               try {
+                       start();
+               } catch (InserterException e) {
+                       onFailure(e, null);
+               }
+       }
+
+       private void start() throws InserterException {
+               currentState =
+                       new SingleFileInserter(this, this, new 
InsertBlock(data, cm, targetURI), false, ctx, false, false);
+       }
+
+       void setCurrentState(ClientPutState s) {
+               currentState = s;
+       }
+
+       public void onSuccess(ClientPutState state) {
+               finished = true;
+               currentState = null;
+               client.onSuccess(this);
+       }
+
+       public void onFailure(InserterException e, ClientPutState state) {
+               finished = true;
+               currentState = null;
+               client.onFailure(e, this);
+       }
+}

Added: branches/async-client/src/freenet/client/async/ClientPutState.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientPutState.java  
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientPutState.java  
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,12 @@
+package freenet.client.async;
+
+/**
+ * ClientPutState
+ * 
+ * Represents a state in the insert process.
+ */
+public interface ClientPutState {
+
+       public abstract ClientPut getParent();
+
+}

Modified: branches/async-client/src/freenet/client/async/ClientRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/ClientRequest.java   
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/ClientRequest.java   
2006-01-21 21:18:26 UTC (rev 7898)
@@ -9,6 +9,7 @@

        // FIXME move the priority classes from RequestStarter here
        private short priorityClass;
+       private boolean cancelled;

        public short getPriorityClass() {
                return priorityClass;
@@ -18,6 +19,13 @@
                this.priorityClass = priorityClass;
        }

-       public abstract void cancel();
+       public void cancel() {
+               cancelled = true;
+       }

+       public boolean isCancelled() {
+               return cancelled;
+       }
+       
+
 }

Copied: 
branches/async-client/src/freenet/client/async/GetCompletionCallback.java (from 
rev 7890, 
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java)
===================================================================
--- 
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java   
    2006-01-20 19:46:21 UTC (rev 7890)
+++ branches/async-client/src/freenet/client/async/GetCompletionCallback.java   
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,16 @@
+package freenet.client.async;
+
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
+
+/**
+ * Callback called when part of a get request completes - either with a 
+ * Bucket full of data, or with an error.
+ */
+public interface GetCompletionCallback {
+
+       public void onSuccess(FetchResult result, ClientGetState state);
+       
+       public void onFailure(FetchException e, ClientGetState state);
+       
+}

Added: 
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java
===================================================================
--- 
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java  
    2006-01-21 20:51:10 UTC (rev 7897)
+++ 
branches/async-client/src/freenet/client/async/MultiPutCompletionCallback.java  
    2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,61 @@
+package freenet.client.async;
+
+import java.util.LinkedList;
+
+import freenet.client.InserterException;
+
+public class MultiPutCompletionCallback implements PutCompletionCallback, 
ClientPutState {
+
+       private final LinkedList waitingFor;
+       private final PutCompletionCallback cb;
+       private final ClientPut parent;
+       private boolean finished;
+       private boolean started;
+       
+       public MultiPutCompletionCallback(PutCompletionCallback cb, ClientPut 
parent, boolean dontTellParent) {
+               this.cb = cb;
+               this.waitingFor = new LinkedList();
+               this.parent = parent;
+               finished = false;
+               if(!dontTellParent)
+                       parent.setCurrentState(this);
+       }
+
+       public synchronized void onSuccess(ClientPutState state) {
+               if(finished) return;
+               waitingFor.remove(state);
+               if(waitingFor.isEmpty() && started) {
+                       complete(null);
+               }
+       }
+
+       public synchronized void onFailure(InserterException e, ClientPutState 
state) {
+               if(finished) return;
+               waitingFor.remove(state);
+               if(waitingFor.isEmpty()) {
+                       complete(e);
+               }
+       }
+
+       private synchronized void complete(InserterException e) {
+               finished = true;
+               if(e != null)
+                       cb.onFailure(e, this);
+               else
+                       cb.onSuccess(this);
+       }
+
+       public synchronized void add(ClientPutState ps) {
+               if(finished) return;
+               waitingFor.add(ps);
+       }
+
+       public synchronized void arm() {
+               started = true;
+       }
+
+       public ClientPut getParent() {
+               return parent;
+       }
+
+}

Added: branches/async-client/src/freenet/client/async/PutCompletionCallback.java
===================================================================
--- branches/async-client/src/freenet/client/async/PutCompletionCallback.java   
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/PutCompletionCallback.java   
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,17 @@
+package freenet.client.async;
+
+import freenet.client.InserterException;
+import freenet.keys.ClientKey;
+
+/**
+ * Callback called when part of a put request completes.
+ */
+public interface PutCompletionCallback {
+
+       public void onSuccess(ClientPutState state);
+       
+       public void onFailure(InserterException e, ClientPutState state);
+
+       public void onEncode(ClientKey key);
+       
+}

Deleted: 
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java
===================================================================
--- 
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java   
    2006-01-21 20:51:10 UTC (rev 7897)
+++ 
branches/async-client/src/freenet/client/async/RequestCompletionCallback.java   
    2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,16 +0,0 @@
-package freenet.client.async;
-
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-
-/**
- * Callback called when part of a get request completes - either with a 
- * Bucket full of data, or with an error.
- */
-public interface RequestCompletionCallback {
-
-       public void onSuccess(FetchResult result, ClientGetState state);
-       
-       public void onFailure(FetchException e, ClientGetState state);
-       
-}

Added: branches/async-client/src/freenet/client/async/SendableGet.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableGet.java     
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableGet.java     
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,20 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * A low-level key fetch which can be sent immediately. @see SendableRequest
+ */
+public interface SendableGet extends SendableRequest {
+
+       public ClientKey getKey();
+       
+       /** Called when/if the low-level request succeeds. */
+       public void onSuccess(ClientKeyBlock block);
+       
+       /** Called when/if the low-level request fails. */
+       public void onFailure(LowLevelPutException e);
+       
+}

Added: branches/async-client/src/freenet/client/async/SendableInsert.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableInsert.java  
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableInsert.java  
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,26 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKeyBlock;
+import freenet.node.LowLevelPutException;
+
+/**
+ * Callback interface for a low level insert, which is immediately sendable. 
These
+ * should be registered on the ClientRequestScheduler when we want to send 
them. It will
+ * then, when it is time to send, create a thread, send the request, and call 
the 
+ * callback below.
+ */
+public interface SendableInsert extends SendableRequest {
+
+       /** Get the ClientKeyBlock to insert. This may be created
+        * just-in-time, and may return null; ClientRequestScheduler
+        * will simply unregister the SendableInsert if this happens.
+        */
+       public ClientKeyBlock getBlock();
+       
+       /** Called when we successfully insert the data */
+       public void onSuccess();
+       
+       /** Called when the insert fails. */
+       public void onFailure(LowLevelPutException e);
+
+}

Modified: branches/async-client/src/freenet/client/async/SendableRequest.java
===================================================================
--- branches/async-client/src/freenet/client/async/SendableRequest.java 
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SendableRequest.java 
2006-01-21 21:18:26 UTC (rev 7898)
@@ -1,25 +1,13 @@
 package freenet.client.async;

-import freenet.keys.ClientKey;
-import freenet.keys.ClientKeyBlock;
-import freenet.node.LowLevelPutException;
-
 /**
  * A low-level request which can be sent immediately. These are registered
  * on the ClientRequestScheduler.
  */
 public interface SendableRequest {

-       public ClientKey getKey();
-       
        public short getPriorityClass();

        public int getRetryCount();

-       /** Called when/if the low-level request succeeds. */
-       public void onSuccess(ClientKeyBlock block);
-       
-       /** Called when/if the low-level request fails. */
-       public void onFailure(LowLevelPutException e);
-       
 }

Added: branches/async-client/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleBlockInserter.java     
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleBlockInserter.java     
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,186 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.net.MalformedURLException;
+
+import freenet.client.FailureCodeTracker;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.CHKEncodeException;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.FreenetURI;
+import freenet.keys.InsertableClientSSK;
+import freenet.keys.SSKEncodeException;
+import freenet.node.LowLevelPutException;
+import freenet.support.Bucket;
+import freenet.support.Logger;
+
+/**
+ * Insert *ONE KEY*.
+ *
+ * The block is encoded lazily (see getBlock()) and registered with the
+ * parent's scheduler. Low-level failures are retried until the retry limit
+ * in the InserterContext is exceeded. The completion callback is told about
+ * the outcome at most once.
+ */
+public class SingleBlockInserter implements SendableInsert, ClientPutState {
+
+       /** Data passed to the block encoder. */
+       final Bucket sourceData;
+       /** Codec identifier passed through to the block encoder. */
+       final short compressionCodec;
+       final FreenetURI uri; // uses essentially no RAM in the common case of a CHK because we use FreenetURI.EMPTY_CHK_URI
+       /** URI of the encoded block; assigned by encode(). */
+       FreenetURI resultingURI;
+       final PutCompletionCallback cb;
+       final ClientPut parent;
+       final InserterContext ctx;
+       /** How many times the insert has been re-registered after a failure. */
+       private int retries;
+       /** Tally of the failure codes seen so far. */
+       private final FailureCodeTracker errors;
+       /** True once the callback has been told about success or failure. */
+       private boolean finished;
+       private ClientKey key;
+       /** Softly-referenced encoded block; re-encoded on demand if it has been collected. */
+       private SoftReference refToClientKeyBlock;
+       final int token; // for e.g. splitfiles
+       final boolean isMetadata;
+       final int sourceLength;
+
+       /**
+        * Create an inserter for a single block. Nothing is encoded or
+        * scheduled until schedule(), getBlock() or getURI() is called.
+        * @param parent The request this insert belongs to; supplies the
+        * scheduler, the priority class and the cancellation flag.
+        * @param data The data to encode.
+        * @param compressionCodec Codec identifier handed to the encoder.
+        * @param uri The target URI; its key type selects CHK or SSK encoding.
+        * @param ctx Insert settings (retry limit, random source).
+        * @param cb Told exactly once whether the insert succeeded or failed.
+        * @param isMetadata Handed to the encoder unchanged.
+        * @param sourceLength Handed to the encoder unchanged.
+        * @param token Opaque value kept for the caller, e.g. for splitfiles.
+        */
+       public SingleBlockInserter(ClientPut parent, Bucket data, short compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback cb, boolean isMetadata, int sourceLength, int token) throws InserterException {
+               this.token = token;
+               this.parent = parent;
+               this.retries = 0;
+               this.finished = false;
+               this.ctx = ctx;
+               errors = new FailureCodeTracker(true);
+               this.cb = cb;
+               this.uri = uri;
+               this.compressionCodec = compressionCodec;
+               this.sourceData = data;
+               this.isMetadata = isMetadata;
+               this.sourceLength = sourceLength;
+       }
+
+       /**
+        * Encode the source data into a block of the type named by the URI.
+        * @throws InserterException INVALID_URI for an unknown key type or a
+        * malformed SSK URI, BUCKET_ERROR if the data cannot be read, and
+        * INTERNAL_ERROR if the encoder itself fails.
+        */
+       protected ClientKeyBlock innerEncode() throws InserterException {
+               String uriType = uri.getKeyType().toUpperCase();
+               if(uriType.equals("CHK")) {
+                       try {
+                               return ClientCHKBlock.encode(sourceData, isMetadata, true, compressionCodec, sourceLength);
+                       } catch (CHKEncodeException e) {
+                               Logger.error(this, "Caught "+e, e);
+                               throw new InserterException(InserterException.INTERNAL_ERROR, e, null);
+                       } catch (IOException e) {
+                               Logger.error(this, "Caught "+e, e);
+                               throw new InserterException(InserterException.BUCKET_ERROR, e, null);
+                       }
+               } else if(uriType.equals("SSK")) {
+                       try {
+                               InsertableClientSSK ik = InsertableClientSSK.create(uri);
+                               return ik.encode(sourceData, isMetadata, true, compressionCodec, sourceLength, ctx.random);
+                       } catch (MalformedURLException e) {
+                               throw new InserterException(InserterException.INVALID_URI, e, null);
+                       } catch (SSKEncodeException e) {
+                               Logger.error(this, "Caught "+e, e);
+                               throw new InserterException(InserterException.INTERNAL_ERROR, e, null);
+                       } catch (IOException e) {
+                               Logger.error(this, "Caught "+e, e);
+                               throw new InserterException(InserterException.BUCKET_ERROR, e, null);
+                       }
+               } else {
+                       throw new InserterException(InserterException.INVALID_URI, "Unknown keytype "+uriType, null);
+               }
+       }
+
+       /**
+        * Return the encoded block, encoding it if there is no cached copy.
+        * Also records the resulting URI.
+        */
+       protected synchronized ClientKeyBlock encode() throws InserterException {
+               if(refToClientKeyBlock != null) {
+                       ClientKeyBlock block = (ClientKeyBlock) refToClientKeyBlock.get();
+                       if(block != null) return block;
+               }
+               ClientKeyBlock block = innerEncode();
+               refToClientKeyBlock =
+                       new SoftReference(block);
+               resultingURI = block.getClientKey().getURI();
+               return block;
+       }
+
+       public boolean isInsert() {
+               return true;
+       }
+
+       public short getPriorityClass() {
+               return parent.getPriorityClass();
+       }
+
+       public int getRetryCount() {
+               return retries;
+       }
+
+       /**
+        * Handle a failed low-level insert. Cancellation and collisions are
+        * fatal. Any other code is tallied and the insert is re-registered,
+        * unless the retry limit has already been exceeded, in which case the
+        * callback is told about the failure.
+        */
+       public void onFailure(LowLevelPutException e) {
+               // Every fatal path must return: falling through would put a
+               // finished insert back on the scheduler.
+               if(parent.isCancelled()) {
+                       fail(new InserterException(InserterException.CANCELLED));
+                       return;
+               }
+               if(e.code == LowLevelPutException.COLLISION) {
+                       fail(new InserterException(InserterException.COLLISION));
+                       return;
+               }
+               switch(e.code) {
+               case LowLevelPutException.INTERNAL_ERROR:
+                       errors.inc(InserterException.INTERNAL_ERROR);
+                       break;
+               case LowLevelPutException.REJECTED_OVERLOAD:
+                       errors.inc(InserterException.REJECTED_OVERLOAD);
+                       break;
+               case LowLevelPutException.ROUTE_NOT_FOUND:
+                       errors.inc(InserterException.ROUTE_NOT_FOUND);
+                       break;
+               case LowLevelPutException.ROUTE_REALLY_NOT_FOUND:
+                       errors.inc(InserterException.ROUTE_REALLY_NOT_FOUND);
+                       break;
+               default:
+                       Logger.error(this, "Unknown LowLevelPutException code: "+e.code);
+                       errors.inc(InserterException.INTERNAL_ERROR);
+               }
+               if(retries > ctx.maxInsertRetries) {
+                       if(errors.isOneCodeOnly())
+                               fail(new InserterException(errors.getFirstCode()));
+                       else
+                               fail(new InserterException(InserterException.TOO_MANY_RETRIES_IN_BLOCKS, errors, getURI()));
+                       return;
+               }
+               retries++;
+               parent.scheduler.register(this);
+       }
+
+       /**
+        * Mark this insert as finished and report the failure, unless an
+        * outcome has already been reported. The flag is flipped under the
+        * lock; the callback runs outside it, as in onSuccess().
+        */
+       private void fail(InserterException e) {
+               synchronized(this) {
+                       if(finished) return;
+                       finished = true;
+               }
+               cb.onFailure(e, this);
+       }
+
+       /**
+        * Get the block to insert, encoding it if necessary.
+        * @return The block, or null if encoding failed; in that case the
+        * failure has been reported through the callback (once only).
+        */
+       public ClientKeyBlock getBlock() {
+               try {
+                       return encode();
+               } catch (InserterException e) {
+                       fail(e);
+                       return null;
+               } catch (Throwable t) {
+                       fail(new InserterException(InserterException.INTERNAL_ERROR, t, null));
+                       return null;
+               }
+       }
+
+       /** Register with the parent's scheduler, unless already finished. */
+       public void schedule() {
+               if(finished) {
+                       Logger.normal(this, "Asking to schedule but already finished");
+                       return;
+               }
+               parent.scheduler.register(this);
+       }
+
+       /**
+        * Get the URI of the encoded block, encoding the block first if that
+        * has not happened yet.
+        * @return The URI, or null if encoding failed.
+        */
+       public FreenetURI getURI() {
+               if(resultingURI == null)
+                       getBlock();
+               return resultingURI;
+       }
+
+       /**
+        * Report a successful insert, unless an outcome (for example an
+        * earlier failure) has already been reported.
+        */
+       public void onSuccess() {
+               synchronized(this) {
+                       if(finished) return;
+                       finished = true;
+               }
+               cb.onSuccess(this);
+       }
+
+       public ClientPut getParent() {
+               return parent;
+       }
+
+}

Modified: branches/async-client/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileFetcher.java       
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleFileFetcher.java       
2006-01-21 21:18:26 UTC (rev 7898)
@@ -25,14 +25,14 @@
 import freenet.support.compress.CompressionOutputSizeException;
 import freenet.support.compress.Compressor;

-public class SingleFileFetcher extends ClientGetState implements 
SendableRequest {
+public class SingleFileFetcher extends ClientGetState implements SendableGet {

        final ClientGet parent;
        //final FreenetURI uri;
        final ClientKey key;
        final LinkedList metaStrings;
        final FetcherContext ctx;
-       final RequestCompletionCallback rcb;
+       final GetCompletionCallback rcb;
        final ClientMetadata clientMetadata;
        private Metadata metadata;
        final int maxRetries;
@@ -45,6 +45,7 @@
        private int retryCount;
        private final LinkedList decompressors;
        private final boolean dontTellClientGet;
+       private boolean cancelled;
        private Object token;


@@ -53,7 +54,8 @@
         * @param token 
         * @param dontTellClientGet 
         */
-       public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, 
ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext 
ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean 
dontTellClientGet, Object token) throws FetchException {
+       public SingleFileFetcher(ClientGet get, GetCompletionCallback cb, 
ClientMetadata metadata, ClientKey key, LinkedList metaStrings, FetcherContext 
ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean 
dontTellClientGet, Object token) throws FetchException {
+               this.cancelled = false;
                this.dontTellClientGet = dontTellClientGet;
                this.token = token;
                this.parent = get;
@@ -76,12 +78,12 @@
        }

        /** Called by ClientGet. */ 
-       public SingleFileFetcher(ClientGet get, RequestCompletionCallback cb, 
ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext 
actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object 
token) throws MalformedURLException, FetchException {
+       public SingleFileFetcher(ClientGet get, GetCompletionCallback cb, 
ClientMetadata metadata, FreenetURI uri, FetcherContext ctx, ArchiveContext 
actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object 
token) throws MalformedURLException, FetchException {
                this(get, cb, metadata, ClientKey.getBaseKey(uri), 
uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, 
dontTellClientGet, token);
        }

        /** Copy constructor, modifies a few given fields, don't call 
schedule() */
-       public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta, 
RequestCompletionCallback callback, FetcherContext ctx2) throws FetchException {
+       public SingleFileFetcher(SingleFileFetcher fetcher, Metadata newMeta, 
GetCompletionCallback callback, FetcherContext ctx2) throws FetchException {
                this.token = fetcher.token;
                this.dontTellClientGet = fetcher.dontTellClientGet;
                this.actx = fetcher.actx;
@@ -104,7 +106,7 @@

        public void schedule() {
                if(!dontTellClientGet)
-                       this.parent.fetchState = this;
+                       this.parent.currentState = this;
                parent.scheduler.register(this);
        }

@@ -350,10 +352,10 @@
                // And will also discover that the data is available, and will 
complete.
        }

-       class ArchiveFetcherCallback implements RequestCompletionCallback {
+       class ArchiveFetcherCallback implements GetCompletionCallback {

                public void onSuccess(FetchResult result, ClientGetState state) 
{
-                       parent.fetchState = SingleFileFetcher.this;
+                       parent.currentState = SingleFileFetcher.this;
                        try {
                                ctx.archiveManager.extractToCache(thisKey, 
ah.getArchiveType(), result.asBucket(), actx, ah);
                        } catch (ArchiveFailureException e) {
@@ -381,10 +383,10 @@

        }

-       class MultiLevelMetadataCallback implements RequestCompletionCallback {
+       class MultiLevelMetadataCallback implements GetCompletionCallback {

                public void onSuccess(FetchResult result, ClientGetState state) 
{
-                       parent.fetchState = SingleFileFetcher.this;
+                       parent.currentState = SingleFileFetcher.this;
                        try {
                                metadata = 
Metadata.construct(result.asBucket());
                        } catch (MetadataParseException e) {
@@ -410,6 +412,10 @@

        // Real onFailure
        private void onFailure(FetchException e, boolean forceFatal) {
+               if(parent.isCancelled() || cancelled) {
+                       onFailure(new FetchException(FetchException.CANCELLED));
+                       return;
+               }
                if(!(e.isFatal() || forceFatal) ) {
                        if(retryCount <= maxRetries) {
                                if(parent.isCancelled()) {
@@ -453,5 +459,9 @@
        public Object getToken() {
                return token;
        }
-       
+
+       /**
+        * Flag this fetcher as cancelled. The flag is only recorded here;
+        * it is consulted by the failure handler, which then reports
+        * FetchException.CANCELLED.
+        */
+       public void cancel() {
+               this.cancelled = true;
+       }
+
 }

Added: branches/async-client/src/freenet/client/async/SingleFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SingleFileInserter.java      
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SingleFileInserter.java      
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,257 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import freenet.client.InsertBlock;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.keys.CHKBlock;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.FreenetURI;
+import freenet.keys.SSKBlock;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.compress.CompressionOutputSizeException;
+import freenet.support.compress.Compressor;
+
+/**
+ * Attempt to insert a file. May include metadata.
+ * 
+ * This stage:
+ * Attempt to compress the file. Off-thread if it will take a while.
+ * Then hand the result to a SingleBlockInserter if it fits in one block,
+ * or to a SplitFileInserter if it does not.
+ */
+class SingleFileInserter implements ClientPutState {
+
+       // Config option???
+       private static final long COMPRESS_OFF_THREAD_LIMIT = 65536;
+
+       final ClientPut parent;
+       final InsertBlock block;
+       final InserterContext ctx;
+       final boolean metadata;
+       final PutCompletionCallback cb;
+       final boolean getCHKOnly;
+       /** If true, we are not the top level request, and should not
+        * update our parent to point to us as current put-stage. */
+       final boolean dontTellParent;
+       /** True if compression is disabled, either for this insert or
+        * globally in the InserterContext. */
+       final boolean dontCompress;
+
+       /**
+        * Create the inserter and start it immediately: by the time the
+        * constructor returns, the data has either been compressed and handed
+        * on, or a compressor thread has been started to do so.
+        * @param parent The request this insert belongs to.
+        * @param cb Told about the final outcome.
+        * @param block The data, target URI and client metadata to insert.
+        * @param metadata Passed on to the block/splitfile inserters.
+        * @param ctx Insert settings, including the bucket factory.
+        * @param dontCompress If true, skip compression for this insert even
+        * if the context allows it.
+        * @param dontTellParent If true, do not register as the parent's
+        * current state.
+        * @param getCHKOnly Passed on to the SplitFileInserter.
+        * @throws InserterException If the insert fails while being started
+        * on the calling thread.
+        */
+       SingleFileInserter(ClientPut parent, PutCompletionCallback cb, InsertBlock block, boolean metadata, InserterContext ctx, boolean dontCompress, boolean dontTellParent, boolean getCHKOnly) throws InserterException {
+               this.parent = parent;
+               this.block = block;
+               this.ctx = ctx;
+               this.metadata = metadata;
+               this.cb = cb;
+               this.dontTellParent = dontTellParent;
+               this.getCHKOnly = getCHKOnly;
+               this.dontCompress = dontCompress || ctx.dontCompress;
+               if(!dontTellParent)
+                       parent.setCurrentState(this);
+               start();
+       }
+
+       /**
+        * Run the compress-and-insert stage, on a new daemon thread if the
+        * data is large enough for compression to take a while.
+        */
+       private void start() throws InserterException {
+               if((!dontCompress) && block.getData().size() > COMPRESS_OFF_THREAD_LIMIT) {
+                       // Run off thread
+                       OffThreadCompressor otc = new OffThreadCompressor();
+                       Thread t = new Thread(otc, "Compressor for "+this);
+                       t.setDaemon(true);
+                       t.start();
+               } else {
+                       tryCompress();
+               }
+       }
+
+       /** Runs tryCompress() and turns anything it throws into a failure callback. */
+       private class OffThreadCompressor implements Runnable {
+               public void run() {
+                       try {
+                               tryCompress();
+                       } catch (InserterException e) {
+                               cb.onFailure(e, SingleFileInserter.this);
+                       } catch (Throwable t) {
+                               // Nobody is watching this thread: without this the
+                               // callback would never hear about the failure.
+                               cb.onFailure(new InserterException(InserterException.INTERNAL_ERROR, t, null), SingleFileInserter.this);
+                       }
+               }
+       }
+
+       /**
+        * Compress the data if allowed and worthwhile, then start the
+        * appropriate insert:
+        * a single block at the target URI if the data fits and there is no
+        * client metadata; else a CHK data block plus a redirect block at the
+        * target URI if the data fits in one CHK; else a splitfile.
+        */
+       private void tryCompress() throws InserterException {
+               Bucket origData = block.getData();
+               Bucket data = origData;
+               long origSize = origData.size();
+
+               // First, determine how small it needs to be
+               int blockSize;
+               String type = block.desiredURI.getKeyType().toUpperCase();
+               if(type.equals("SSK") || type.equals("KSK")) {
+                       blockSize = SSKBlock.DATA_LENGTH;
+               } else if(type.equals("CHK")) {
+                       blockSize = CHKBlock.DATA_LENGTH;
+               } else {
+                       throw new InserterException(InserterException.INVALID_URI);
+               }
+
+               Compressor bestCodec = null;
+               Bucket bestCompressedData = null;
+
+               if(origSize > blockSize && (!dontCompress)) {
+                       // Try to compress the data.
+                       // Try each algorithm, starting with the fastest and weakest.
+                       // Stop when run out of algorithms, or the compressed data fits in a single block.
+                       int algos = Compressor.countCompressAlgorithms();
+                       try {
+                               for(int i=0;i<algos;i++) {
+                                       Compressor comp = Compressor.getCompressionAlgorithmByDifficulty(i);
+                                       Bucket result = comp.compress(origData, ctx.bf, Long.MAX_VALUE);
+                                       boolean fitsInBlock = result.size() < blockSize;
+                                       // data is the original or the best result so far
+                                       if(fitsInBlock || result.size() < data.size()) {
+                                               if(bestCompressedData != null)
+                                                       ctx.bf.freeBucket(bestCompressedData);
+                                               bestCompressedData = result;
+                                               bestCodec = comp;
+                                               data = result;
+                                               if(fitsInBlock) break;
+                                       } else {
+                                               // No improvement: release it rather than leak it
+                                               ctx.bf.freeBucket(result);
+                                       }
+                               }
+                       } catch (IOException e) {
+                               throw new InserterException(InserterException.BUCKET_ERROR, e, null);
+                       } catch (CompressionOutputSizeException e) {
+                               // Impossible: the limit passed in is Long.MAX_VALUE
+                               throw new InserterException(InserterException.INTERNAL_ERROR, e, null);
+                       }
+               }
+
+               // Compressed data
+
+               // Insert it...
+               short codecNumber = bestCodec == null ? -1 : bestCodec.codecNumberForMetadata();
+
+               boolean noClientMetadata = block.clientMetadata == null || block.clientMetadata.isTrivial();
+               if(noClientMetadata && data.size() < blockSize) {
+                       // Just insert it
+                       SingleBlockInserter bi = new SingleBlockInserter(parent, data, codecNumber, block.desiredURI, ctx, cb, metadata, singleBlockSourceLength(origSize), -1);
+                       bi.schedule();
+                       return;
+               }
+               if (data.size() < ClientCHKBlock.MAX_COMPRESSED_DATA_LENGTH) {
+                       int sourceLength = singleBlockSourceLength(origSize);
+                       MultiPutCompletionCallback mcb =
+                               new MultiPutCompletionCallback(cb, parent, dontTellParent);
+                       // Insert single block, then insert pointer to it
+                       SingleBlockInserter dataPutter = new SingleBlockInserter(parent, data, codecNumber, FreenetURI.EMPTY_CHK_URI, ctx, mcb, metadata, sourceLength, -1);
+                       Metadata meta = new Metadata(Metadata.SIMPLE_REDIRECT, dataPutter.getURI(), block.clientMetadata);
+                       Bucket metadataBucket;
+                       try {
+                               metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+                       } catch (IOException e) {
+                               throw new InserterException(InserterException.BUCKET_ERROR, e, null);
+                       }
+                       // NOTE(review): the redirect block is given the data's source
+                       // length, not the metadata bucket's - confirm the encoder
+                       // ignores it when no codec is set.
+                       SingleBlockInserter metaPutter = new SingleBlockInserter(parent, metadataBucket, (short) -1, block.desiredURI, ctx, mcb, true, sourceLength, -1);
+                       mcb.add(metaPutter);
+                       mcb.add(dataPutter);
+                       mcb.arm();
+                       dataPutter.schedule();
+                       metaPutter.schedule();
+                       return;
+               }
+               // Otherwise the file is too big to fit into one block
+               // We therefore must make a splitfile
+               // Job of SplitHandler: when the splitinserter has the metadata,
+               // insert it. Then when the splitinserter has finished, and the
+               // metadata insert has finished too, tell the master callback.
+               SplitHandler sh = new SplitHandler();
+               SplitFileInserter sfi = new SplitFileInserter(parent, sh, data, bestCodec, block.clientMetadata, ctx, sh, getCHKOnly, metadata, true);
+               sh.sfi = sfi;
+               if(!dontTellParent)
+                       parent.setCurrentState(sh);
+               sfi.start();
+       }
+
+       /**
+        * Narrow the original length to the int a single-block insert needs.
+        * Only the single-block paths call this, so large files that end up
+        * as splitfiles are not rejected.
+        */
+       private int singleBlockSourceLength(long origSize) throws InserterException {
+               if(origSize > Integer.MAX_VALUE)
+                       throw new InserterException(InserterException.INTERNAL_ERROR, "2GB+ should not encode to one block!", null);
+               return (int) origSize;
+       }
+
+       /**
+        * When we get the metadata, start inserting it to our target key.
+        * When we have inserted both the metadata and the splitfile,
+        * call the master callback.
+        */
+       class SplitHandler implements SplitPutCompletionCallback, ClientPutState {
+
+               SplitFileInserter sfi;
+               SingleFileInserter metadataPutter;
+               boolean finished = false;
+               boolean splitInsertSuccess = false;
+               boolean metaInsertSuccess = false;
+
+               /**
+                * Record that one of the two inserts succeeded, and report
+                * overall success (once) when both have.
+                */
+               public void onSuccess(ClientPutState state) {
+                       synchronized(this) {
+                               if(finished) return;
+                               // The metadata insert never reports with the
+                               // SingleFileInserter as its state (the inserters it
+                               // creates pass themselves), so anything that is not
+                               // the splitfile inserter is the metadata insert.
+                               // NOTE(review): assumes SplitFileInserter reports
+                               // success with itself as state - confirm.
+                               if(state == sfi)
+                                       splitInsertSuccess = true;
+                               else
+                                       metaInsertSuccess = true;
+                               if(!(splitInsertSuccess && metaInsertSuccess))
+                                       return;
+                               finished = true;
+                       }
+                       cb.onSuccess(this);
+               }
+
+               /** Either insert failing fails the whole thing. */
+               public void onFailure(InserterException e, ClientPutState state) {
+                       fail(e);
+               }
+
+               /**
+                * Called with the splitfile's metadata: write it to a bucket
+                * and insert that at the target URI as a sub-insert.
+                */
+               public void onGeneratedMetadata(Metadata meta) {
+                       synchronized(this) {
+                               if(finished) return;
+                       }
+                       Bucket metadataBucket;
+                       try {
+                               metadataBucket = BucketTools.makeImmutableBucket(ctx.bf, meta.writeToByteArray());
+                       } catch (IOException e) {
+                               fail(new InserterException(InserterException.BUCKET_ERROR, e, null));
+                               return;
+                       }
+                       InsertBlock newBlock = new InsertBlock(metadataBucket, null, block.desiredURI);
+                       try {
+                               // Arguments: metadata=true, dontCompress=false,
+                               // dontTellParent=true (we are a sub-insert).
+                               // The constructor starts the insert itself, so it
+                               // must not be started a second time here.
+                               metadataPutter = new SingleFileInserter(parent, this, newBlock, true, ctx, false, true, getCHKOnly);
+                       } catch (InserterException e) {
+                               fail(e);
+                       }
+               }
+
+               /**
+                * Report failure to the master callback, unless an outcome
+                * has already been reported. The callback runs outside the lock.
+                */
+               private void fail(InserterException e) {
+                       synchronized(this) {
+                               if(finished) return;
+                               finished = true;
+                       }
+                       cb.onFailure(e, this);
+               }
+
+               public ClientPut getParent() {
+                       return parent;
+               }
+
+               public void onEncode(ClientKey key) {
+                       // Ignore
+               }
+
+       }
+
+       public ClientPut getParent() {
+               return parent;
+       }
+}

Modified: branches/async-client/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcher.java        
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcher.java        
2006-01-21 21:18:26 UTC (rev 7898)
@@ -21,7 +21,7 @@
 import freenet.support.compress.Compressor;

 /**
- * Fetch a splitfile, decompress it if need be, and return it to the 
RequestCompletionCallback.
+ * Fetch a splitfile, decompress it if need be, and return it to the 
GetCompletionCallback.
  * Most of the work is done by the segments, and we do not need a thread.
  */
 public class SplitFileFetcher extends ClientGetState {
@@ -31,7 +31,7 @@
        final LinkedList decompressors;
        final ClientMetadata clientMetadata;
        final ClientGet parent;
-       final RequestCompletionCallback cb;
+       final GetCompletionCallback cb;
        final int recursionLevel;
        /** The splitfile type. See the SPLITFILE_ constants on Metadata. */
        final short splitfileType;
@@ -57,7 +57,7 @@
        private final boolean splitUseLengths;
        private boolean finished;

-       public SplitFileFetcher(Metadata metadata, RequestCompletionCallback 
rcb, ClientGet parent,
+       public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb, 
ClientGet parent,
                        FetcherContext newCtx, LinkedList decompressors, 
ClientMetadata clientMetadata, 
                        ArchiveContext actx, int recursionLevel) throws 
FetchException, MetadataParseException {
                this.finished = false;
@@ -235,4 +235,10 @@
                fetchContext.eventProducer.produceEvent(new 
SplitfileProgressEvent(totalBlocks, fetchedBlocks, failedBlocks, 
fatallyFailedBlocks, runningBlocks));
        }

+       /** Schedule every segment of the splitfile, in array order. */
+       public void schedule() {
+               int next = 0;
+               while(next < segments.length) {
+                       segments[next].schedule();
+                       next++;
+               }
+       }
+
 }

Modified: 
branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java 
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileFetcherSegment.java 
2006-01-21 21:18:26 UTC (rev 7898)
@@ -22,7 +22,7 @@
  * A single segment within a SplitFileFetcher.
  * This in turn controls a large number of SingleFileFetcher's.
  */
-public class SplitFileFetcherSegment implements RequestCompletionCallback {
+public class SplitFileFetcherSegment implements GetCompletionCallback {

        final short splitfileType;
        final FreenetURI[] dataBlocks;
@@ -83,22 +83,6 @@
                        blockFetchContext = new FetcherContext(fetcherContext, 
FetcherContext.SPLITFILE_DEFAULT_BLOCK_MASK);
                        this.recursionLevel = 0;
                }
-               
-               try {
-                       for(int i=0;i<dataBlocks.length;i++) {
-                               dataBlockStatus[i] =
-                                       new 
SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i], 
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, 
recursionLevel, true, new Integer(i));
-                               dataBlockStatus[i].schedule();
-                       }
-                       for(int i=0;i<checkBlocks.length;i++) {
-                               checkBlockStatus[i] =
-                                       new 
SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i], 
blockFetchContext, archiveContext, fetchContext.maxSplitfileBlockRetries, 
recursionLevel, true, new Integer(dataBlocks.length+i));
-                               checkBlockStatus[i].schedule();
-                       }
-               } catch (MalformedURLException e) {
-                       // Invalidates the whole splitfile
-                       throw new FetchException(FetchException.INVALID_URI, 
"Invalid URI in splitfile");
-               }
        }

        public boolean isFinished() {
@@ -293,6 +277,48 @@
                        errors.merge(e.errorCodes);
                else
                        errors.inc(new Integer(e.mode), 
((SingleFileFetcher)state).getRetryCount());
+               if(failedBlocks + fatallyFailedBlocks > (dataBlocks.length + 
checkBlocks.length - minFetched)) {
+                       fail(new FetchException(FetchException.SPLITFILE_ERROR, 
errors));
+               }
        }

+       /**
+        * Fail the whole segment. Records the exception, cancels every block
+        * fetcher that currently exists (non-null entries of dataBlockStatus
+        * and checkBlockStatus), and then notifies the parent fetcher via
+        * segmentFinished(). Only the first call has any effect; later calls
+        * return immediately because finished is already set.
+        *
+        * @param e The exception to store in failureException.
+        */
+       private void fail(FetchException e) {
+               synchronized(this) {
+                       if(finished) return;
+                       finished = true;
+                       this.failureException = e;
+               }
+               for(int i=0;i<dataBlockStatus.length;i++) {
+                       SingleFileFetcher f = dataBlockStatus[i];
+                       if(f != null)
+                               f.cancel();
+               }
+               for(int i=0;i<checkBlockStatus.length;i++) {
+                       // Read from checkBlockStatus: indexing dataBlockStatus here would
+                       // leave the check fetchers running and could overrun the data array.
+                       SingleFileFetcher f = checkBlockStatus[i];
+                       if(f != null)
+                               f.cancel();
+               }
+               parentFetcher.segmentFinished(this);
+       }
+
+       /**
+        * Create and schedule one SingleFileFetcher per data block and per
+        * check block of this segment. The last constructor argument is
+        * Integer(i) for data block i and Integer(dataBlocks.length+i) for
+        * check block i. Nothing is thrown to the caller: any error raised
+        * while creating or scheduling fetchers is turned into a call to
+        * fail().
+        */
+       public void schedule() {
+               try {
+                       for(int i=0;i<dataBlocks.length;i++) {
+                               dataBlockStatus[i] =
+                                       new 
SingleFileFetcher(parentFetcher.parent, this, null, dataBlocks[i], 
blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries, 
recursionLevel, true, new Integer(i));
+                               dataBlockStatus[i].schedule();
+                       }
+                       for(int i=0;i<checkBlocks.length;i++) {
+                               checkBlockStatus[i] =
+                                       new 
SingleFileFetcher(parentFetcher.parent, this, null, checkBlocks[i], 
blockFetchContext, archiveContext, blockFetchContext.maxSplitfileBlockRetries, 
recursionLevel, true, new Integer(dataBlocks.length+i));
+                               checkBlockStatus[i].schedule();
+                       }
+               } catch (MalformedURLException e) {
+                       // Invalidates the whole splitfile
+                       fail(new FetchException(FetchException.INVALID_URI, 
"Invalid URI in splitfile"));
+               } catch (Throwable t) {
+                       // NOTE(review): every other Throwable is also reported with the
+                       // INVALID_URI code; confirm whether an internal-error code was intended.
+                       fail(new FetchException(FetchException.INVALID_URI, t));
+               }
+       }
+
 }

Added: branches/async-client/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/async-client/src/freenet/client/async/SplitFileInserter.java       
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/client/async/SplitFileInserter.java       
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,214 @@
+package freenet.client.async;
+
+import java.io.IOException;
+import java.util.Vector;
+
+import freenet.client.ClientMetadata;
+import freenet.client.FECCodec;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.client.async.SingleFileInserter.SplitHandler;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+import freenet.support.BucketTools;
+import freenet.support.Logger;
+import freenet.support.compress.Compressor;
+
+public class SplitFileInserter implements ClientPutState {
+       
+       final ClientPut parent;
+       final InserterContext ctx;
+       final SplitPutCompletionCallback cb;
+       final long dataLength;
+       final short compressionCodec;
+       final short splitfileAlgorithm;
+       final int segmentSize;
+       final int checkSegmentSize;
+       final SplitFileInserterSegment[] segments;
+       final boolean getCHKOnly;
+       final int countCheckBlocks;
+       final int countDataBlocks;
+       private boolean haveSentMetadata;
+       final ClientMetadata cm;
+       final boolean isMetadata;
+       private boolean finished;
+
+       public SplitFileInserter(ClientPut put, SplitPutCompletionCallback cb, 
Bucket data, Compressor bestCodec, ClientMetadata clientMetadata, 
InserterContext ctx, SplitHandler sh, boolean getCHKOnly, boolean isMetadata, 
boolean dontTellParent) throws InserterException {
+               this.parent = put;
+               if(!dontTellParent)
+                       parent.setCurrentState(this);
+               this.finished = false;
+               this.isMetadata = isMetadata;
+               this.cm = clientMetadata;
+               this.getCHKOnly = getCHKOnly;
+               this.cb = cb;
+               this.ctx = ctx;
+               Bucket[] dataBuckets;
+               try {
+                       dataBuckets = BucketTools.split(data, 
ClientCHKBlock.DATA_LENGTH, ctx.bf);
+               } catch (IOException e) {
+                       throw new 
InserterException(InserterException.BUCKET_ERROR, e, null);
+               }
+               countDataBlocks = dataBuckets.length;
+               // Encoding is done by segments
+               if(bestCodec == null)
+                       compressionCodec = -1;
+               else
+                       compressionCodec = bestCodec.codecNumberForMetadata();
+               this.splitfileAlgorithm = ctx.splitfileAlgorithm;
+               this.dataLength = data.size();
+               segmentSize = ctx.splitfileSegmentDataBlocks;
+               checkSegmentSize = splitfileAlgorithm == 
Metadata.SPLITFILE_NONREDUNDANT ? 0 : ctx.splitfileSegmentCheckBlocks;
+               
+               // Create segments
+               segments = splitIntoSegments(segmentSize, dataBuckets);
+               int count = 0;
+               for(int i=0;i<segments.length;i++)
+                       count += segments[i].countCheckBlocks();
+               countCheckBlocks = count;
+       }
+
+       /**
+        * Group the blocks into segments.
+        */
+       private SplitFileInserterSegment[] splitIntoSegments(int segmentSize, 
Bucket[] origDataBlocks) {
+               int dataBlocks = origDataBlocks.length;
+
+               Vector segs = new Vector();
+               
+               FECCodec codec = FECCodec.getCodec(splitfileAlgorithm, 
origDataBlocks.length);
+               
+               // First split the data up
+               if(dataBlocks < segmentSize || segmentSize == -1) {
+                       // Single segment
+                       SplitFileInserterSegment onlySeg = new 
SplitFileInserterSegment(this, codec, origDataBlocks, ctx, getCHKOnly, 0);
+                       segs.add(onlySeg);
+               } else {
+                       int j = 0;
+                       int segNo = 0;
+                       for(int i=segmentSize;;i+=segmentSize) {
+                               if(i > dataBlocks) i = dataBlocks;
+                               Bucket[] seg = new Bucket[i-j];
+                               System.arraycopy(origDataBlocks, j, seg, 0, 
i-j);
+                               j = i;
+                               for(int x=0;x<seg.length;x++)
+                                       if(seg[x] == null) throw new 
NullPointerException("In splitIntoSegs: "+x+" is null of "+seg.length+" of 
"+segNo);
+                               SplitFileInserterSegment s = new 
SplitFileInserterSegment(this, codec, seg, ctx, getCHKOnly, segNo);
+                               segs.add(s);
+                               
+                               if(i == dataBlocks) break;
+                               segNo++;
+                       }
+               }
+               return (SplitFileInserterSegment[]) segs.toArray(new 
SplitFileInserterSegment[segs.size()]);
+       }
+       
+       public void start() {
+               for(int i=0;i<segments.length;i++)
+                       segments[i].start();
+       }
+
+       public void encodedSegment(SplitFileInserterSegment segment) {
+               Logger.minor(this, "Encoded segment "+segment.segNo+" of 
"+this);
+       }
+       
+       /**
+        * Check whether every segment reports isEncoded() and, if so, build
+        * the splitfile Metadata from the segments' data and check URIs and
+        * pass it to the callback. If any URI is still null at that point the
+        * insert is failed with INTERNAL_ERROR instead. If the
+        * haveSentMetadata flag is already set, only an error is logged.
+        *
+        * @param segment The segment passed by the caller; not read by this method.
+        */
+       public void segmentHasURIs(SplitFileInserterSegment segment) {
+               if(haveSentMetadata) {
+                       Logger.error(this, "WTF? Already sent metadata");
+                       return;
+               }
+               
+               boolean allHaveURIs = true;
+               synchronized(this) {
+                       for(int i=0;i<segments.length;i++) {
+                               if(!segments[i].isEncoded())
+                                       allHaveURIs = false;
+                       }
+               }
+               
+               if(allHaveURIs) {
+                       boolean missingURIs;
+                       Metadata m = null;
+                       synchronized(this) {
+                               // Create metadata
+                               FreenetURI[] dataURIs = getDataURIs();
+                               FreenetURI[] checkURIs = getCheckURIs();
+                               
+                               Logger.minor(this, "Data URIs: "+dataURIs.length+", check URIs: "+checkURIs.length);
+                               
+                               missingURIs = anyNulls(dataURIs) || anyNulls(checkURIs);
+                               
+                               if(!missingURIs) {
+                                       // Store the result in m, which is declared outside this
+                                       // synchronized block, so the callback below receives the
+                                       // metadata instead of null.
+                                       m = new Metadata(splitfileAlgorithm, dataURIs, checkURIs, segmentSize, checkSegmentSize, cm, dataLength, compressionCodec, isMetadata);
+                               }
+                               // Set in both the success and the missing-URI case.
+                               haveSentMetadata = true;
+                       }
+                       if(missingURIs) {
+                               // Error
+                               fail(new InserterException(InserterException.INTERNAL_ERROR, "Missing URIs after encoding", null));
+                               return;
+                       } else
+                               cb.onGeneratedMetadata(m);
+               }
+
+       }
+       
+       private void fail(InserterException e) {
+               synchronized(this) {
+                       if(finished) return;
+                       finished = true;
+               }
+               cb.onFailure(e, this);
+       }
+
+       // FIXME move this to somewhere
+       private static boolean anyNulls(Object[] array) {
+               for(int i=0;i<array.length;i++)
+                       if(array[i] == null) return true;
+               return false;
+       }
+
+       private FreenetURI[] getCheckURIs() {
+               // Copy check blocks from each segment into a FreenetURI[].
+               FreenetURI[] uris = new FreenetURI[countCheckBlocks];
+               int x = 0;
+               for(int i=0;i<segments.length;i++) {
+                       FreenetURI[] segURIs = segments[i].getCheckURIs();
+                       if(x + segURIs.length > countCheckBlocks) 
+                               throw new IllegalStateException("x="+x+", 
segURIs="+segURIs.length+", countCheckBlocks="+countCheckBlocks);
+                       System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+                       x += segURIs.length;
+               }
+
+               if(uris.length != x)
+                       throw new IllegalStateException("Total is wrong");
+               
+               return uris;
+       }
+
+       /**
+        * Concatenate the data URI arrays of all segments, in segment order,
+        * into a single array of length countDataBlocks.
+        *
+        * @throws IllegalStateException if the segments' arrays do not add up
+        * to exactly countDataBlocks entries.
+        */
+       private FreenetURI[] getDataURIs() {
+               // Copy data block URIs from each segment into a FreenetURI[].
+               FreenetURI[] uris = new FreenetURI[countDataBlocks];
+               int x = 0;
+               for(int i=0;i<segments.length;i++) {
+                       FreenetURI[] segURIs = segments[i].getDataURIs();
+                       if(x + segURIs.length > countDataBlocks) 
+                               throw new IllegalStateException("x="+x+", 
segURIs="+segURIs.length+", countDataBlocks="+countDataBlocks);
+                       System.arraycopy(segURIs, 0, uris, x, segURIs.length);
+                       x += segURIs.length;
+               }
+
+               if(uris.length != x)
+                       throw new IllegalStateException("Total is wrong");
+               
+               return uris;
+       }
+
+       public ClientPut getParent() {
+               return parent;
+       }
+
+}

Added: 
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- 
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java    
    2006-01-21 20:51:10 UTC (rev 7897)
+++ 
branches/async-client/src/freenet/client/async/SplitFileInserterSegment.java    
    2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,130 @@
+package freenet.client.async;
+
+import java.io.IOException;
+
+import freenet.client.FECCodec;
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.FreenetURI;
+import freenet.support.Bucket;
+
+public class SplitFileInserterSegment implements PutCompletionCallback {
+
+       final SplitFileInserter parent;
+       final FECCodec splitfileAlgo;
+       final Bucket[] dataBlocks;
+       final Bucket[] checkBlocks;
+       final FreenetURI[] dataURIs;
+       final FreenetURI[] checkURIs;
+       final SingleBlockInserter[] dataBlockInserters;
+       final SingleBlockInserter[] checkBlockInserters;
+       final InserterContext blockInsertContext;
+       final int segNo;
+       private boolean encoded;
+       private boolean finished;
+       private InserterException toThrow;
+       
+       public SplitFileInserterSegment(SplitFileInserter parent, FECCodec 
splitfileAlgo, Bucket[] origDataBlocks, InserterContext blockInsertContext, 
boolean getCHKOnly, int segNo) {
+               this.parent = parent;
+               this.blockInsertContext = blockInsertContext;
+               this.splitfileAlgo = splitfileAlgo;
+               this.dataBlocks = origDataBlocks;
+               int checkBlockCount = splitfileAlgo == null ? 0 : 
splitfileAlgo.countCheckBlocks();
+               checkBlocks = new Bucket[checkBlockCount];
+               checkURIs = new FreenetURI[checkBlockCount];
+               dataURIs = new FreenetURI[origDataBlocks.length];
+               dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
+               checkBlockInserters = new 
SingleBlockInserter[checkBlocks.length];
+               this.segNo = segNo;
+       }
+       
+       public void start() {
+               if(splitfileAlgo == null) {
+                       // Don't need to encode blocks
+               } else {
+                       // Encode blocks
+                       Thread t = new Thread(new EncodeBlocksRunnable(), 
"Blocks encoder");
+                       t.setDaemon(true);
+                       t.start();
+               }
+       }
+       
+       private class EncodeBlocksRunnable implements Runnable {
+               
+               public void run() {
+                       encode();
+               }
+       }
+
+       /**
+        * Run the FEC codec over this segment's data blocks to fill in the
+        * check blocks, mark the segment as encoded, notify the parent, and
+        * create a SingleBlockInserter for every data and check block.
+        * An IOException is reported through finish() as BUCKET_ERROR and any
+        * other Throwable as INTERNAL_ERROR; nothing propagates to the caller.
+        */
+       void encode() {
+               try {
+                       splitfileAlgo.encode(dataBlocks, checkBlocks, 
ClientCHKBlock.DATA_LENGTH, blockInsertContext.bf);
+                       // Encoding succeeded: record it and tell the parent.
+                       encoded = true;
+                       parent.encodedSegment(this);
+                       // Create the block inserters; check blocks are numbered after the data blocks.
+                       for(int i=0;i<dataBlockInserters.length;i++)
+                               dataBlockInserters[i] = 
+                                       new SingleBlockInserter(parent.parent, 
dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, 
false, ClientCHKBlock.DATA_LENGTH, i);
+                       for(int i=0;i<checkBlockInserters.length;i++)
+                               checkBlockInserters[i] = 
+                                       new SingleBlockInserter(parent.parent, 
checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, 
false, ClientCHKBlock.DATA_LENGTH, i + dataBlocks.length);
+               } catch (IOException e) {
+                       InserterException ex = 
+                               new 
InserterException(InserterException.BUCKET_ERROR, e, null);
+                       finish(ex);
+               } catch (Throwable t) {
+                       InserterException ex = 
+                               new 
InserterException(InserterException.INTERNAL_ERROR, t, null);
+                       finish(ex);
+               }
+       }
+
+       private void finish(InserterException ex) {
+               synchronized(this) {
+                       if(finished) return;
+                       finished = true;
+                       toThrow = ex;
+               }
+               parent.segmentFinished(this);
+       }
+
+       public void onSuccess(ClientPutState state) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       public void onFailure(InserterException e, ClientPutState state) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       public boolean isFinished() {
+               return finished;
+       }
+       
+       public boolean isEncoded() {
+               return encoded;
+       }
+
+       public int countCheckBlocks() {
+               return checkBlocks.length;
+       }
+
+       public FreenetURI[] getCheckURIs() {
+               return checkURIs;
+       }
+
+       /**
+        * @return The dataURIs array itself (not a copy), one slot per data
+        * block of this segment, mirroring getCheckURIs(). Returning null
+        * here would make SplitFileInserter.getDataURIs() throw a
+        * NullPointerException when it reads the array length.
+        */
+       public FreenetURI[] getDataURIs() {
+               return dataURIs;
+       }
+       
+       public void onEncode(ClientKey key) {
+               // TODO Auto-generated method stub
+               
+       }
+
+}

Added: 
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java
===================================================================
--- 
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java  
    2006-01-21 20:51:10 UTC (rev 7897)
+++ 
branches/async-client/src/freenet/client/async/SplitPutCompletionCallback.java  
    2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,9 @@
+package freenet.client.async;
+
+import freenet.client.Metadata;
+
+public interface SplitPutCompletionCallback extends PutCompletionCallback {
+
+       public void onGeneratedMetadata(Metadata meta);
+       
+}

Added: branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java
===================================================================
--- branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java   
2006-01-21 20:51:10 UTC (rev 7897)
+++ branches/async-client/src/freenet/node/fcp/FetchErrorMessage.java   
2006-01-21 21:18:26 UTC (rev 7898)
@@ -0,0 +1,45 @@
+package freenet.node.fcp;
+
+import freenet.client.FailureCodeTracker;
+import freenet.client.FetchException;
+import freenet.node.Node;
+import freenet.support.SimpleFieldSet;
+
+public class FetchErrorMessage extends FCPMessage {
+
+       final int code;
+       final String codeDescription;
+       final String extraDescription;
+       final FailureCodeTracker tracker;
+       final boolean isFatal;
+       
+       public FetchErrorMessage(FCPConnectionHandler handler, FetchException 
e, String identifier) {
+               this.tracker = e.errorCodes;
+               this.code = e.mode;
+               this.codeDescription = FetchException.getMessage(code);
+               this.extraDescription = e.extraMessage;
+               this.isFatal = e.isFatal();
+       }
+
+       public SimpleFieldSet getFieldSet() {
+               SimpleFieldSet sfs = new SimpleFieldSet();
+               sfs.put("Code", Integer.toHexString(code));
+               sfs.put("CodeDescription", codeDescription);
+               if(extraDescription != null)
+                       sfs.put("ExtraDescription", extraDescription);
+               sfs.put("Fatal", Boolean.toString(isFatal));
+               if(tracker != null) {
+                       tracker.copyToFieldSet(sfs, "Errors.");
+               }
+               return sfs;
+       }
+
+       public String getName() {
+               return "FetchError";
+       }
+
+       public void run(FCPConnectionHandler handler, Node node) throws 
MessageInvalidException {
+               throw new 
MessageInvalidException(ProtocolErrorMessage.INVALID_MESSAGE, "FetchError goes 
from server to client not the other way around");
+       }
+
+}


Reply via email to