Author: toad
Date: 2006-01-06 21:18:26 +0000 (Fri, 06 Jan 2006)
New Revision: 7770

Added:
   trunk/freenet/src/freenet/node/AnyInsertSender.java
   trunk/freenet/src/freenet/node/CHKInsertSender.java
Removed:
   trunk/freenet/src/freenet/node/InsertSender.java
Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/keys/CHKBlock.java
   trunk/freenet/src/freenet/keys/ClientCHKBlock.java
   trunk/freenet/src/freenet/keys/KeyBlock.java
   trunk/freenet/src/freenet/keys/SSKBlock.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/store/BaseFreenetStore.java
   trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
324:
Implement SSKInsertSender.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2006-01-06 19:26:42 UTC (rev 
7769)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2006-01-06 21:18:26 UTC (rev 
7770)
@@ -23,6 +23,7 @@
 import java.util.List;

 import freenet.keys.Key;
+import freenet.keys.NodeSSK;
 import freenet.support.BitArray;
 import freenet.support.Buffer;
 import freenet.support.ShortBuffer;
@@ -80,6 +81,9 @@
     public static final String STREAM_SEQNO = "streamSequenceNumber";
     public static final String IS_LOCAL = "isLocal";
     public static final String ANY_TIMED_OUT = "anyTimedOut";
+    public static final String PUBKEY_HASH = "pubkeyHash";
+    public static final String NEED_PUB_KEY = "needPubKey";
+    public static final String PUBKEY_AS_BYTES = "pubkeyAsBytes";

        //Diagnostic
        public static final MessageType ping = new MessageType("ping") {{
@@ -666,7 +670,83 @@
             return "Receive failed";
         return "Unknown reason code: "+reason;
     }
+
+    public static final MessageType FNPSSKInsertRequest = new 
MessageType("FNPSSKInsertRequest") {{
+       addField(UID, Long.class);
+       addField(HTL, Short.class);
+       addField(KEY, NodeSSK.class);
+        addField(NEAREST_LOCATION, Double.class);
+        addField(BLOCK_HEADERS, ShortBuffer.class);
+        addField(PUBKEY_HASH, ShortBuffer.class);
+    }};

+       public static Message createFNPSSKInsertRequest(long uid, short htl, 
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+               Message msg = new Message(FNPSSKInsertRequest);
+               msg.set(UID, uid);
+               msg.set(HTL, htl);
+               msg.set(KEY, myKey);
+               msg.set(NEAREST_LOCATION, closestLocation);
+               msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
+               msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+               return msg;
+       }
+
+       public static final MessageType FNPSSKDataFound = new 
MessageType("FNPSSKDataFound") {{
+       addField(UID, Long.class);
+       addField(HTL, Short.class);
+       addField(KEY, NodeSSK.class);
+        addField(NEAREST_LOCATION, Double.class);
+        addField(BLOCK_HEADERS, ShortBuffer.class);
+        addField(PUBKEY_HASH, ShortBuffer.class);
+       }};
+       
+       public static Message createFNPSSKDataFound(long uid, short htl, 
NodeSSK myKey, double closestLocation, byte[] headers, byte[] pubKeyHash) {
+               Message msg = new Message(FNPSSKDataFound);
+               msg.set(UID, uid);
+               msg.set(HTL, htl);
+               msg.set(KEY, myKey);
+               msg.set(NEAREST_LOCATION, closestLocation);
+               msg.set(BLOCK_HEADERS, new ShortBuffer(headers));
+               msg.set(PUBKEY_HASH, new ShortBuffer(pubKeyHash));
+               return msg;
+       }
+
+       
+       
+       public static MessageType FNPSSKAccepted = new 
MessageType("FNPSSKAccepted") {{
+               addField(UID, Long.class);
+               addField(NEED_PUB_KEY, Boolean.class);
+       }};
+       
+       public static final Message createFNPSSKAccepted(long uid, boolean 
needPubKey) {
+               Message msg = new Message(FNPSSKAccepted);
+               msg.set(UID, uid);
+               msg.set(NEED_PUB_KEY, needPubKey);
+               return msg;
+       }
+       
+       public static MessageType FNPSSKPubKey = new 
MessageType("FNPSSKPubKey") {{
+               addField(UID, Long.class);
+               addField(PUBKEY_AS_BYTES, ShortBuffer.class);
+       }};
+       
+       public static Message createFNPSSKPubKey(long uid, byte[] pubkey) {
+               Message msg = new Message(FNPSSKPubKey);
+               msg.set(UID, uid);
+               msg.set(PUBKEY_AS_BYTES, new ShortBuffer(pubkey));
+               return msg;
+       }
+       
+       public static MessageType FNPSSKPubKeyAccepted = new 
MessageType("FNPSSKPubKeyAccepted") {{
+               addField(UID, Long.class);
+       }};
+       
+       public static Message createFNPSSKPubKeyAccepted(long uid) {
+               Message msg = new Message(FNPSSKPubKeyAccepted);
+               msg.set(UID, uid);
+               return msg;
+       }
+       
     public static final MessageType FNPPing = new MessageType("FNPPing") {{
         addField(PING_SEQNO, Integer.class);
     }};
@@ -812,7 +892,7 @@
         msg.set(HTL, htl);
         return msg;
     }
-    
+
        public static void init() { }

 }

Modified: trunk/freenet/src/freenet/keys/CHKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/CHKBlock.java        2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/keys/CHKBlock.java        2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -1,26 +1,8 @@
 package freenet.keys;

-import java.io.IOException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;

-import freenet.crypt.BlockCipher;
-import freenet.crypt.PCFBMode;
-import freenet.crypt.UnsupportedCipherException;
-import freenet.crypt.ciphers.Rijndael;
-import freenet.node.Node;
-import freenet.support.ArrayBucket;
-import freenet.support.ArrayBucketFactory;
-import freenet.support.Bucket;
-import freenet.support.BucketFactory;
-import freenet.support.BucketTools;
-import freenet.support.Logger;
-import freenet.support.SimpleReadOnlyArrayBucket;
-import freenet.support.compress.CompressionOutputSizeException;
-import freenet.support.compress.Compressor;
-import freenet.support.compress.DecompressException;
-
 /**
  * @author amphibian
  * 
@@ -30,7 +12,7 @@
 public class CHKBlock implements KeyBlock {

     final byte[] data;
-    final byte[] header;
+    final byte[] headers;
     final short hashIdentifier;
     final NodeCHK chk;
     public static final int MAX_LENGTH_BEFORE_COMPRESSION = Integer.MAX_VALUE;
@@ -43,8 +25,8 @@
     /**
      * @return The header for this key. DO NOT MODIFY THIS DATA!
      */
-    public byte[] getHeader() {
-        return header;
+    public byte[] getHeaders() {
+        return headers;
     }

     /**
@@ -60,10 +42,10 @@

     public CHKBlock(byte[] data2, byte[] header2, NodeCHK key, boolean verify) 
throws CHKVerifyException {
         data = data2;
-        header = header2;
-        if(header.length != TOTAL_HEADERS_LENGTH)
-               throw new IllegalArgumentException("Wrong length: 
"+header.length+" should be "+TOTAL_HEADERS_LENGTH);
-        hashIdentifier = (short)(((header[0] & 0xff) << 8) + (header[1] & 
0xff));
+        headers = header2;
+        if(headers.length != TOTAL_HEADERS_LENGTH)
+               throw new IllegalArgumentException("Wrong length: 
"+headers.length+" should be "+TOTAL_HEADERS_LENGTH);
+        hashIdentifier = (short)(((headers[0] & 0xff) << 8) + (headers[1] & 
0xff));
         this.chk = key;
 //        Logger.debug(CHKBlock.class, "Data length: "+data.length+", header 
length: "+header.length);
         if(!verify) return;
@@ -79,7 +61,7 @@
             throw new Error(e);
         }

-        md.update(header);
+        md.update(headers);
         md.update(data);
         byte[] hash = md.digest();
         byte[] check = chk.routingKey;
@@ -92,4 +74,12 @@
        public Key getKey() {
         return chk;
     }
+
+       public byte[] getRawHeaders() {
+               return headers;
+       }
+
+       public byte[] getRawData() {
+               return data;
+       }
 }

Modified: trunk/freenet/src/freenet/keys/ClientCHKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/ClientCHKBlock.java  2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/keys/ClientCHKBlock.java  2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -55,7 +55,7 @@
      * Construct from a CHKBlock and a key.
      */
     public ClientCHKBlock(CHKBlock block, ClientCHK key2) throws 
CHKVerifyException {
-        this(block.getData(), block.getHeader(), key2, true);
+        this(block.getData(), block.getHeaders(), key2, true);
     }

     /**
@@ -98,8 +98,8 @@
             throw new CHKDecodeException("Crypto key too short");
         cipher.initialize(key.cryptoKey);
         PCFBMode pcfb = new PCFBMode(cipher);
-        byte[] hbuf = new byte[header.length-2];
-        System.arraycopy(header, 2, hbuf, 0, header.length-2);
+        byte[] hbuf = new byte[headers.length-2];
+        System.arraycopy(headers, 2, hbuf, 0, headers.length-2);
         byte[] dbuf = new byte[data.length];
         System.arraycopy(data, 0, dbuf, 0, data.length);
         // Decipher header first - functions as IV

Modified: trunk/freenet/src/freenet/keys/KeyBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/KeyBlock.java        2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/keys/KeyBlock.java        2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -1,10 +1,5 @@
 package freenet.keys;

-import java.io.IOException;
-
-import freenet.support.Bucket;
-import freenet.support.BucketFactory;
-
 /**
  * Interface for fetched blocks. Can be decoded with a key.
  */
@@ -12,4 +7,7 @@

     final static int HASH_SHA256 = 1;

+    public Key getKey();
+    public byte[] getRawHeaders();
+    public byte[] getRawData();
 }

Modified: trunk/freenet/src/freenet/keys/SSKBlock.java
===================================================================
--- trunk/freenet/src/freenet/keys/SSKBlock.java        2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/keys/SSKBlock.java        2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -109,4 +109,16 @@
                        throw new SSKVerifyException("E(H(docname)) wrong - 
wrong key??");
        }

+       public Key getKey() {
+               return nodeKey;
+       }
+
+       public byte[] getRawHeaders() {
+               return headers;
+       }
+
+       public byte[] getRawData() {
+               return data;
+       }
+
 }

Copied: trunk/freenet/src/freenet/node/AnyInsertSender.java (from rev 7767, 
trunk/freenet/src/freenet/node/InsertSender.java)
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2006-01-06 12:27:49 UTC 
(rev 7767)
+++ trunk/freenet/src/freenet/node/AnyInsertSender.java 2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -0,0 +1,16 @@
+package freenet.node;
+
+public interface AnyInsertSender {
+
+       public abstract int getStatus();
+
+       public abstract short getHTL();
+
+       /**
+        * @return The current status as a string
+        */
+       public abstract String getStatusString();
+
+       public abstract boolean sentRequest();
+
+}
\ No newline at end of file

Copied: trunk/freenet/src/freenet/node/CHKInsertSender.java (from rev 7767, 
trunk/freenet/src/freenet/node/InsertSender.java)
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2006-01-06 12:27:49 UTC 
(rev 7767)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -0,0 +1,799 @@
+package freenet.node;
+
+import java.util.HashSet;
+import java.util.Vector;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.AbortedException;
+import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
+import freenet.keys.CHKVerifyException;
+import freenet.keys.NodeCHK;
+import freenet.support.Logger;
+
+public final class CHKInsertSender implements Runnable, AnyInsertSender {
+
+       private class AwaitingCompletion {
+               
+               /** Node we are waiting for response from */
+               final PeerNode pn;
+               /** We may be sending data to that node */
+               BlockTransmitter bt;
+               /** Have we received notice of the downstream success
+                * or failure of dependent transfers from that node?
+                * Includes timing out. */
+               boolean receivedCompletionNotice = false;
+               /** Timed out - didn't receive completion notice in
+                * the allotted time?? */
+               boolean completionTimedOut = false;
+               /** Did the completion notice report a successful transfer? */
+               boolean completionSucceeded;
+               
+               /** Have we completed the immediate transfer? */
+               boolean completedTransfer = false;
+               /** Did it succeed? */
+               boolean transferSucceeded = false;
+               
+               AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
+                       this.pn = pn;
+                       bt = new BlockTransmitter(node.usm, pn, uid, prb);
+                       Sender s = new Sender(this);
+            Thread senderThread = new Thread(s, "Sender for "+uid+" to 
"+pn.getPeer());
+            senderThread.setDaemon(true);
+            senderThread.start();
+               }
+               
+               void completed(boolean timeout, boolean success) {
+                       synchronized(this) {
+                               if(timeout)
+                                       completionTimedOut = true;
+                               else
+                                       completionSucceeded = success;
+                               receivedCompletionNotice = true;
+                               notifyAll();
+                       }
+                       synchronized(nodesWaitingForCompletion) {
+                               nodesWaitingForCompletion.notifyAll();
+                       }
+                       if(!success) {
+                               synchronized(CHKInsertSender.this) {
+                                       transferTimedOut = true;
+                                       CHKInsertSender.this.notifyAll();
+                               }
+                       }
+               }
+               
+               void completedTransfer(boolean success) {
+                       synchronized(this) {
+                               transferSucceeded = success;
+                               completedTransfer = true;
+                               notifyAll();
+                       }
+                       synchronized(nodesWaitingForCompletion) {
+                               nodesWaitingForCompletion.notifyAll();
+                       }
+                       if(!success) {
+                               synchronized(CHKInsertSender.this) {
+                                       transferTimedOut = true;
+                                       CHKInsertSender.this.notifyAll();
+                               }
+                       }
+               }
+       }
+       
+    public class Sender implements Runnable {
+       
+       final AwaitingCompletion completion;
+       final BlockTransmitter bt;
+       
+       public Sender(AwaitingCompletion ac) {
+               this.bt = ac.bt;
+               this.completion = ac;
+       }
+       
+               public void run() {
+                       try {
+                               bt.send();
+                               if(bt.failedDueToOverload()) {
+                                       completion.completedTransfer(false);
+                               } else {
+                                       completion.completedTransfer(true);
+                               }
+                       } catch (Throwable t) {
+                               completion.completedTransfer(false);
+                               Logger.error(this, "Caught "+t, t);
+                       }
+               }
+       }
+    
+       CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
+            PeerNode source, Node node, PartiallyReceivedBlock prb, boolean 
fromStore, double closestLocation) {
+        this.myKey = myKey;
+        this.target = myKey.toNormalizedDouble();
+        this.uid = uid;
+        this.headers = headers;
+        this.htl = htl;
+        this.source = source;
+        this.node = node;
+        this.prb = prb;
+        this.fromStore = fromStore;
+        this.closestLocation = closestLocation;
+        this.startTime = System.currentTimeMillis();
+        this.nodesWaitingForCompletion = new Vector();
+        Thread t = new Thread(this, "CHKInsertSender for UID "+uid+" on 
"+node.portNumber+" at "+System.currentTimeMillis());
+        t.setDaemon(true);
+        t.start();
+    }
+    
+    // Constants
+    static final int ACCEPTED_TIMEOUT = 10000;
+    static final int SEARCH_TIMEOUT = 60000;
+    static final int TRANSFER_COMPLETION_TIMEOUT = 120000;
+
+    // Basics
+    final NodeCHK myKey;
+    final double target;
+    final long uid;
+    short htl;
+    final PeerNode source;
+    final Node node;
+    final byte[] headers; // received BEFORE creation => we handle Accepted 
elsewhere
+    final PartiallyReceivedBlock prb;
+    final boolean fromStore;
+    private boolean receiveFailed = false;
+    final double closestLocation;
+    final long startTime;
+    private boolean sentRequest;
+    
+    /** List of nodes we are waiting for either a transfer completion
+     * notice or a transfer completion from. */
+    private Vector nodesWaitingForCompletion;
+    
+    /** Have all transfers completed and all nodes reported completion status? 
*/
+    private boolean allTransfersCompleted = false;
+    
+    /** Has a transfer timed out, either directly or downstream? */
+    private boolean transferTimedOut = false;
+    
+    /** Runnable which waits for completion of all transfers */
+    private CompletionWaiter cw = null;
+
+    /** Time at which we set status to a value other than NOT_FINISHED */
+    private long setStatusTime = -1;
+    
+    
+    private int status = -1;
+    /** Still running */
+    static final int NOT_FINISHED = -1;
+    /** Successful insert */
+    static final int SUCCESS = 0;
+    /** Route not found */
+    static final int ROUTE_NOT_FOUND = 1;
+    /** Internal error */
+    static final int INTERNAL_ERROR = 3;
+    /** Timed out waiting for response */
+    static final int TIMED_OUT = 4;
+    /** Locally Generated a RejectedOverload */
+    static final int GENERATED_REJECTED_OVERLOAD = 5;
+    /** Could not get off the node at all! */
+    static final int ROUTE_REALLY_NOT_FOUND = 6;
+    
+    public String toString() {
+        return super.toString()+" for "+uid;
+    }
+    
+    public void run() {
+        short origHTL = htl;
+        try {
+               realRun();
+        } catch (Throwable t) {
+            Logger.error(this, "Caught "+t, t);
+            if(status == NOT_FINISHED)
+               finish(INTERNAL_ERROR, null);
+        } finally {
+            node.completed(uid);
+               node.removeInsertSender(myKey, origHTL, this);
+        }
+    }
+    
+    private void realRun() {
+        HashSet nodesRoutedTo = new HashSet();
+        HashSet nodesNotIgnored = new HashSet();
+        
+        while(true) {
+            if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
+            
+            if(htl == 0) {
+                // Send an InsertReply back
+                finish(SUCCESS, null);
+                return;
+            }
+            
+            // Route it
+            PeerNode next;
+            // Can backtrack, so only route to nodes closer than we are to 
target.
+            double nextValue;
+            synchronized(node.peers) {
+                next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true);
+                if(next != null)
+                    nextValue = next.getLocation().getValue();
+                else
+                    nextValue = -1.0;
+            }
+            
+            if(next == null) {
+                // Backtrack
+                finish(ROUTE_NOT_FOUND, null);
+                return;
+            }
+            Logger.minor(this, "Routing insert to "+next);
+            nodesRoutedTo.add(next);
+            
+            if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
+                Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
+                htl = node.decrementHTL(source, htl);
+            }
+            
+            Message req = DMT.createFNPInsertRequest(uid, htl, myKey, 
closestLocation);
+            
+            // Wait for ack or reject... will come before even a locally 
generated DataReply
+            
+            MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+            MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+            MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+            
+            // mfRejectedOverload must be the last thing in the or
+            // So its or pointer remains null
+            // Otherwise we need to recreate it below
+            mfRejectedOverload.clearOr();
+            MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
+            
+            // Send to next node
+            
+            try {
+                               next.send(req);
+                       } catch (NotConnectedException e1) {
+                               Logger.minor(this, "Not connected to "+next);
+                               continue;
+                       }
+            sentRequest = true;
+            
+            if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
+            Message msg = null;
+            
+            /*
+             * Because messages may be re-ordered, it is
+             * entirely possible that we get a non-local RejectedOverload,
+             * followed by an Accepted. So we must loop here.
+             */
+            
+            while (true) {
+               
+                               try {
+                                       msg = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e) {
+                                       Logger.normal(this, "Disconnected from 
" + next
+                                                       + " while waiting for 
Accepted");
+                                       break;
+                               }
+                               
+                               if (receiveFailed)
+                                       return; // don't need to set status as 
killed by InsertHandler
+                               
+                               if (msg == null) {
+                                       // Terminal overload
+                                       // Try to propagate back to source
+                                       Logger.minor(this, "Timeout");
+                                       next.localRejectedOverload();
+                                       finish(TIMED_OUT, next);
+                                       return;
+                               }
+                               
+                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Non-fatal - probably still have time 
left
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               next.localRejectedOverload();
+                                               Logger.minor(this,
+                                                                               
"Local RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       } else {
+                                               forwardRejectedOverload();
+                                       }
+                                       continue;
+                               }
+                               
+                               if (msg.getSpec() == DMT.FNPRejectedLoop) {
+                                       next.successNotOverload();
+                                       // Loop - we don't want to send the 
data to this one
+                                       break;
+                               }
+                               
+                               if (msg.getSpec() != DMT.FNPAccepted) {
+                                       Logger.error(this,
+                                                       "Unexpected message 
waiting for Accepted: "
+                                                                       + msg);
+                                       break;
+                               }
+                               // Otherwise is an FNPAccepted
+                               break;
+                       }
+            
+            if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
+            
+            Logger.minor(this, "Got Accepted on "+this);
+            
+            // Send them the data.
+            // Which might be the new data resulting from a collision...
+
+            Message dataInsert;
+            PartiallyReceivedBlock prbNow;
+            prbNow = prb;
+            dataInsert = DMT.createFNPDataInsert(uid, headers);
+            /** What are we waiting for now??:
+             * - FNPRouteNotFound - couldn't exhaust HTL, but send us the 
+             *   data anyway please
+             * - FNPInsertReply - used up all HTL, yay
+             * - FNPRejectedOverload - propagating an overload error :(
+             * - FNPRejectedTimeout - we took too long to send the DataInsert
+             * - FNPDataInsertRejected - the insert was invalid
+             */
+            
+            MessageFilter mfInsertReply = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPInsertReply);
+            mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
+            mfRejectedOverload.clearOr();
+            MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
+            MessageFilter mfDataInsertRejected = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPDataInsertRejected);
+            MessageFilter mfTimeout = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRejectedTimeout);
+            
+            mf = 
mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
+
+            Logger.minor(this, "Sending DataInsert");
+            if(receiveFailed) return;
+            try {
+                               next.send(dataInsert);
+                       } catch (NotConnectedException e1) {
+                               Logger.minor(this, "Not connected sending 
DataInsert: "+next+" for "+uid);
+                               continue;
+                       }
+
+            Logger.minor(this, "Sending data");
+            if(receiveFailed) return;
+            AwaitingCompletion ac = new AwaitingCompletion(next, prbNow);
+            synchronized(nodesWaitingForCompletion) {
+               nodesWaitingForCompletion.add(ac);
+               nodesWaitingForCompletion.notifyAll();
+            }
+            makeCompletionWaiter();
+
+            while (true) {
+
+                               if (receiveFailed)
+                                       return;
+                               
+                               try {
+                                       msg = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e) {
+                                       Logger.normal(this, "Disconnected from 
" + next
+                                                       + " while waiting for 
InsertReply on " + this);
+                                       break;
+                               }
+                               if (receiveFailed)
+                                       return;
+                               
+                               if (msg == null || msg.getSpec() == 
DMT.FNPRejectedTimeout) {
+                                       // Timeout :(
+                                       // Fairly serious problem
+                                       Logger.error(this, "Timeout (" + msg
+                                                       + ") after Accepted in 
insert");
+                                       // Terminal overload
+                                       // Try to propagate back to source
+                                       next.localRejectedOverload();
+                                       finish(TIMED_OUT, next);
+                                       return;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Probably non-fatal, if so, we have 
time left, can try next one
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               next.localRejectedOverload();
+                                               Logger.minor(this,
+                                                               "Local 
RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       } else {
+                                               forwardRejectedOverload();
+                                       }
+                                       continue; // Wait for any further 
response
+                               }
+
+                               if (msg.getSpec() == DMT.FNPRouteNotFound) {
+                                       Logger.minor(this, "Rejected: RNF");
+                                       short newHtl = msg.getShort(DMT.HTL);
+                                       if (htl > newHtl)
+                                               htl = newHtl;
+                                       // Finished as far as this node is 
concerned
+                                       next.successNotOverload();
+                                       break;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPDataInsertRejected) 
{
+                                       next.successNotOverload();
+                                       short reason = msg
+                                                       
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
+                                       Logger.minor(this, "DataInsertRejected: 
" + reason);
+                                               if (reason == 
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
+                                               if (fromStore) {
+                                                       // That's odd...
+                                                       
Logger.error(this,"Verify failed on next node "
+                                                                       + next 
+ " for DataInsert but we were sending from the store!");
+                                               } else {
+                                                       try {
+                                                               if 
(!prb.allReceived())
+                                                                       
Logger.error(this,
+                                                                               
        "Did not receive all packets but next node says invalid anyway!");
+                                                               else {
+                                                                       // 
Check the data
+                                                                       new 
CHKBlock(prb.getBlock(), headers,
+                                                                               
        myKey);
+                                                                       
Logger.error(this,
+                                                                               
        "Verify failed on " + next
+                                                                               
        + " but data was valid!");
+                                                               }
+                                                       } catch 
(CHKVerifyException e) {
+                                                               Logger
+                                                                               
.normal(this,
+                                                                               
                "Verify failed because data was invalid");
+                                                       } catch 
(AbortedException e) {
+                                                               receiveFailed = 
true;
+                                                       }
+                                               }
+                                               break; // What else can we do?
+                                       } else if (reason == 
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
+                                               if (receiveFailed) {
+                                                       Logger.minor(this, 
"Failed to receive data, so failed to send data");
+                                               } else {
+                                                       try {
+                                                               if 
(prb.allReceived()) {
+                                                                       
Logger.error(this, "Received all data but send failed to " + next);
+                                                               } else {
+                                                                       if 
(prb.isAborted()) {
+                                                                               
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " + 
prb.getAbortDescription());
+                                                                       } else
+                                                                               
Logger.normal(this, "Send failed; have not yet received all data but not 
aborted: " + next);
+                                                               }
+                                                       } catch 
(AbortedException e) {
+                                                               receiveFailed = 
true;
+                                                       }
+                                               }
+                                       }
+                                       Logger.error(this, "DataInsert 
rejected! Reason="
+                                                       + 
DMT.getDataInsertRejectedReason(reason));
+                                       break;
+                               }
+                               
+                               if (msg.getSpec() != DMT.FNPInsertReply) {
+                                       Logger.error(this, "Unknown reply: " + 
msg);
+                                       finish(INTERNAL_ERROR, next);
+                               }
+                               
+                               // Our task is complete
+                               next.successNotOverload();
+                               finish(SUCCESS, next);
+                               return;
+                       }
+               }
+       }
+
+       private boolean hasForwardedRejectedOverload = false;
+    
+    /**
+     * @return True if forwardRejectedOverload() has been called at least
+     * once, i.e. the hasForwardedRejectedOverload flag is set. Read under
+     * this object's lock.
+     */
+    synchronized boolean receivedRejectedOverload() {
+       return hasForwardedRejectedOverload;
+    }
+    
+    /**
+     * Flag that a RejectedOverload should be forwarded to the request
+     * originator. The first call sets the flag and wakes every thread
+     * waiting on this object; later calls do nothing.
+     * DO NOT CALL if have a *local* RejectedOverload.
+     */
+    private synchronized void forwardRejectedOverload() {
+       if(!hasForwardedRejectedOverload) {
+               hasForwardedRejectedOverload = true;
+               notifyAll();
+       }
+    }
+    
+    /**
+     * Record the terminal status of this insert, wake everything waiting on
+     * this object, then block until all transfers have been accounted for.
+     *
+     * @param code The terminal status code. ROUTE_NOT_FOUND is converted to
+     * ROUTE_REALLY_NOT_FOUND when sentRequest is false.
+     * @param next The peer we were talking to when we finished. Not used by
+     * this method.
+     * @throws IllegalStateException if the status has already been set to
+     * something other than NOT_FINISHED.
+     */
+    private void finish(int code, PeerNode next) {
+        // The Exception is only there to get a stack trace into the log.
+        Logger.minor(this, "Finished: "+code+" on "+this, new Exception("debug"));
+        if(status != NOT_FINISHED)
+               throw new IllegalStateException("finish() called with "+code+" when was already "+status);
+
+        // Timestamp is set before the status; CompletionWaiter uses it to work
+        // out how much of TRANSFER_COMPLETION_TIMEOUT remains.
+        // NOTE(review): status and setStatusTime are written here without
+        // holding the lock - confirm how the fields are declared.
+        setStatusTime = System.currentTimeMillis();
+
+        if(code == ROUTE_NOT_FOUND && !sentRequest)
+               code = ROUTE_REALLY_NOT_FOUND;
+
+        status = code;
+
+        // Wake anyone waiting on this object for the status to change.
+        synchronized(this) {
+            notifyAll();
+        }
+
+        Logger.minor(this, "Set status code: "+getStatusString()+" on "+uid);
+
+        // Now wait for transfers, or for downstream transfer notifications.
+
+        synchronized(this) {
+               if(cw != null) {
+                       // A CompletionWaiter exists; it sets allTransfersCompleted
+                       // and notifies us. Re-check every 10 seconds regardless.
+                       while(!allTransfersCompleted) {
+                               try {
+                                       wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Try again
+                               }
+                       }
+               } else {
+                       // There weren't any transfers
+                       allTransfersCompleted = true;
+               }
+            notifyAll();
+        }
+
+        Logger.minor(this, "Returning from finish()");
+    }
+
+    /**
+     * @return The current status code (NOT_FINISHED until finish() has set a
+     * terminal status). See getStatusString() for the known values.
+     */
+    public int getStatus() {
+        return status;
+    }
+    
+    /**
+     * @return The current value of the htl field.
+     */
+    public short getHTL() {
+        return htl;
+    }
+
+    /**
+     * Called by InsertHandler to notify that the receive has
+     * failed. Only sets the receiveFailed flag; nothing is notified
+     * or interrupted here.
+     */
+    public void receiveFailed() {
+        receiveFailed = true;
+    }
+
+    /**
+     * Translate the current status code into a human-readable name.
+     * @return The name of the current status, or a string containing the
+     * raw code if it is not one of the known statuses.
+     */
+    public String getStatusString() {
+        String name;
+        if(status == SUCCESS) {
+            name = "SUCCESS";
+        } else if(status == ROUTE_NOT_FOUND) {
+            name = "ROUTE NOT FOUND";
+        } else if(status == NOT_FINISHED) {
+            name = "NOT FINISHED";
+        } else if(status == INTERNAL_ERROR) {
+            name = "INTERNAL ERROR";
+        } else if(status == TIMED_OUT) {
+            name = "TIMED OUT";
+        } else if(status == GENERATED_REJECTED_OVERLOAD) {
+            name = "GENERATED REJECTED OVERLOAD";
+        } else if(status == ROUTE_REALLY_NOT_FOUND) {
+            name = "ROUTE REALLY NOT FOUND";
+        } else {
+            name = "UNKNOWN STATUS CODE: "+status;
+        }
+        return name;
+    }
+
+       /**
+        * @return The value of the sentRequest flag. finish() uses the same
+        * flag to turn ROUTE_NOT_FOUND into ROUTE_REALLY_NOT_FOUND when it
+        * is false.
+        */
+       public boolean sentRequest() {
+               return sentRequest;
+       }
+       
+       /**
+        * Start the CompletionWaiter on its own daemon thread, unless one has
+        * already been created for this sender.
+        */
+       private synchronized void makeCompletionWaiter() {
+               if(cw != null) return; // Already created
+               cw = new CompletionWaiter();
+               Thread waiterThread = new Thread(cw, "Completion waiter for "+uid);
+               waiterThread.setDaemon(true);
+               waiterThread.start();
+       }
+       
+       /**
+        * Background task, started by makeCompletionWaiter(), which waits for
+        * every peer in nodesWaitingForCompletion to send an
+        * FNPInsertTransfersCompleted message and for the transfers to those
+        * peers to complete. When everything is accounted for, or the overall
+        * timeout expires, it sets allTransfersCompleted (and possibly
+        * transferTimedOut) on the enclosing CHKInsertSender and notifies it.
+        */
+       private class CompletionWaiter implements Runnable {
+
+               /**
+                * Main loop: snapshot the waiters, work out a timeout, then either
+                * wait for a completion notice from one of them or, if all notices
+                * are in, wait for the transfers themselves to finish.
+                */
+               public void run() {
+                       Logger.minor(this, "Starting "+this);
+outer:         while(true) {
+                       // Snapshot the list under its lock so we can iterate without holding it.
+                       AwaitingCompletion[] waiters;
+                       synchronized(nodesWaitingForCompletion) {
+                               waiters = new AwaitingCompletion[nodesWaitingForCompletion.size()];
+                               waiters = (AwaitingCompletion[]) nodesWaitingForCompletion.toArray(waiters);
+                       }
+
+                       // First calculate the timeout
+
+                       int timeout;
+                       boolean noTimeLeft = false;
+
+                       // NOTE(review): status and setStatusTime are read without
+                       // holding the sender's lock - confirm how they are declared.
+                       long now = System.currentTimeMillis();
+                       if(status == NOT_FINISHED) {
+                               // Wait 5 seconds, then try again
+                               timeout = 5000;
+                       } else {
+                               // Completed, wait for everything
+                               timeout = (int)Math.min(Integer.MAX_VALUE, (setStatusTime + TRANSFER_COMPLETION_TIMEOUT) - now);
+                       }
+                       if(timeout <= 0) {
+                               // Deadline passed: do one last pass with a minimal timeout.
+                               noTimeLeft = true;
+                               timeout = 1;
+                       }
+
+                       // Build one filter matching a completion notice from any
+                       // connected peer that has not yet sent one.
+                       MessageFilter mf = null;
+                       for(int i=0;i<waiters.length;i++) {
+                               AwaitingCompletion awc = waiters[i];
+                               if(!awc.pn.isConnected()) {
+                                       Logger.normal(this, "Disconnected: "+awc.pn+" in "+CHKInsertSender.this);
+                                       continue;
+                               }
+                               if(!awc.receivedCompletionNotice) {
+                                       MessageFilter m =
+                                               MessageFilter.create().setField(DMT.UID, uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
+                                       if(mf == null)
+                                               mf = m;
+                                       else
+                                               mf = m.or(mf);
+                                       Logger.minor(this, "Waiting for "+awc.pn.getPeer());
+                               }
+                       }
+
+                       if(mf == null) {
+                               // No completion notices outstanding for this snapshot.
+                               if(status != NOT_FINISHED) {
+                                       if(nodesWaitingForCompletion.size() != waiters.length) {
+                                               // Added another one
+                                               Logger.minor(this, "Looping (mf==null): waiters="+waiters.length+" but waiting="+nodesWaitingForCompletion.size());
+                                               continue;
+                                       }
+                                       if(waitForCompletedTransfers(waiters, timeout, noTimeLeft)) {
+                                               synchronized(CHKInsertSender.this) {
+                                                       allTransfersCompleted = true;
+                                                       CHKInsertSender.this.notifyAll();
+                                               }
+                                               return;
+                                       }
+                                       if(noTimeLeft) {
+                                               // Out of time: mark remaining transfers as failed and give up.
+                                               for(int i=0;i<waiters.length;i++) {
+                                                       if(!waiters[i].pn.isConnected()) continue;
+                                                       if(!waiters[i].completedTransfer) {
+                                                               waiters[i].completedTransfer(false);
+                                                       }
+                                               }
+                                               synchronized(CHKInsertSender.this) {
+                                                       allTransfersCompleted = true;
+                                                       CHKInsertSender.this.notifyAll();
+                                               }
+                                               return;
+                                       }
+                                       // Otherwise, not finished, go back around loop
+                                       continue;
+                               } else {
+                                       // Still waiting for request completion, so more may be added
+                                       synchronized(nodesWaitingForCompletion) {
+                                               try {
+                                                       nodesWaitingForCompletion.wait(timeout);
+                                               } catch (InterruptedException e) {
+                                                       // Go back around the loop
+                                               }
+                                       }
+                               }
+                               continue;
+                       } else {
+                               Message m;
+                               try {
+                                       m = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e) {
+                                       // Which one? I have no idea.
+                                       // Go around the loop again.
+                                       continue;
+                               }
+                               if(m != null) {
+                                       // Process message
+                                       PeerNode pn = (PeerNode) m.getSource();
+                                       boolean processed = false;
+                                       for(int i=0;i<waiters.length;i++) {
+                                               PeerNode p = waiters[i].pn;
+                                               if(p == pn) {
+                                                       boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
+                                                       waiters[i].completed(false, !anyTimedOut);
+                                                       if(anyTimedOut) {
+                                                               // Propagate the downstream timeout to the sender, once.
+                                                               synchronized(CHKInsertSender.this) {
+                                                                       if(!transferTimedOut) {
+                                                                               transferTimedOut = true;
+                                                                               CHKInsertSender.this.notifyAll();
+                                                                       }
+                                                               }
+                                                       }
+                                                       processed = true;
+                                                       break;
+                                               }
+                                       }
+                                       if(!processed) {
+                                               Logger.error(this, "Did not process message: "+m+" on "+this);
+                                       }
+                               } else {
+                                       // waitFor() returned null: no matching message arrived.
+                                       if(nodesWaitingForCompletion.size() > waiters.length) {
+                                               // Added another one
+                                               Logger.minor(this, "Looping: waiters="+waiters.length+" but waiting="+nodesWaitingForCompletion.size());
+                                               continue;
+                                       }
+                                       if(noTimeLeft) {
+                                               Logger.minor(this, "Overall timeout on "+CHKInsertSender.this);
+                                               for(int i=0;i<waiters.length;i++) {
+                                                       if(!waiters[i].pn.isConnected()) continue;
+                                                       if(!waiters[i].receivedCompletionNotice)
+                                                               waiters[i].completed(false, false);
+                                                       if(!waiters[i].completedTransfer)
+                                                               waiters[i].completedTransfer(false);
+                                               }
+                                               synchronized(CHKInsertSender.this) {
+                                                       transferTimedOut = true;
+                                                       allTransfersCompleted = true;
+                                                       CHKInsertSender.this.notifyAll();
+                                               }
+                                               return;
+                                       }
+                               }
+                       }
+               }
+               }
+
+               /**
+                * Check whether every connected waiter has completed its transfer,
+                * waiting once on nodesWaitingForCompletion if not.
+                *
+                * @param waiters Snapshot of the peers to check. Disconnected peers
+                * are ignored.
+                * @param timeout How long to wait, in milliseconds, before checking
+                * again.
+                * @param noTimeLeft If true, do not wait; just re-check.
+                * @return True if all transfers have completed, false otherwise.
+                * On true, allTransfersCompleted has been set and the sender notified.
+                */
+               private boolean waitForCompletedTransfers(AwaitingCompletion[] waiters, int timeout, boolean noTimeLeft) {
+                       // MAYBE all done
+                       boolean completedTransfers = true;
+                       synchronized(nodesWaitingForCompletion) {
+                               for(int i=0;i<waiters.length;i++) {
+                                       if(!waiters[i].pn.isConnected()) continue;
+                                       if(!waiters[i].completedTransfer) {
+                                               completedTransfers = false;
+                                               break;
+                                       }
+                               }
+                               if(!completedTransfers) {
+                                       try {
+                                               if(!noTimeLeft) {
+                                                       nodesWaitingForCompletion.wait(timeout);
+                                               } else {
+                                                       // Timed out
+                                               }
+                                               // Re-check after the wait (or immediately if out of time).
+                                               completedTransfers = true;
+                                               for(int i=0;i<waiters.length;i++) {
+                                                       if(!waiters[i].pn.isConnected()) continue;
+                                                       if(!waiters[i].completedTransfer) {
+                                                               completedTransfers = false;
+                                                               break;
+                                                       }
+                                               }
+                                       } catch (InterruptedException e) {
+                                               // Ignore; completedTransfers stays false so the caller loops.
+                                       }
+                               }
+                       }
+                       if(completedTransfers) {
+                               // All done!
+                               Logger.minor(this, "Completed, status="+getStatusString()+", nothing left to wait for.");
+                               synchronized(CHKInsertSender.this) {
+                                       allTransfersCompleted = true;
+                                       CHKInsertSender.this.notifyAll();
+                               }
+                               return true;
+                       } else return false;
+               }
+
+               /** @return The default Object string form followed by the insert's uid. */
+               public String toString() {
+                       return super.toString()+" for "+uid;
+               }
+       }
+
+       /**
+        * @return The allTransfersCompleted flag, which is set by finish()
+        * when there were no transfers and by the CompletionWaiter once all
+        * transfers are accounted for or the overall timeout has expired.
+        */
+       public boolean completed() {
+               return allTransfersCompleted;
+       }
+
+       /**
+        * @return The transferTimedOut flag, which the CompletionWaiter sets
+        * when a peer reports ANY_TIMED_OUT or when the overall timeout
+        * expires.
+        */
+       public boolean anyTransfersFailed() {
+               return transferTimedOut;
+       }
+}

Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -35,7 +35,7 @@
     final long startTime;
     private double closestLoc;
     private short htl;
-    private InsertSender sender;
+    private CHKInsertSender sender;
     private byte[] headers;
     private BlockReceiver br;
     private Thread runThread;
@@ -104,7 +104,7 @@
         headers = ((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
         // FIXME check the headers

-        // Now create an InsertSender, or use an existing one, or
+        // Now create a CHKInsertSender, or use an existing one, or
         // discover that the data is in the store.

         // From this point onwards, if we return cleanly we must go through 
finish().
@@ -134,16 +134,16 @@
         // What do we want to wait for?
         // If the data receive completes, that's very nice,
         // but doesn't really matter. What matters is what
-        // happens to the InsertSender. If the data receive
+        // happens to the CHKInsertSender. If the data receive
         // fails, that does matter...

-        // We are waiting for a terminal status on the InsertSender,
+        // We are waiting for a terminal status on the CHKInsertSender,
         // including REPLIED_WITH_DATA.
         // If we get transfer failed, we can check whether the receive
         // failed first. If it did it's not our fault.
         // If the receive failed, and we haven't started transferring
         // yet, we probably want to kill the sender.
-        // So we call the wait method on the InsertSender, but we
+        // So we call the wait method on the CHKInsertSender, but we
         // also have a flag locally to indicate the receive failed.
         // And if it does, we interrupt.

@@ -152,7 +152,7 @@
         while(true) {
             synchronized(sender) {
                 try {
-                       if(sender.getStatus() == InsertSender.NOT_FINISHED)
+                       if(sender.getStatus() == CHKInsertSender.NOT_FINISHED)
                                sender.wait(5000);
                 } catch (InterruptedException e) {
                     // Cool, probably this is because the receive failed...
@@ -175,7 +175,7 @@

             int status = sender.getStatus();

-            if(status == InsertSender.NOT_FINISHED) {
+            if(status == CHKInsertSender.NOT_FINISHED) {
                 continue;
             }

@@ -191,20 +191,20 @@
             // Local RejectedOverload's (fatal).
             // Internal error counts as overload. It'd only create a timeout 
otherwise, which is the same thing anyway.
             // We *really* need a good way to deal with nodes that constantly 
R_O!
-            if(status == InsertSender.TIMED_OUT ||
-                       status == InsertSender.GENERATED_REJECTED_OVERLOAD ||
-                       status == InsertSender.INTERNAL_ERROR) {
+            if(status == CHKInsertSender.TIMED_OUT ||
+                       status == CHKInsertSender.GENERATED_REJECTED_OVERLOAD ||
+                       status == CHKInsertSender.INTERNAL_ERROR) {
                 msg = DMT.createFNPRejectedOverload(uid, true);
                 source.send(msg);
                 // Might as well store it anyway.
-                if(status == InsertSender.TIMED_OUT ||
-                               status == 
InsertSender.GENERATED_REJECTED_OVERLOAD)
+                if(status == CHKInsertSender.TIMED_OUT ||
+                               status == 
CHKInsertSender.GENERATED_REJECTED_OVERLOAD)
                        canCommit = true;
                 finish();
                 return;
             }

-            if(status == InsertSender.ROUTE_NOT_FOUND || status == 
InsertSender.ROUTE_REALLY_NOT_FOUND) {
+            if(status == CHKInsertSender.ROUTE_NOT_FOUND || status == 
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
                 msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
                 source.send(msg);
                 canCommit = true;
@@ -212,7 +212,7 @@
                 return;
             }

-            if(status == InsertSender.SUCCESS) {
+            if(status == CHKInsertSender.SUCCESS) {
                msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                source.send(msg);

Deleted: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/node/InsertSender.java    2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -1,778 +0,0 @@
-package freenet.node;
-
-import java.util.HashSet;
-import java.util.Vector;
-
-import freenet.io.comm.DMT;
-import freenet.io.comm.DisconnectedException;
-import freenet.io.comm.Message;
-import freenet.io.comm.MessageFilter;
-import freenet.io.xfer.BlockTransmitter;
-import freenet.io.xfer.PartiallyReceivedBlock;
-import freenet.keys.CHKBlock;
-import freenet.keys.CHKVerifyException;
-import freenet.keys.NodeCHK;
-import freenet.support.Logger;
-
-public final class InsertSender implements Runnable {
-
-       private class AwaitingCompletion {
-               
-               /** Node we are waiting for response from */
-               final PeerNode pn;
-               /** We may be sending data to that node */
-               BlockTransmitter bt;
-               /** Have we received notice of the downstream success
-                * or failure of dependant transfers from that node?
-                * Includes timing out. */
-               boolean receivedCompletionNotice = false;
-               /** Timed out - didn't receive completion notice in
-                * the allotted time?? */
-               boolean completionTimedOut = false;
-               /** Was the notification of successful transfer? */
-               boolean completionSucceeded;
-               
-               /** Have we completed the immediate transfer? */
-               boolean completedTransfer = false;
-               /** Did it succeed? */
-               boolean transferSucceeded = false;
-               
-               AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
-                       this.pn = pn;
-                       bt = new BlockTransmitter(node.usm, pn, uid, prb);
-                       Sender s = new Sender(this);
-            Thread senderThread = new Thread(s, "Sender for "+uid+" to 
"+pn.getPeer());
-            senderThread.setDaemon(true);
-            senderThread.start();
-               }
-               
-               void completed(boolean timeout, boolean success) {
-                       synchronized(this) {
-                               if(timeout)
-                                       completionTimedOut = true;
-                               else
-                                       completionSucceeded = success;
-                               receivedCompletionNotice = true;
-                               notifyAll();
-                       }
-                       synchronized(nodesWaitingForCompletion) {
-                               nodesWaitingForCompletion.notifyAll();
-                       }
-                       if(!success) {
-                               synchronized(InsertSender.this) {
-                                       transferTimedOut = true;
-                                       InsertSender.this.notifyAll();
-                               }
-                       }
-               }
-               
-               void completedTransfer(boolean success) {
-                       synchronized(this) {
-                               transferSucceeded = success;
-                               completedTransfer = true;
-                               notifyAll();
-                       }
-                       synchronized(nodesWaitingForCompletion) {
-                               nodesWaitingForCompletion.notifyAll();
-                       }
-                       if(!success) {
-                               synchronized(InsertSender.this) {
-                                       transferTimedOut = true;
-                                       InsertSender.this.notifyAll();
-                               }
-                       }
-               }
-       }
-       
-    public class Sender implements Runnable {
-       
-       final AwaitingCompletion completion;
-       final BlockTransmitter bt;
-       
-       public Sender(AwaitingCompletion ac) {
-               this.bt = ac.bt;
-               this.completion = ac;
-       }
-       
-               public void run() {
-                       try {
-                               bt.send();
-                               if(bt.failedDueToOverload()) {
-                                       completion.completedTransfer(false);
-                               } else {
-                                       completion.completedTransfer(true);
-                               }
-                       } catch (Throwable t) {
-                               completion.completedTransfer(false);
-                               Logger.error(this, "Caught "+t, t);
-                       }
-               }
-       }
-    
-       InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
-            PeerNode source, Node node, PartiallyReceivedBlock prb, boolean 
fromStore, double closestLocation) {
-        this.myKey = myKey;
-        this.target = myKey.toNormalizedDouble();
-        this.uid = uid;
-        this.headers = headers;
-        this.htl = htl;
-        this.source = source;
-        this.node = node;
-        this.prb = prb;
-        this.fromStore = fromStore;
-        this.closestLocation = closestLocation;
-        this.startTime = System.currentTimeMillis();
-        this.nodesWaitingForCompletion = new Vector();
-        Thread t = new Thread(this, "InsertSender for UID "+uid+" on 
"+node.portNumber+" at "+System.currentTimeMillis());
-        t.setDaemon(true);
-        t.start();
-    }
-    
-    // Constants
-    static final int ACCEPTED_TIMEOUT = 10000;
-    static final int SEARCH_TIMEOUT = 60000;
-    static final int TRANSFER_COMPLETION_TIMEOUT = 120000;
-
-    // Basics
-    final NodeCHK myKey;
-    final double target;
-    final long uid;
-    short htl;
-    final PeerNode source;
-    final Node node;
-    final byte[] headers; // received BEFORE creation => we handle Accepted 
elsewhere
-    final PartiallyReceivedBlock prb;
-    final boolean fromStore;
-    private boolean receiveFailed = false;
-    final double closestLocation;
-    final long startTime;
-    private boolean sentRequest;
-    
-    /** List of nodes we are waiting for either a transfer completion
-     * notice or a transfer completion from. */
-    private Vector nodesWaitingForCompletion;
-    
-    /** Have all transfers completed and all nodes reported completion status? 
*/
-    private boolean allTransfersCompleted = false;
-    
-    /** Has a transfer timed out, either directly or downstream? */
-    private boolean transferTimedOut = false;
-    
-    /** Runnable which waits for completion of all transfers */
-    private CompletionWaiter cw = null;
-
-    /** Time at which we set status to a value other than NOT_FINISHED */
-    private long setStatusTime = -1;
-    
-    
-    private int status = -1;
-    /** Still running */
-    static final int NOT_FINISHED = -1;
-    /** Successful insert */
-    static final int SUCCESS = 0;
-    /** Route not found */
-    static final int ROUTE_NOT_FOUND = 1;
-    /** Internal error */
-    static final int INTERNAL_ERROR = 3;
-    /** Timed out waiting for response */
-    static final int TIMED_OUT = 4;
-    /** Locally Generated a RejectedOverload */
-    static final int GENERATED_REJECTED_OVERLOAD = 5;
-    /** Could not get off the node at all! */
-    static final int ROUTE_REALLY_NOT_FOUND = 6;
-    
-    public String toString() {
-        return super.toString()+" for "+uid;
-    }
-    
-    public void run() {
-        short origHTL = htl;
-        try {
-        HashSet nodesRoutedTo = new HashSet();
-        HashSet nodesNotIgnored = new HashSet();
-        
-        while(true) {
-            if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
-            
-            if(htl == 0) {
-                // Send an InsertReply back
-                finish(SUCCESS, null);
-                return;
-            }
-            
-            // Route it
-            PeerNode next;
-            // Can backtrack, so only route to nodes closer than we are to 
target.
-            double nextValue;
-            synchronized(node.peers) {
-                next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true);
-                if(next != null)
-                    nextValue = next.getLocation().getValue();
-                else
-                    nextValue = -1.0;
-            }
-            
-            if(next == null) {
-                // Backtrack
-                finish(ROUTE_NOT_FOUND, null);
-                return;
-            }
-            Logger.minor(this, "Routing insert to "+next);
-            nodesRoutedTo.add(next);
-            
-            if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
-                Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
-                htl = node.decrementHTL(source, htl);
-            }
-            
-            Message req = DMT.createFNPInsertRequest(uid, htl, myKey, 
closestLocation);
-            
-            // Wait for ack or reject... will come before even a locally 
generated DataReply
-            
-            MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
-            MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
-            MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
-            
-            // mfRejectedOverload must be the last thing in the or
-            // So its or pointer remains null
-            // Otherwise we need to recreate it below
-            mfRejectedOverload.clearOr();
-            MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
-            
-            // Send to next node
-            
-            next.send(req);
-            sentRequest = true;
-            
-            if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
-            Message msg = null;
-            
-            /*
-             * Because messages may be re-ordered, it is
-             * entirely possible that we get a non-local RejectedOverload,
-             * followed by an Accepted. So we must loop here.
-             */
-            
-            while (true) {
-               
-                               try {
-                                       msg = node.usm.waitFor(mf);
-                               } catch (DisconnectedException e) {
-                                       Logger.normal(this, "Disconnected from 
" + next
-                                                       + " while waiting for 
Accepted");
-                                       break;
-                               }
-                               
-                               if (receiveFailed)
-                                       return; // don't need to set status as 
killed by InsertHandler
-                               
-                               if (msg == null) {
-                                       // Terminal overload
-                                       // Try to propagate back to source
-                                       Logger.minor(this, "Timeout");
-                                       next.localRejectedOverload();
-                                       finish(TIMED_OUT, next);
-                                       return;
-                               }
-                               
-                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
-                                       // Non-fatal - probably still have time 
left
-                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
-                                               next.localRejectedOverload();
-                                               Logger.minor(this,
-                                                                               
"Local RejectedOverload, moving on to next peer");
-                                               // Give up on this one, try 
another
-                                               break;
-                                       } else {
-                                               forwardRejectedOverload();
-                                       }
-                                       continue;
-                               }
-                               
-                               if (msg.getSpec() == DMT.FNPRejectedLoop) {
-                                       next.successNotOverload();
-                                       // Loop - we don't want to send the 
data to this one
-                                       break;
-                               }
-                               
-                               if (msg.getSpec() != DMT.FNPAccepted) {
-                                       Logger.error(this,
-                                                       "Unexpected message 
waiting for Accepted: "
-                                                                       + msg);
-                                       break;
-                               }
-                               // Otherwise is an FNPAccepted
-                               break;
-                       }
-            
-            if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;
-            
-            Logger.minor(this, "Got Accepted on "+this);
-            
-            // Send them the data.
-            // Which might be the new data resulting from a collision...
-
-            Message dataInsert;
-            PartiallyReceivedBlock prbNow;
-            prbNow = prb;
-            dataInsert = DMT.createFNPDataInsert(uid, headers);
-            /** What are we waiting for now??:
-             * - FNPRouteNotFound - couldn't exhaust HTL, but send us the 
-             *   data anyway please
-             * - FNPInsertReply - used up all HTL, yay
-             * - FNPRejectOverload - propagating an overload error :(
-             * - FNPDataFound - target already has the data, and the data is
-             *   an SVK/SSK/KSK, therefore could be different to what we are
-             *   inserting.
-             */
-            
-            MessageFilter mfInsertReply = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPInsertReply);
-            mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
-            mfRejectedOverload.clearOr();
-            MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
-            MessageFilter mfDataInsertRejected = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPDataInsertRejected);
-            MessageFilter mfTimeout = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(SEARCH_TIMEOUT).setType(DMT.FNPRejectedTimeout);
-            
-            mf = 
mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
-
-            Logger.minor(this, "Sending DataInsert");
-            if(receiveFailed) return;
-            next.send(dataInsert);
-
-            Logger.minor(this, "Sending data");
-            if(receiveFailed) return;
-            AwaitingCompletion ac = new AwaitingCompletion(next, prbNow);
-            synchronized(nodesWaitingForCompletion) {
-               nodesWaitingForCompletion.add(ac);
-               nodesWaitingForCompletion.notifyAll();
-            }
-            makeCompletionWaiter();
-
-            while (true) {
-
-                               if (receiveFailed)
-                                       return;
-                               
-                               try {
-                                       msg = node.usm.waitFor(mf);
-                               } catch (DisconnectedException e) {
-                                       Logger.normal(this, "Disconnected from 
" + next
-                                                       + " while waiting for 
InsertReply on " + this);
-                                       break;
-                               }
-                               if (receiveFailed)
-                                       return;
-                               
-                               if (msg == null || msg.getSpec() == 
DMT.FNPRejectedTimeout) {
-                                       // Timeout :(
-                                       // Fairly serious problem
-                                       Logger.error(this, "Timeout (" + msg
-                                                       + ") after Accepted in 
insert");
-                                       // Terminal overload
-                                       // Try to propagate back to source
-                                       next.localRejectedOverload();
-                                       finish(TIMED_OUT, next);
-                                       return;
-                               }
-
-                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
-                                       // Probably non-fatal, if so, we have 
time left, can try next one
-                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
-                                               next.localRejectedOverload();
-                                               Logger.minor(this,
-                                                               "Local 
RejectedOverload, moving on to next peer");
-                                               // Give up on this one, try 
another
-                                               break;
-                                       } else {
-                                               forwardRejectedOverload();
-                                       }
-                                       continue; // Wait for any further 
response
-                               }
-
-                               if (msg.getSpec() == DMT.FNPRouteNotFound) {
-                                       Logger.minor(this, "Rejected: RNF");
-                                       short newHtl = msg.getShort(DMT.HTL);
-                                       if (htl > newHtl)
-                                               htl = newHtl;
-                                       // Finished as far as this node is 
concerned
-                                       next.successNotOverload();
-                                       break;
-                               }
-
-                               if (msg.getSpec() == DMT.FNPDataInsertRejected) 
{
-                                       next.successNotOverload();
-                                       short reason = msg
-                                                       
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
-                                       Logger.minor(this, "DataInsertRejected: 
" + reason);
-                                               if (reason == 
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
-                                               if (fromStore) {
-                                                       // That's odd...
-                                                       
Logger.error(this,"Verify failed on next node "
-                                                                       + next 
+ " for DataInsert but we were sending from the store!");
-                                               } else {
-                                                       try {
-                                                               if 
(!prb.allReceived())
-                                                                       
Logger.error(this,
-                                                                               
        "Did not receive all packets but next node says invalid anyway!");
-                                                               else {
-                                                                       // 
Check the data
-                                                                       new 
CHKBlock(prb.getBlock(), headers,
-                                                                               
        myKey);
-                                                                       
Logger.error(this,
-                                                                               
        "Verify failed on " + next
-                                                                               
        + " but data was valid!");
-                                                               }
-                                                       } catch 
(CHKVerifyException e) {
-                                                               Logger
-                                                                               
.normal(this,
-                                                                               
                "Verify failed because data was invalid");
-                                                       }
-                                               }
-                                               break; // What else can we do?
-                                       } else if (reason == 
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
-                                               if (receiveFailed) {
-                                                       Logger.minor(this, 
"Failed to receive data, so failed to send data");
-                                               } else {
-                                                       if (prb.allReceived()) {
-                                                               
Logger.error(this, "Received all data but send failed to " + next);
-                                                       } else {
-                                                               if 
(prb.isAborted()) {
-                                                                       
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " + 
prb.getAbortDescription());
-                                                               } else
-                                                                       
Logger.normal(this, "Send failed; have not yet received all data but not 
aborted: " + next);
-                                                       }
-                                               }
-                                               break;
-                                       }
-                                       Logger.error(this, "DataInsert 
rejected! Reason="
-                                               + 
DMT.getDataInsertRejectedReason(reason));
-                               }
-                               
-                               if (msg.getSpec() != DMT.FNPInsertReply) {
-                                       Logger.error(this, "Unknown reply: " + 
msg);
-                                       finish(INTERNAL_ERROR, next);
-                               }
-                               
-                               // Our task is complete
-                               next.successNotOverload();
-                               finish(SUCCESS, next);
-                               return;
-                       }
-               }
-        } catch (Throwable t) {
-            Logger.error(this, "Caught "+t, t);
-            if(status == NOT_FINISHED)
-               finish(INTERNAL_ERROR, null);
-        } finally {
-            node.completed(uid);
-               node.removeInsertSender(myKey, origHTL, this);
-        }
-    }
-    
-    private boolean hasForwardedRejectedOverload = false;
-    
-    synchronized boolean receivedRejectedOverload() {
-       return hasForwardedRejectedOverload;
-    }
-    
-    /** Forward RejectedOverload to the request originator.
-     * DO NOT CALL if have a *local* RejectedOverload.
-     */
-    private synchronized void forwardRejectedOverload() {
-       if(hasForwardedRejectedOverload) return;
-       hasForwardedRejectedOverload = true;
-               notifyAll();
-       }
-    
-    private void finish(int code, PeerNode next) {
-        Logger.minor(this, "Finished: "+code+" on "+this, new 
Exception("debug"));
-        if(status != NOT_FINISHED)
-               throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
-
-        setStatusTime = System.currentTimeMillis();
-        
-        if(code == ROUTE_NOT_FOUND && !sentRequest)
-               code = ROUTE_REALLY_NOT_FOUND;
-        
-        status = code;
-        
-        synchronized(this) {
-            notifyAll();
-        }
-
-        Logger.minor(this, "Set status code: "+getStatusString());
-        
-        // Now wait for transfers, or for downstream transfer notifications.
-        
-        synchronized(this) {
-               if(cw != null) {
-                       while(!allTransfersCompleted) {
-                               try {
-                                       wait(10*1000);
-                               } catch (InterruptedException e) {
-                                       // Try again
-                               }
-                       }
-               } else {
-                       // There weren't any transfers
-                       allTransfersCompleted = true;
-               }
-            notifyAll();
-        }
-        
-        Logger.minor(this, "Returning from finish()");
-    }
-
-    public int getStatus() {
-        return status;
-    }
-    
-    public short getHTL() {
-        return htl;
-    }
-
-    /**
-     * Called by InsertHandler to notify that the receive has
-     * failed.
-     */
-    public void receiveFailed() {
-        receiveFailed = true;
-    }
-
-    /**
-     * @return The current status as a string
-     */
-    public String getStatusString() {
-        if(status == SUCCESS)
-            return "SUCCESS";
-        if(status == ROUTE_NOT_FOUND)
-            return "ROUTE NOT FOUND";
-        if(status == NOT_FINISHED)
-            return "NOT FINISHED";
-        if(status == INTERNAL_ERROR)
-               return "INTERNAL ERROR";
-        if(status == TIMED_OUT)
-               return "TIMED OUT";
-        if(status == GENERATED_REJECTED_OVERLOAD)
-               return "GENERATED REJECTED OVERLOAD";
-        if(status == ROUTE_REALLY_NOT_FOUND)
-               return "ROUTE REALLY NOT FOUND";
-        return "UNKNOWN STATUS CODE: "+status;
-    }
-
-       public boolean sentRequest() {
-               return sentRequest;
-       }
-       
-       private synchronized void makeCompletionWaiter() {
-               if(cw == null) {
-                       cw = new CompletionWaiter();
-                       Thread t = new Thread(cw, "Completion waiter for "+uid);
-                       t.setDaemon(true);
-                       t.start();
-               }
-       }
-       
-       private class CompletionWaiter implements Runnable {
-               
-               public void run() {
-                       Logger.minor(this, "Starting "+this);
-outer:         while(true) {
-                       AwaitingCompletion[] waiters;
-                       synchronized(nodesWaitingForCompletion) {
-                               waiters = new 
AwaitingCompletion[nodesWaitingForCompletion.size()];
-                               waiters = (AwaitingCompletion[]) 
nodesWaitingForCompletion.toArray(waiters);
-                       }
-                       
-                       // First calculate the timeout
-                       
-                       int timeout;
-                       boolean noTimeLeft = false;
-
-                       long now = System.currentTimeMillis();
-                       if(status == NOT_FINISHED) {
-                               // Wait 5 seconds, then try again
-                               timeout = 5000;
-                       } else {
-                               // Completed, wait for everything
-                               timeout = (int)Math.min(Integer.MAX_VALUE, 
(setStatusTime + TRANSFER_COMPLETION_TIMEOUT) - now);
-                       }
-                       if(timeout <= 0) {
-                               noTimeLeft = true;
-                               timeout = 1;
-                       }
-                       
-                       MessageFilter mf = null;
-                       for(int i=0;i<waiters.length;i++) {
-                               AwaitingCompletion awc = waiters[i];
-                               if(!awc.pn.isConnected()) {
-                                       Logger.normal(this, "Disconnected: 
"+awc.pn+" in "+InsertSender.this);
-                                       continue;
-                               }
-                               if(!awc.receivedCompletionNotice) {
-                                       MessageFilter m =
-                                               
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
-                                       if(mf == null)
-                                               mf = m;
-                                       else
-                                               mf = m.or(mf);
-                                       Logger.minor(this, "Waiting for 
"+awc.pn.getPeer());
-                               }
-                       }
-                       
-                       if(mf == null) {
-                               if(status != NOT_FINISHED) {
-                                       if(nodesWaitingForCompletion.size() != 
waiters.length) {
-                                               // Added another one
-                                               Logger.minor(this, "Looping 
(mf==null): waiters="+waiters.length+" but 
waiting="+nodesWaitingForCompletion.size());
-                                               continue;
-                                       }
-                                       if(waitForCompletedTransfers(waiters, 
timeout, noTimeLeft)) {
-                                               synchronized(InsertSender.this) 
{
-                                                       allTransfersCompleted = 
true;
-                                                       
InsertSender.this.notifyAll();
-                                               }
-                                               return;
-                                       }
-                                       if(noTimeLeft) {
-                                               for(int 
i=0;i<waiters.length;i++) {
-                                                       
if(!waiters[i].pn.isConnected()) continue;
-                                                       
if(!waiters[i].completedTransfer) {
-                                                               
waiters[i].completedTransfer(false);
-                                                       }
-                                               }
-                                               synchronized(InsertSender.this) 
{
-                                                       allTransfersCompleted = 
true;
-                                                       
InsertSender.this.notifyAll();
-                                               }
-                                               return;
-                                       }
-                                       // Otherwise, not finished, go back 
around loop
-                                       continue;
-                               } else {
-                                       // Still waiting for request 
completion, so more may be added
-                                       synchronized(nodesWaitingForCompletion) 
{
-                                               try {
-                                                       
nodesWaitingForCompletion.wait(timeout);
-                                               } catch (InterruptedException 
e) {
-                                                       // Go back around the 
loop
-                                               }
-                                       }
-                               }
-                               continue;
-                       } else {
-                               Message m;
-                               try {
-                                       m = node.usm.waitFor(mf);
-                               } catch (DisconnectedException e) {
-                                       // Which one? I have no idea.
-                                       // Go around the loop again.
-                                       continue;
-                               }
-                               if(m != null) {
-                                       // Process message
-                                       PeerNode pn = (PeerNode) m.getSource();
-                                       boolean processed = false;
-                                       for(int i=0;i<waiters.length;i++) {
-                                               PeerNode p = waiters[i].pn;
-                                               if(p == pn) {
-                                                       boolean anyTimedOut = 
m.getBoolean(DMT.ANY_TIMED_OUT);
-                                                       
waiters[i].completed(false, !anyTimedOut);
-                                                       if(anyTimedOut) {
-                                                               
synchronized(InsertSender.this) {
-                                                                       
if(!transferTimedOut) {
-                                                                               
transferTimedOut = true;
-                                                                               
InsertSender.this.notifyAll();
-                                                                       }
-                                                               }
-                                                       }
-                                                       processed = true;
-                                                       break;
-                                               }
-                                       }
-                                       if(!processed) {
-                                               Logger.error(this, "Did not 
process message: "+m+" on "+this);
-                                       }
-                               } else {
-                                       if(nodesWaitingForCompletion.size() > 
waiters.length) {
-                                               // Added another one
-                                               Logger.minor(this, "Looping: 
waiters="+waiters.length+" but waiting="+nodesWaitingForCompletion.size());
-                                               continue;
-                                       }
-                                       if(noTimeLeft) {
-                                               Logger.minor(this, "Overall 
timeout on "+InsertSender.this);
-                                               for(int 
i=0;i<waiters.length;i++) {
-                                                       
if(!waiters[i].pn.isConnected()) continue;
-                                                       
if(!waiters[i].receivedCompletionNotice)
-                                                               
waiters[i].completed(false, false);
-                                                       
if(!waiters[i].completedTransfer)
-                                                               
waiters[i].completedTransfer(false);
-                                               }
-                                               synchronized(InsertSender.this) 
{
-                                                       transferTimedOut = true;
-                                                       allTransfersCompleted = 
true;
-                                                       
InsertSender.this.notifyAll();
-                                               }
-                                               return;
-                                       }
-                               }
-                       }
-               }
-               }
-
-               /** @return True if all transfers have completed, false 
otherwise. */
-               private boolean waitForCompletedTransfers(AwaitingCompletion[] 
waiters, int timeout, boolean noTimeLeft) {
-                       // MAYBE all done
-                       boolean completedTransfers = true;
-                       synchronized(nodesWaitingForCompletion) {
-                               for(int i=0;i<waiters.length;i++) {
-                                       if(!waiters[i].pn.isConnected()) 
continue;
-                                       if(!waiters[i].completedTransfer) {
-                                               completedTransfers = false;
-                                               break;
-                                       }
-                               }
-                               if(!completedTransfers) {
-                                       try {
-                                               if(!noTimeLeft) {
-                                                       
nodesWaitingForCompletion.wait(timeout);
-                                               } else {
-                                                       // Timed out
-                                               }
-                                               completedTransfers = true;
-                                               for(int 
i=0;i<waiters.length;i++) {
-                                                       
if(!waiters[i].pn.isConnected()) continue;
-                                                       
if(!waiters[i].completedTransfer) {
-                                                               
completedTransfers = false;
-                                                               break;
-                                                       }
-                                               }
-                                       } catch (InterruptedException e) {
-                                               // Ignore
-                                       }
-                               }
-                       }
-                       if(completedTransfers) {
-                               // All done!
-                               Logger.minor(this, "Completed, 
status="+getStatusString()+", nothing left to wait for.");
-                               synchronized(InsertSender.this) {
-                                       allTransfersCompleted = true;
-                                       InsertSender.this.notifyAll();
-                               }
-                               return true;
-                       } else return false;
-               }
-
-               public String toString() {
-                       return super.toString()+" for "+uid;
-               }
-       }
-
-       public boolean completed() {
-               return allTransfersCompleted;
-       }
-
-       public boolean anyTransfersFailed() {
-               return transferTimedOut;
-       }
-}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-01-06 19:26:42 UTC (rev 
7769)
+++ trunk/freenet/src/freenet/node/Node.java    2006-01-06 21:18:26 UTC (rev 
7770)
@@ -47,6 +47,7 @@
 import freenet.keys.ClientCHKBlock;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
 import freenet.keys.NodeCHK;
 import freenet.keys.SSKBlock;
 import freenet.store.BerkeleyDBFreenetStore;
@@ -128,7 +129,7 @@
     private final HashMap requestSenders;
     /** RequestSender's currently transferring, by key */
     private final HashMap transferringRequestSenders;
-    /** InsertSender's currently running, by KeyHTLPair */
+    /** CHKInsertSender's currently running, by KeyHTLPair */
     private final HashMap insertSenders;
     /** IP address detector */
     private final IPAddressDetector ipDetector;
@@ -555,9 +556,9 @@

     public void realPutCHK(ClientCHKBlock block, boolean cache) throws 
LowLevelPutException {
         byte[] data = block.getData();
-        byte[] headers = block.getHeader();
+        byte[] headers = block.getHeaders();
         PartiallyReceivedBlock prb = new 
PartiallyReceivedBlock(PACKETS_IN_BLOCK, PACKET_SIZE, data);
-        InsertSender is;
+        CHKInsertSender is;
         long uid = random.nextLong();
         if(!lockUID(uid)) {
             Logger.error(this, "Could not lock UID just randomly generated: 
"+uid+" - probably indicates broken PRNG");
@@ -579,14 +580,14 @@
         // Wait for status
         while(true) {
                synchronized(is) {
-                       if(is.getStatus() == InsertSender.NOT_FINISHED) {
+                       if(is.getStatus() == CHKInsertSender.NOT_FINISHED) {
                                try {
                                        is.wait(5*1000);
                                } catch (InterruptedException e) {
                                        // Ignore
                                }
                        }
-                       if(is.getStatus() != InsertSender.NOT_FINISHED) break;
+                       if(is.getStatus() != CHKInsertSender.NOT_FINISHED) 
break;
                }
                if((!hasForwardedRejectedOverload) && 
is.receivedRejectedOverload()) {
                        hasForwardedRejectedOverload = true;
@@ -615,8 +616,8 @@
         // Finished?
         if(!hasForwardedRejectedOverload) {
                // Is it ours? Did we send a request?
-               if(is.sentRequest() && is.uid == uid && (is.getStatus() == 
InsertSender.ROUTE_NOT_FOUND 
-                               || is.getStatus() == InsertSender.SUCCESS)) {
+               if(is.sentRequest() && is.uid == uid && (is.getStatus() == 
CHKInsertSender.ROUTE_NOT_FOUND 
+                               || is.getStatus() == CHKInsertSender.SUCCESS)) {
                        // It worked!
                        long endTime = System.currentTimeMillis();
                        long len = endTime - startTime;
@@ -624,33 +625,33 @@
                }
         }

-        if(is.getStatus() == InsertSender.SUCCESS) {
+        if(is.getStatus() == CHKInsertSender.SUCCESS) {
                Logger.normal(this, "Succeeded inserting "+block);
                return;
         } else {
                int status = is.getStatus();
                String msg = "Failed inserting "+block+" : 
"+is.getStatusString();
-               if(status == InsertSender.ROUTE_NOT_FOUND)
+               if(status == CHKInsertSender.ROUTE_NOT_FOUND)
                        msg += " - this is normal on small networks; the data 
will still be propagated, but it can't find the 20+ nodes needed for full 
success";
-               if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
+               if(is.getStatus() != CHKInsertSender.ROUTE_NOT_FOUND)
                        Logger.error(this, msg);
                else
                        Logger.normal(this, msg);
                switch(is.getStatus()) {
-               case InsertSender.NOT_FINISHED:
+               case CHKInsertSender.NOT_FINISHED:
                        Logger.error(this, "IS still running in putCHK!: "+is);
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
-               case InsertSender.GENERATED_REJECTED_OVERLOAD:
-               case InsertSender.TIMED_OUT:
+               case CHKInsertSender.GENERATED_REJECTED_OVERLOAD:
+               case CHKInsertSender.TIMED_OUT:
                        throw new 
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
-               case InsertSender.ROUTE_NOT_FOUND:
+               case CHKInsertSender.ROUTE_NOT_FOUND:
                        throw new 
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
-               case InsertSender.ROUTE_REALLY_NOT_FOUND:
+               case CHKInsertSender.ROUTE_REALLY_NOT_FOUND:
                        throw new 
LowLevelPutException(LowLevelPutException.ROUTE_REALLY_NOT_FOUND);
-               case InsertSender.INTERNAL_ERROR:
+               case CHKInsertSender.INTERNAL_ERROR:
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                default:
-                       Logger.error(this, "Unknown InsertSender code in 
putCHK: "+is.getStatus()+" on "+is);
+                       Logger.error(this, "Unknown CHKInsertSender code in 
putCHK: "+is.getStatus()+" on "+is);
                        throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
                }
         }
@@ -793,9 +794,9 @@
     }

     static class KeyHTLPair {
-        final NodeCHK key;
+        final Key key;
         final short htl;
-        KeyHTLPair(NodeCHK key, short htl) {
+        KeyHTLPair(Key key, short htl) {
             this.key = key;
             this.htl = htl;
         }
@@ -864,11 +865,11 @@
     }

     /**
-     * Remove an InsertSender from the map.
+     * Remove a CHKInsertSender from the map.
      */
-    public void removeInsertSender(NodeCHK key, short htl, InsertSender 
sender) {
+    public void removeInsertSender(Key key, short htl, AnyInsertSender sender) 
{
         KeyHTLPair kh = new KeyHTLPair(key, htl);
-        InsertSender is = (InsertSender) insertSenders.remove(kh);
+        AnyInsertSender is = (AnyInsertSender) insertSenders.remove(kh);
         if(is != sender) {
             Logger.error(this, "Removed "+is+" should be "+sender+" for 
"+key+","+htl+" in removeInsertSender");
         }
@@ -900,28 +901,28 @@
     }

     /**
-     * Fetch or create an InsertSender for a given key/htl.
+     * Fetch or create a CHKInsertSender for a given key/htl.
      * @param key The key to be inserted.
      * @param htl The current HTL. We can't coalesce inserts across
      * HTL's.
      * @param uid The UID of the caller's request chain, or a new
      * one. This is obviously not used if there is already an 
-     * InsertSender running.
+     * CHKInsertSender running.
      * @param source The node that sent the InsertRequest, or null
      * if it originated locally.
      */
-    public synchronized InsertSender makeInsertSender(NodeCHK key, short htl, 
long uid, PeerNode source,
+    public synchronized CHKInsertSender makeInsertSender(NodeCHK key, short 
htl, long uid, PeerNode source,
             byte[] headers, PartiallyReceivedBlock prb, boolean fromStore, 
double closestLoc, boolean cache) {
         Logger.minor(this, 
"makeInsertSender("+key+","+htl+","+uid+","+source+",...,"+fromStore);
         KeyHTLPair kh = new KeyHTLPair(key, htl);
-        InsertSender is = (InsertSender) insertSenders.get(kh);
+        CHKInsertSender is = (CHKInsertSender) insertSenders.get(kh);
         if(is != null) {
             Logger.minor(this, "Found "+is+" for "+kh);
             return is;
         }
         if(fromStore && !cache)
                throw new IllegalArgumentException("From store = true but cache 
= false !!!");
-        is = new InsertSender(key, uid, headers, htl, source, this, prb, 
fromStore, closestLoc);
+        is = new CHKInsertSender(key, uid, headers, htl, source, this, prb, 
fromStore, closestLoc);
         Logger.minor(this, is.toString()+" for "+kh.toString());
         insertSenders.put(kh, is);
         return is;
@@ -962,7 +963,7 @@
                // Dump
                Iterator i = insertSenders.values().iterator();
                while(i.hasNext()) {
-                       InsertSender s = (InsertSender) i.next();
+                       CHKInsertSender s = (CHKInsertSender) i.next();
                        sb.append(s.uid);
                        sb.append(": ");
                        sb.append(s.getStatusString());

Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2006-01-06 19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2006-01-06 21:18:26 UTC (rev 7770)
@@ -177,11 +177,11 @@
                 block = ClientCHKBlock.encode(data, false, false, (short)-1, 
0);
                 ClientCHK chk = block.getClientKey();
                 byte[] encData = block.getData();
-                byte[] encHeaders = block.getHeader();
+                byte[] encHeaders = block.getHeaders();
                 ClientCHKBlock newBlock = new ClientCHKBlock(encData, 
encHeaders, chk, true);
                 Logger.error(RealNodeRequestInsertTest.class, "Decoded: "+new 
String(newBlock.memoryDecode()));
                 Logger.error(RealNodeRequestInsertTest.class,"CHK: 
"+chk.getURI());
-                Logger.error(RealNodeRequestInsertTest.class,"Headers: 
"+HexUtil.bytesToHex(block.getHeader()));
+                Logger.error(RealNodeRequestInsertTest.class,"Headers: 
"+HexUtil.bytesToHex(block.getHeaders()));
                 randomNode.putCHK(block, starters[node1], true);
                 Logger.error(RealNodeRequestInsertTest.class, "Inserted to 
"+node1);
                 Logger.error(RealNodeRequestInsertTest.class, "Data: 
"+Fields.hashCode(encData)+", Headers: "+Fields.hashCode(encHeaders));

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2006-01-06 19:26:42 UTC 
(rev 7769)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2006-01-06 21:18:26 UTC 
(rev 7770)
@@ -53,7 +53,7 @@
         Object o = node.makeRequestSender(key, htl, uid, source, closestLoc, 
false, true);
         if(o instanceof CHKBlock) {
             CHKBlock block = (CHKBlock) o;
-            Message df = DMT.createFNPDataFound(uid, block.getHeader());
+            Message df = DMT.createFNPDataFound(uid, block.getHeaders());
             source.send(df);
             PartiallyReceivedBlock prb =
                 new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getData());

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-01-06 19:26:42 UTC (rev 
7769)
+++ trunk/freenet/src/freenet/node/Version.java 2006-01-06 21:18:26 UTC (rev 
7770)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 323;
+       public static final int buildNumber = 324;

        /** Oldest build of Fred we will talk to */
        public static final int lastGoodBuild = 318;

Modified: trunk/freenet/src/freenet/store/BaseFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BaseFreenetStore.java       2006-01-06 
19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/store/BaseFreenetStore.java       2006-01-06 
21:18:26 UTC (rev 7770)
@@ -95,7 +95,7 @@
      */
     public synchronized void put(CHKBlock block) throws IOException {
         byte[] data = block.getData();
-        byte[] headers = block.getHeader();
+        byte[] headers = block.getHeaders();
         int hlen = headers.length;
         if(data.length != DATA_BLOCK_SIZE || hlen > HEADER_BLOCK_SIZE-2)
             throw new IllegalArgumentException("Too big - data: 
"+data.length+" should be "+

Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06 
19:26:42 UTC (rev 7769)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2006-01-06 
21:18:26 UTC (rev 7770)
@@ -212,7 +212,7 @@

        byte[] routingkey = ((NodeCHK)block.getKey()).getRoutingKey();
         byte[] data = block.getData();
-        byte[] header = block.getHeader();
+        byte[] header = block.getHeaders();

         if(data.length!=dataBlockSize) {
                Logger.minor(this, "This data is "+data.length+" bytes. Should 
be "+dataBlockSize);


Reply via email to