Author: toad
Date: 2005-11-29 18:18:12 +0000 (Tue, 29 Nov 2005)
New Revision: 7631

Added:
   trunk/freenet/src/freenet/node/NodePinger.java
Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/InsertSender.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/LRUHashtable.java
Log:
245: (mandatory)
New load balancing and limiting, very simple, based on analogy to 
tcp-over-ethernet. Simple randomized exponential backoff, and the existing 
TCP-like load limiting. Thanks ian.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2005-11-29 00:36:32 UTC (rev 
7630)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2005-11-29 18:18:12 UTC (rev 
7631)
@@ -78,6 +78,7 @@
     public static final String BLOCK_HEADERS = "blockHeaders";
     public static final String DATA_INSERT_REJECTED_REASON = 
"dataInsertRejectedReason";
     public static final String STREAM_SEQNO = "streamSequenceNumber";
+    public static final String IS_LOCAL = "isLocal";

        //Diagnostic
        public static final MessageType ping = new MessageType("ping") {{
@@ -528,14 +529,16 @@
     // to source, and reduce send rate.
     public static final MessageType FNPRejectedOverload = new 
MessageType("FNPRejectOverload") {{
         addField(UID, Long.class);
+        addField(IS_LOCAL, Boolean.class);
     }};

-    public static final Message createFNPRejectedOverload(long id) {
+    public static final Message createFNPRejectedOverload(long id, boolean 
isLocal) {
         Message msg = new Message(FNPRejectedOverload);
         msg.set(UID, id);
+        msg.set(IS_LOCAL, isLocal);
         return msg;
     }
-
+    
     public static final MessageType FNPAccepted = new 
MessageType("FNPAccepted") {{
         addField(UID, Long.class);
     }};
@@ -661,6 +664,26 @@
         return msg;
     }

+    public static final MessageType FNPLinkPing = new 
MessageType("FNPLinkPing") {{
+       addField(PING_SEQNO, Long.class);
+    }};
+    
+    public static final Message createFNPLinkPing(long seqNo) {
+       Message msg = new Message(FNPLinkPing);
+       msg.set(PING_SEQNO, seqNo);
+       return msg;
+    }
+    
+    public static final MessageType FNPLinkPong = new 
MessageType("FNPLinkPong") {{
+       addField(PING_SEQNO, Long.class);
+    }};
+    
+    public static final Message createFNPLinkPong(long seqNo) {
+       Message msg = new Message(FNPLinkPong);
+       msg.set(PING_SEQNO, seqNo);
+       return msg;
+    }
+    
     public static final MessageType FNPPong = new MessageType("FNPPong") {{
         addField(PING_SEQNO, Integer.class);
     }};

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-29 
00:36:32 UTC (rev 7630)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-29 
18:18:12 UTC (rev 7631)
@@ -258,4 +258,8 @@
        public boolean failedDueToOverload() {
                return failedByOverload;
        }
+
+       public PeerContext getDestination() {
+               return _destination;
+       }
 }

Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -139,6 +139,8 @@
         // also have a flag locally to indicate the receive failed.
         // And if it does, we interrupt.

+        boolean receivedRejectedOverload = false;
+        
         while(true) {
             synchronized(sender) {
                 try {
@@ -154,20 +156,29 @@
                 return;
             }

+            if((!receivedRejectedOverload) && 
sender.receivedRejectedOverload()) {
+               // Forward it
+               Message m = DMT.createFNPRejectedOverload(uid, false);
+               source.send(m);
+            }
+            
             int status = sender.getStatus();

             if(status == InsertSender.NOT_FINISHED) {
                 continue;
             }
-            
+
+            // Local RejectedOverload's (fatal).
             // Internal error counts as overload. It'd only create a timeout 
otherwise, which is the same thing anyway.
             // We *really* need a good way to deal with nodes that constantly 
R_O!
-            if(status == InsertSender.REJECTED_OVERLOAD || 
+            if(status == InsertSender.TIMED_OUT ||
+                       status == InsertSender.GENERATED_REJECTED_OVERLOAD ||
                        status == InsertSender.INTERNAL_ERROR) {
-                msg = DMT.createFNPRejectedOverload(uid);
+                msg = DMT.createFNPRejectedOverload(uid, true);
                 source.send(msg);
                 // Might as well store it anyway.
-                if(status == InsertSender.REJECTED_OVERLOAD)
+                if(status == InsertSender.TIMED_OUT ||
+                               status == 
InsertSender.GENERATED_REJECTED_OVERLOAD)
                        canCommit = true;
                 return;
             }

Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/InsertSender.java    2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -74,11 +74,18 @@
     private boolean sentRequest;

     private int status = -1;
+    /** Still running */
     static final int NOT_FINISHED = -1;
+    /** Successful insert */
     static final int SUCCESS = 0;
+    /** Route not found */
     static final int ROUTE_NOT_FOUND = 1;
-    static final int REJECTED_OVERLOAD = 2;
+    /** Internal error */
     static final int INTERNAL_ERROR = 3;
+    /** Timed out waiting for response */
+    static final int TIMED_OUT = 4;
+    /** Locally Generated a RejectedOverload */
+    static final int GENERATED_REJECTED_OVERLOAD = 5;

     public String toString() {
         return super.toString()+" for "+uid;
@@ -145,29 +152,66 @@

             if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler
             Message msg;
-            try {
-                msg = node.usm.waitFor(mf);
-            } catch (DisconnectedException e) {
-                Logger.normal(this, "Disconnected from "+next+" while waiting 
for Accepted");
-                continue;
-            }
-            if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler

-            if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
-                // Overload... hmmmm - propagate error back to source
-                Logger.error(this, "Propagating "+msg+" back to source on 
"+this);
-                next.insertRejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
+            /*
+             * Because messages may be re-ordered, it is
+             * entirely possible that we get a non-local RejectedOverload,
+             * followed by an Accepted. So we must loop here.
+             */

-            if(msg.getSpec() == DMT.FNPRejectedLoop) {
-                       next.insertDidNotRejectOverload();
-                // Loop - we don't want to send the data to this one
-                continue;
-            }
+            while (true) {
+               
+                               try {
+                                       msg = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e) {
+                                       Logger.normal(this, "Disconnected from 
" + next
+                                                       + " while waiting for 
Accepted");
+                                       continue;
+                               }
+                               
+                               if (receiveFailed)
+                                       return; // don't need to set status as 
killed by InsertHandler
+                               
+                               if (msg == null) {
+                                       // Terminal overload
+                                       // Try to propagate back to source
+                                       next.localRejectedOverload();
+                                       finish(TIMED_OUT, next);
+                                       return;
+                               }
+                               
+                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Non-fatal - probably still have time 
left
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               next.localRejectedOverload();
+                                               Logger
+                                                               .minor(this,
+                                                                               
"Local RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       } else {
+                                               forwardRejectedOverload();
+                                       }
+                                       continue;
+                               }
+                               
+                               if (msg.getSpec() == DMT.FNPRejectedLoop) {
+                                       next.successNotOverload();
+                                       // Loop - we don't want to send the 
data to this one
+                                       break;
+                               }
+                               
+                               if (msg.getSpec() != DMT.FNPAccepted) {
+                                       Logger.error(this,
+                                                       "Unexpected message 
waiting for Accepted: "
+                                                                       + msg);
+                                       break;
+                               }
+                               // Otherwise is an FNPAccepted
+                               break;
+                       }

-            // Otherwise must be an Accepted
+            if(msg == null || msg.getSpec() != DMT.FNPAccepted) continue;

             // Send them the data.
             // Which might be the new data resulting from a collision...
@@ -208,95 +252,118 @@
             senderThread.start();
             senderThreads.add(senderThread);
             blockSenders.add(bt);
-            
-            if(receiveFailed) return;
-            try {
-                msg = node.usm.waitFor(mf);
-            } catch (DisconnectedException e) {
-                Logger.normal(this, "Disconnected from "+next+" while waiting 
for InsertReply on "+this);
-                continue;
-            }
-            if(receiveFailed) return;
-            
-            if(msg == null) {
-                // Timeout :(
-                // Fairly serious problem
-                Logger.error(this, "Timeout after Accepted in insert");
-                // Treat as rejected-overload
-                       next.insertRejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
-            
-            if(msg.getSpec() == DMT.FNPRejectedOverload || msg.getSpec() == 
DMT.FNPRejectedTimeout) {
-                Logger.minor(this, "Rejected due to overload");
-                       next.insertRejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
-            
-            if(msg.getSpec() == DMT.FNPRouteNotFound) {
-                Logger.minor(this, "Rejected: RNF");
-                short newHtl = msg.getShort(DMT.HTL);
-                if(htl > newHtl) htl = newHtl;
-                // Finished as far as this node is concerned
-                       next.insertDidNotRejectOverload();
-                continue;
-            }
-            
-            if(msg.getSpec() == DMT.FNPDataInsertRejected) {
-                       next.insertDidNotRejectOverload();
-                short reason = msg.getShort(DMT.DATA_INSERT_REJECTED_REASON);
-                Logger.minor(this, "DataInsertRejected: "+reason);
-                
-                if(reason == DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
-                    if(fromStore) {
-                        // That's odd...
-                        Logger.error(this, "Verify failed on next node 
"+next+" for DataInsert but we were sending from the store!");
-                    } else {
-                        try {
-                            if(!prb.allReceived())
-                                Logger.error(this, "Did not receive all 
packets but next node says invalid anyway!");
-                            else {
-                                // Check the data
-                                new CHKBlock(prb.getBlock(), headers, myKey);
-                                Logger.error(this, "Verify failed on "+next+" 
but data was valid!");
-                            }
-                        } catch (CHKVerifyException e) {
-                            Logger.normal(this, "Verify failed because data 
was invalid");
-                        }
-                    }
-                    continue; // What else can we do?
-                } else if(reason == DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
-                    if(receiveFailed) {
-                        Logger.minor(this, "Failed to receive data, so failed 
to send data");
-                    } else {
-                        if(prb.allReceived()) {
-                            Logger.error(this, "Received all data but send 
failed to "+next);
-                        } else {
-                            if(prb.isAborted()) {
-                                Logger.normal(this, "Send failed: aborted: 
"+prb.getAbortReason()+": "+prb.getAbortDescription());
-                            } else
-                                Logger.normal(this, "Send failed; have not yet 
received all data but not aborted: "+next);
-                        }
-                    }
-                    continue;
-                }
-                
-                Logger.error(this, "DataInsert rejected! 
Reason="+DMT.getDataInsertRejectedReason(reason));
-                
-            }
-            
-            if(msg.getSpec() != DMT.FNPInsertReply) {
-               Logger.error(this, "Unknown reply: "+msg);
-               finish(INTERNAL_ERROR, next);
-            }
-            
-            // Our task is complete
-                       next.insertDidNotRejectOverload();
-            finish(SUCCESS, next);
-            return;
-        }
+
+            while (true) {
+
+                               if (receiveFailed)
+                                       return;
+                               
+                               try {
+                                       msg = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e) {
+                                       Logger.normal(this, "Disconnected from 
" + next
+                                                       + " while waiting for 
InsertReply on " + this);
+                                       break;
+                               }
+                               if (receiveFailed)
+                                       return;
+                               
+                               if (msg == null || msg.getSpec() == 
DMT.FNPRejectedTimeout) {
+                                       // Timeout :(
+                                       // Fairly serious problem
+                                       Logger.error(this, "Timeout (" + msg
+                                                       + ") after Accepted in 
insert");
+                                       // Terminal overload
+                                       // Try to propagate back to source
+                                       next.localRejectedOverload();
+                                       finish(TIMED_OUT, next);
+                                       return;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Probably non-fatal, if so, we have 
time left, can try next one
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               next.localRejectedOverload();
+                                               Logger.minor(this,
+                                                               "Local 
RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       } else {
+                                               forwardRejectedOverload();
+                                       }
+                                       continue; // Wait for any further 
response
+                               }
+
+                               if (msg.getSpec() == DMT.FNPRouteNotFound) {
+                                       Logger.minor(this, "Rejected: RNF");
+                                       short newHtl = msg.getShort(DMT.HTL);
+                                       if (htl > newHtl)
+                                               htl = newHtl;
+                                       // Finished as far as this node is 
concerned
+                                       next.successNotOverload();
+                                       break;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPDataInsertRejected) 
{
+                                       next.successNotOverload();
+                                       short reason = msg
+                                                       
.getShort(DMT.DATA_INSERT_REJECTED_REASON);
+                                       Logger.minor(this, "DataInsertRejected: 
" + reason);
+                                               if (reason == 
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED) {
+                                               if (fromStore) {
+                                                       // That's odd...
+                                                       
Logger.error(this,"Verify failed on next node "
+                                                                       + next 
+ " for DataInsert but we were sending from the store!");
+                                               } else {
+                                                       try {
+                                                               if 
(!prb.allReceived())
+                                                                       
Logger.error(this,
+                                                                               
        "Did not receive all packets but next node says invalid anyway!");
+                                                               else {
+                                                                       // 
Check the data
+                                                                       new 
CHKBlock(prb.getBlock(), headers,
+                                                                               
        myKey);
+                                                                       
Logger.error(this,
+                                                                               
        "Verify failed on " + next
+                                                                               
        + " but data was valid!");
+                                                               }
+                                                       } catch 
(CHKVerifyException e) {
+                                                               Logger
+                                                                               
.normal(this,
+                                                                               
                "Verify failed because data was invalid");
+                                                       }
+                                               }
+                                               break; // What else can we do?
+                                       } else if (reason == 
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED) {
+                                               if (receiveFailed) {
+                                                       Logger.minor(this, 
"Failed to receive data, so failed to send data");
+                                               } else {
+                                                       if (prb.allReceived()) {
+                                                               
Logger.error(this, "Received all data but send failed to " + next);
+                                                       } else {
+                                                               if 
(prb.isAborted()) {
+                                                                       
Logger.normal(this, "Send failed: aborted: " + prb.getAbortReason() + ": " + 
prb.getAbortDescription());
+                                                               } else
+                                                                       
Logger.normal(this, "Send failed; have not yet received all data but not 
aborted: " + next);
+                                                       }
+                                               }
+                                               break;
+                                       }
+                                       Logger.error(this, "DataInsert 
rejected! Reason="
+                                               + 
DMT.getDataInsertRejectedReason(reason));
+                               }
+                               
+                               if (msg.getSpec() != DMT.FNPInsertReply) {
+                                       Logger.error(this, "Unknown reply: " + 
msg);
+                                       finish(INTERNAL_ERROR, next);
+                               }
+                               
+                               // Our task is complete
+                               next.successNotOverload();
+                               finish(SUCCESS, next);
+                               return;
+                       }
+               }
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
             if(status == NOT_FINISHED)
@@ -306,21 +373,22 @@
                node.removeInsertSender(myKey, origHTL, this);
         }
     }
-
-    /**
-     * Wait until we have a terminal status code.
-     */
-    public synchronized void waitUntilFinished() {
-        while(true) {
-            if(status != NOT_FINISHED) return;
-            try {
-                wait(10000);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-        }
+    
+    private boolean hasForwardedRejectedOverload = false;
+    
+    synchronized boolean receivedRejectedOverload() {
+       return hasForwardedRejectedOverload;
     }

+    /** Forward RejectedOverload to the request originator.
+     * DO NOT CALL if have a *local* RejectedOverload.
+     */
+    private synchronized void forwardRejectedOverload() {
+       if(hasForwardedRejectedOverload) return;
+       hasForwardedRejectedOverload = true;
+               notifyAll();
+       }
+    
     private void finish(int code, PeerNode next) {
         Logger.minor(this, "Finished: "+code+" on "+this, new 
Exception("debug"));
         if(status != NOT_FINISHED)
@@ -330,30 +398,13 @@
                BlockTransmitter bt = (BlockTransmitter) i.next();
                bt.waitForComplete();
                if(bt.failedDueToOverload() && (status == SUCCESS || status == 
ROUTE_NOT_FOUND)) {
-                       status = REJECTED_OVERLOAD;
+                       forwardRejectedOverload();
+                       ((PeerNode)bt.getDestination()).localRejectedOverload();
                        break;
                }
         }

-        for(Iterator i = senderThreads.iterator();i.hasNext();) {
-               Thread senderThread = (Thread) i.next();
-               while(senderThread.isAlive()) {
-                       try {
-                               senderThread.join();
-                       } catch (InterruptedException e) {
-                               // Ignore
-                       }
-               }
-        }
-        
         status = code;
-        if(sentRequest) {
-               if(status == REJECTED_OVERLOAD) {
-                       node.getInsertThrottle().requestRejectedOverload();
-               } else if(status == SUCCESS || status == ROUTE_NOT_FOUND) {
-                       
node.getInsertThrottle().requestCompleted(System.currentTimeMillis() - 
startTime);
-               }
-        }

         synchronized(this) {
             notifyAll();
@@ -384,12 +435,18 @@
             return "SUCCESS";
         if(status == ROUTE_NOT_FOUND)
             return "ROUTE NOT FOUND";
-        if(status == REJECTED_OVERLOAD)
-            return "REJECTED: OVERLOAD";
         if(status == NOT_FINISHED)
             return "NOT FINISHED";
         if(status == INTERNAL_ERROR)
                return "INTERNAL ERROR";
+        if(status == TIMED_OUT)
+               return "TIMED OUT";
+        if(status == GENERATED_REJECTED_OVERLOAD)
+               return "GENERATED REJECTED OVERLOAD";
         return "UNKNOWN STATUS CODE: "+status;
     }
+
+       public boolean sentRequest() {
+               return sentRequest;
+       }
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-11-29 00:36:32 UTC (rev 
7630)
+++ trunk/freenet/src/freenet/node/Node.java    2005-11-29 18:18:12 UTC (rev 
7631)
@@ -80,8 +80,8 @@
        public static final boolean DONT_CACHE_LOCAL_REQUESTS = true;
     public static final int PACKETS_IN_BLOCK = 32;
     public static final int PACKET_SIZE = 1024;
-    public static final double DECREMENT_AT_MIN_PROB = 0.2;
-    public static final double DECREMENT_AT_MAX_PROB = 0.1;
+    public static final double DECREMENT_AT_MIN_PROB = 0.25;
+    public static final double DECREMENT_AT_MAX_PROB = 0.5;
     // Send keepalives every 2.5-5.0 seconds
     public static final int KEEPALIVE_INTERVAL = 2500;
     // If no activity for 30 seconds, node is dead
@@ -95,6 +95,10 @@
     public static final int RANDOMIZED_TIME_BETWEEN_VERSION_PROBES = 
HANDSHAKE_TIMEOUT*2; // 20-30 secs
     // If we don't receive any packets at all in this period, from any node, 
tell the user
     public static final long ALARM_TIME = 60*1000;
+    /** Maximum overall average ping time. If ping is greater than this,
+     * we reject all requests.
+     */
+    public static final long MAX_PING_TIME = 1000;

     // 900ms
     static final int MIN_INTERVAL_BETWEEN_INCOMING_SWAP_REQUESTS = 900;
@@ -133,6 +137,7 @@
     final FNPPacketMangler packetMangler;
     final PacketSender ps;
     final NodeDispatcher dispatcher;
+    final NodePinger nodePinger;
     final String filenamesPrefix;
     final FilenameGenerator tempFilenameGenerator;
     static short MAX_HTL = 10;
@@ -365,6 +370,7 @@
                        System.exit(EXIT_TEMP_INIT_ERROR);
                        throw new Error();
                }
+        nodePinger = new NodePinger(this);
                tempBucketFactory = new 
PaddedEphemerallyEncryptedBucketFactory(new 
TempBucketFactory(tempFilenameGenerator), random, 1024);
                archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS, 
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE, 
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
                requestThrottle = new RequestThrottle(5000, 2.0F);
@@ -401,6 +407,7 @@
      * Either it succeeds or it doesn't.
      */
     ClientCHKBlock realGetCHK(ClientCHK key, boolean localOnly, boolean cache) 
throws LowLevelGetException {
+       long startTime = System.currentTimeMillis();
         Object o = makeRequestSender(key.getNodeCHK(), MAX_HTL, 
random.nextLong(), null, lm.loc.getValue(), localOnly, cache);
         if(o instanceof CHKBlock) {
             try {
@@ -414,36 +421,65 @@
                throw new 
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND_IN_STORE);
         }
         RequestSender rs = (RequestSender)o;
-        rs.waitUntilFinished();
-        if(rs.getStatus() == RequestSender.SUCCESS) {
-            try {
-                return new ClientCHKBlock(rs.getPRB().getBlock(), 
rs.getHeaders(), key, true);
-            } catch (CHKVerifyException e) {
-                Logger.error(this, "Does not verify: "+e, e);
-                throw new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED);                
-            } catch (AbortedException e) {
-               Logger.error(this, "Impossible: "+e, e);
-               throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
-                       }
-        } else {
-               switch(rs.getStatus()) {
-               case RequestSender.NOT_FINISHED:
-                       Logger.error(this, "RS still running in getCHK!: "+rs);
-                       throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
-               case RequestSender.DATA_NOT_FOUND:
-                       throw new 
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND);
-               case RequestSender.REJECTED_OVERLOAD:
-                       throw new 
LowLevelGetException(LowLevelGetException.REJECTED_OVERLOAD);
-               case RequestSender.ROUTE_NOT_FOUND:
-                       throw new 
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
-               case RequestSender.TRANSFER_FAILED:
-                       throw new 
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
-               case RequestSender.VERIFY_FAILURE:
-                       throw new 
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
-               default:
-                       Logger.error(this, "Unknown RequestSender code in 
getCHK: "+rs.getStatus()+" on "+rs);
-                       throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+        boolean rejectedOverload = false;
+        while(true) {
+               if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
+                       requestThrottle.requestRejectedOverload();
+                       rejectedOverload = true;
                }
+
+               int status = rs.getStatus();
+               
+               if(status == RequestSender.NOT_FINISHED) 
+                       continue;
+               
+               if(status == RequestSender.TIMED_OUT ||
+                               status == 
RequestSender.GENERATED_REJECTED_OVERLOAD) {
+                       if(!rejectedOverload) {
+                       requestThrottle.requestRejectedOverload();
+                               rejectedOverload = true;
+                       }
+               } else {
+                       if(status == RequestSender.DATA_NOT_FOUND ||
+                                       status == RequestSender.SUCCESS ||
+                                       status == RequestSender.ROUTE_NOT_FOUND 
||
+                                       status == RequestSender.VERIFY_FAILURE) 
{
+                               long rtt = System.currentTimeMillis() - 
startTime;
+                               insertThrottle.requestCompleted(rtt);
+                       }
+               }
+               
+               if(rs.getStatus() == RequestSender.SUCCESS) {
+                       try {
+                               return new 
ClientCHKBlock(rs.getPRB().getBlock(), rs.getHeaders(), key, true);
+                       } catch (CHKVerifyException e) {
+                               Logger.error(this, "Does not verify: "+e, e);
+                               throw new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED);                
+                       } catch (AbortedException e) {
+                               Logger.error(this, "Impossible: "+e, e);
+                               throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+                       }
+               } else {
+                       switch(rs.getStatus()) {
+                       case RequestSender.NOT_FINISHED:
+                               Logger.error(this, "RS still running in 
getCHK!: "+rs);
+                               throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+                       case RequestSender.DATA_NOT_FOUND:
+                               throw new 
LowLevelGetException(LowLevelGetException.DATA_NOT_FOUND);
+                       case RequestSender.ROUTE_NOT_FOUND:
+                               throw new 
LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
+                       case RequestSender.TRANSFER_FAILED:
+                               throw new 
LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
+                       case RequestSender.VERIFY_FAILURE:
+                               throw new 
LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
+                       case RequestSender.GENERATED_REJECTED_OVERLOAD:
+                       case RequestSender.TIMED_OUT:
+                               throw new 
LowLevelGetException(LowLevelGetException.REJECTED_OVERLOAD);
+                       default:
+                               Logger.error(this, "Unknown RequestSender code 
in getCHK: "+rs.getStatus()+" on "+rs);
+                               throw new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+                       }
+               }
         }
     }

@@ -459,6 +495,7 @@
         long uid = random.nextLong();
         if(!lockUID(uid))
             Logger.error(this, "Could not lock UID just randomly generated: 
"+uid+" - probably indicates broken PRNG");
+        long startTime = System.currentTimeMillis();
         synchronized(this) {
                if(cache) {
                        try {
@@ -470,35 +507,64 @@
             is = makeInsertSender(block.getClientKey().getNodeCHK(), 
                     MAX_HTL, uid, null, headers, prb, false, 
lm.getLocation().getValue(), cache);
         }
-        is.waitUntilFinished();
-        if(is.getStatus() == InsertSender.SUCCESS) {
-            Logger.normal(this, "Succeeded inserting "+block);
-        } else {
-            int status = is.getStatus();
-            String msg = "Failed inserting "+block+" : "+is.getStatusString();
-            if(status == InsertSender.ROUTE_NOT_FOUND)
-                msg += " - this is normal on small networks; the data will 
still be propagated, but it can't find the 20+ nodes needed for full success";
-            if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
-               Logger.error(this, msg);
-            else
-               Logger.normal(this, msg);
-            switch(is.getStatus()) {
-            case InsertSender.NOT_FINISHED:
-                       Logger.error(this, "IS still running in putCHK!: "+is);
-                       throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
-            case InsertSender.REJECTED_OVERLOAD:
-               throw new 
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
-            case InsertSender.ROUTE_NOT_FOUND:
-               throw new 
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
-            case InsertSender.INTERNAL_ERROR:
-               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
-            default:
-                       Logger.error(this, "Unknown InsertSender code in 
putCHK: "+is.getStatus()+" on "+is);
-                       throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
-            }
+        boolean hasForwardedRejectedOverload = false;
+        while(true) {
+               synchronized(is) {
+                       if(is.getStatus() == InsertSender.NOT_FINISHED) {
+                               try {
+                                       is.wait(5*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+                       if((!hasForwardedRejectedOverload) && 
is.receivedRejectedOverload()) {
+                               insertThrottle.requestRejectedOverload();
+                       }
+                       if(is.getStatus() == InsertSender.NOT_FINISHED) 
continue;
+               }
+               // Finished?
+               if(!hasForwardedRejectedOverload) {
+                       // Is it ours? Did we send a request?
+                       if(is.sentRequest() && is.uid == uid && (is.getStatus() 
== InsertSender.ROUTE_NOT_FOUND || is.getStatus() == InsertSender.SUCCESS)) {
+                               // It worked!
+                               long endTime = System.currentTimeMillis();
+                               long len = endTime - startTime;
+                               insertThrottle.requestCompleted(len);
+                       }
+               }
+               if(is.getStatus() == InsertSender.SUCCESS) {
+                       Logger.normal(this, "Succeeded inserting "+block);
+               } else {
+                       int status = is.getStatus();
+                       String msg = "Failed inserting "+block+" : 
"+is.getStatusString();
+                       if(status == InsertSender.ROUTE_NOT_FOUND)
+                               msg += " - this is normal on small networks; 
the data will still be propagated, but it can't find the 20+ nodes needed for 
full success";
+                       if(is.getStatus() != InsertSender.ROUTE_NOT_FOUND)
+                               Logger.error(this, msg);
+                       else
+                               Logger.normal(this, msg);
+                       switch(is.getStatus()) {
+                       case InsertSender.NOT_FINISHED:
+                               Logger.error(this, "IS still running in 
putCHK!: "+is);
+                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+                       case InsertSender.GENERATED_REJECTED_OVERLOAD:
+                               throw new 
LowLevelPutException(LowLevelPutException.REJECTED_OVERLOAD);
+                       case InsertSender.ROUTE_NOT_FOUND:
+                               throw new 
LowLevelPutException(LowLevelPutException.ROUTE_NOT_FOUND);
+                       case InsertSender.INTERNAL_ERROR:
+                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+                       default:
+                               Logger.error(this, "Unknown InsertSender code 
in putCHK: "+is.getStatus()+" on "+is);
+                               throw new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
+                       }
+               }
         }
     }

+    public boolean shouldRejectRequest() {
+       return nodePinger.averagePingTime() > MAX_PING_TIME;
+    }
+    
     /**
      * Export my reference so that another node can connect to me.
      * @return

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -74,6 +74,19 @@
             return handleDataRequest(m);
         } else if(spec == DMT.FNPInsertRequest) {
             return handleInsertRequest(m);
+        } else if(spec == DMT.FNPLinkPing) {
+               long id = m.getLong(DMT.PING_SEQNO);
+               Message msg = DMT.createFNPLinkPong(id);
+               try {
+                               source.sendAsync(msg, null);
+                       } catch (NotConnectedException e) {
+                               // Ignore
+                       }
+               return true;
+        } else if(spec == DMT.FNPLinkPong) {
+               long id = m.getLong(DMT.PING_SEQNO);
+               source.receivedLinkPong(id);
+               return true;
         }
         return false;
     }

Added: trunk/freenet/src/freenet/node/NodePinger.java
===================================================================
--- trunk/freenet/src/freenet/node/NodePinger.java      2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/NodePinger.java      2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -0,0 +1,63 @@
+package freenet.node;
+
+import freenet.support.Logger;
+
+public class NodePinger implements Runnable {
+
+       private double meanPing = 0;
+       
+       NodePinger(Node n) {
+               this.node = n;
+               Thread t = new Thread(this, "Node pinger");
+               t.setDaemon(true);
+               t.start();
+       }
+       
+       final Node node;
+       
+       public void run() {
+               while(true) {
+                       try {
+                               Thread.sleep(200);
+                       } catch (InterruptedException e) {
+                               // Ignore
+                       }
+                       PeerNode[] peers = node.peers.connectedPeers;
+                       if(peers == null) continue;
+                       recalculateMean(peers);
+                       for(int i=0;i<peers.length;i++) {
+                               PeerNode pn = peers[i];
+                               if(!pn.isConnected())
+                                       continue;
+                               pn.sendPing();
+                               recalculateMean(peers);
+                               try {
+                                       Thread.sleep(200);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
+       }
+
+       /** Recalculate the mean ping time */
+       private void recalculateMean(PeerNode[] peers) {
+               int peerCount = 0;
+               double total = 1.0;
+               for(int i=0;i<peers.length;i++) {
+                       PeerNode peer = peers[i];
+                       if(!peer.isConnected()) continue;
+                       peerCount++;
+                       total *= peer.averagePingTime();
+               }
+               if(peerCount > 0) {
+                       total = Math.pow(total, 1.0 / peerCount);
+                       meanPing = total;
+                       Logger.minor(this, "Mean ping: "+meanPing+"ms");
+               }
+       }
+
+       public double averagePingTime() {
+               return meanPing;
+       }
+}

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -126,6 +126,7 @@
         System.arraycopy(connectedPeers, 0, newConnectedPeers, 0, 
connectedPeers.length);
         newConnectedPeers[connectedPeers.length] = pn;
         connectedPeers = newConnectedPeers;
+        Logger.minor(this, "Connected peers: "+connectedPeers.length);
     }

 //    NodePeer route(double targetLocation, RoutingContext ctx) {
@@ -300,20 +301,11 @@
             if(routedTo.contains(p)) continue;
             if(p == pn) continue;
             if(!p.isConnected()) continue;
+            if(p.isBackedOff()) continue;
             count++;
             any = p;
-            if(!notIgnored.contains(p)) {
-                //double pRO = p.getAdjustedPRejectedOverload();
-               double pRO = p.getOtherBiasProbability();
-                double random = node.random.nextDouble();
-               if(random < pRO) {
-                       Logger.minor(this, "Ignoring "+p+": pRO="+pRO+", 
random="+random);
-                       routedTo.add(p);
-                       continue;
-               } else notIgnored.add(p);
-            }
             double diff = distance(p, loc);
-            Logger.minor(this, "p.loc="+p.getLocation().getValue()+", 
loc="+loc+", d="+distance(p.getLocation().getValue(), loc)+" usedD="+diff+", 
bias="+p.getBias());
+            Logger.minor(this, "p.loc="+p.getLocation().getValue()+", 
loc="+loc+", d="+distance(p.getLocation().getValue(), loc)+" usedD="+diff);
             if((!ignoreSelf) && diff > maxDiff) continue;
             if(diff < bestDiff) {
                 best = p;

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -27,6 +27,7 @@
 import freenet.io.comm.PeerParseException;
 import freenet.support.Fields;
 import freenet.support.HexUtil;
+import freenet.support.LRUHashtable;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
 import freenet.support.math.BootstrappingDecayingRunningAverage;
@@ -271,6 +272,8 @@
         pDataRequestRejectOverload = new SimpleRunningAverage(100, 0.05);
         pInsertRejectOverload = new SimpleRunningAverage(100, 0.05);
         pRejectOverload = new SimpleRunningAverage(100, 0.05);
+        pingNumber = node.random.nextLong();
+        pingAverage = new SimpleRunningAverage(20, 1);
     }

     private void randomizeMaxTimeBetweenPacketSends() {
@@ -861,9 +864,7 @@

     public String getStatus() {
         return 
-               (isConnected ? "CONNECTED   " : "DISCONNECTED") + " " + 
getPeer().toString()+" "+myName+" "+currentLocation.getValue()+" "+getVersion() 
+
-               " ob="+this.getOtherBiasProbability()+" ("+otherBiasValue+") 
"+/*" adjpRO="+this.getAdjustedPRejectedOverload()+*//*" bias="+getBias()+*/" 
reqs: pRO="+pDataRequestRejectOverload.currentValue()+" 
(h="+pDataRequestRejectOverload.countReports()+") ins: pRO="+ 
pInsertRejectOverload.currentValue()+
-                               " (h="+pInsertRejectOverload.countReports()+")";
+               (isConnected ? "CONNECTED   " : "DISCONNECTED") + " " + 
getPeer().toString()+" "+myName+" "+currentLocation.getValue()+" 
"+getVersion()+" backoff: "+backoffLength+" ("+(Math.max(backedOffUntil - 
System.currentTimeMillis(),0))+")";
     }

     public String getVersion(){
@@ -951,105 +952,106 @@
         return hashCode;
     }

-    int otherBiasValue = 0;
-    
-    /**
-     * Record the fact that the node rejected a request due to
-     * overload (or timed out etc).
-     */
-       public void rejectedOverload() {
-               pRejectOverload.report(1.0);
-               pDataRequestRejectOverload.report(1.0);
-               increaseBias();
+       public void throttledSend(Message message, long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
+               node.globalThrottle.sendPacket(message, this, maxWaitTime);
        }

-       public void insertRejectedOverload() {
-               pRejectOverload.report(1.0);
-               pInsertRejectOverload.report(1.0);
-               increaseBias();
-       }
+       private final Object backoffSync = new Object();

-       private void increaseBias() {
-               synchronized(biasLock) {
-                       if(biasValue < 1.0) biasValue = 1.0;
-                       biasValue += BIAS_SENSITIVITY / BIAS_TARGET;
-                       otherBiasValue += 20;
+       public boolean isBackedOff() {
+               synchronized(backoffSync) {
+                       if(System.currentTimeMillis() < backedOffUntil) {
+                               Logger.minor(this, "Is backed off");
+                               return true;
+                       } else return false;
                }
        }
-
-       private void decreaseBias() {
-               synchronized(biasLock) {
-                       biasValue -= BIAS_SENSITIVITY;
-                       if(biasValue < 1.0) biasValue = 1.0;
-                       otherBiasValue -= 1;
+       
+       long backedOffUntil = -1;
+       /** Initial nominal backoff length */
+       final int INITIAL_BACKOFF_LENGTH = 5000;
+       /** Double every time */
+       final int BACKOFF_MULTIPLIER = 2;
+       /** Maximum: 24 hours */
+       final int MAX_BACKOFF_LENGTH = 24*60*60*1000;
+       /** Current nominal backoff length */
+       int backoffLength = INITIAL_BACKOFF_LENGTH;
+       
+       /**
+        * Got a local RejectedOverload.
+        * Back off this node for a while.
+        */
+       public void localRejectedOverload() {
+               synchronized(backoffSync) {
+                       backoffLength = backoffLength * BACKOFF_MULTIPLIER;
+                       if(backoffLength > MAX_BACKOFF_LENGTH)
+                               backoffLength = MAX_BACKOFF_LENGTH;
+                       backedOffUntil = System.currentTimeMillis() + 
node.random.nextInt(backoffLength);
                }
        }

        /**
-        * Record the fact that the node did not reject a request
-        * due to overload.
+        * Didn't get RejectedOverload.
+        * Reset backoff.
         */
-       public void didNotRejectOverload() {
-               pRejectOverload.report(0.0);
-               pDataRequestRejectOverload.report(0.0);
-               decreaseBias();
+       public void successNotOverload() {
+               synchronized(backoffSync) {
+                       backoffLength = INITIAL_BACKOFF_LENGTH;
+               }
        }

-       public void insertDidNotRejectOverload() {
-               pRejectOverload.report(0.0);
-               pInsertRejectOverload.report(0.0);
-               decreaseBias();
-       }
+       Object pingSync = new Object();
+       final static int MAX_PINGS = 10;
+       final LRUHashtable pingsSentTimes = new LRUHashtable();
+       long pingNumber;
+       final RunningAverage pingAverage;

-       public double getPRejectedOverload() {
-               return pDataRequestRejectOverload.currentValue();
+       public void sendPing() {
+               long pingNo;
+               long now = System.currentTimeMillis();
+               Long lPingNo;
+               synchronized(pingSync) {
+                       pingNo = pingNumber++;
+                       lPingNo = new Long(pingNo);
+                       Long lnow = new Long(now);
+                       pingsSentTimes.push(lPingNo, lnow);
+                       Logger.minor(this, "Pushed "+lPingNo+" "+lnow);
+                       while(pingsSentTimes.size() > MAX_PINGS) {
+                               Long l = (Long) pingsSentTimes.popValue();
+                               Logger.minor(this, 
"pingsSentTimes.size()="+pingsSentTimes.size()+", l="+l);
+                               long tStarted = l.longValue();
+                               pingAverage.report(now - tStarted);
+                               Logger.minor(this, "Reporting dumped ping time 
to "+this+" : "+(now - tStarted));
+                       }
+               }
+               Message msg = DMT.createFNPLinkPing(pingNo);
+               try {
+                       sendAsync(msg, null);
+               } catch (NotConnectedException e) {
+                       synchronized(pingSync) {
+                               pingsSentTimes.removeKey(lPingNo);
+                       }
+               }
        }
-       
-       public double getAdjustedPRejectedOverload() {
-               double d = pRejectOverload.currentValue();
-               long hits = pRejectOverload.countReports();
-               hits = Math.min(hits, 100);
-               double max = ((double) hits) / ((double) (hits + 1));
-               if(hits < 25) return 0.0;
-               return Math.min(d, max);
-       }
-       
-       public double getPInsertRejectedOverload() {
-               return pInsertRejectOverload.currentValue();
-       }

-       /**
-        * Return the bias value for routing for this node.
-        * The idea is simply that if a node is overloaded,
-        * its specialization shrinks.
-        * Essentially this is 1.0-P(RejectedOverload or timeout).
-        */
-       public double getBias() {
-               synchronized(biasLock) {
-                       return biasValue;
+       public void receivedLinkPong(long id) {
+               Long lid = new Long(id);
+               long startTime;
+               synchronized(pingSync) {
+                       Long s = (Long) pingsSentTimes.get(lid);
+                       if(s == null) {
+                               Logger.normal(this, "Dropping ping "+id+" on 
"+this);
+                               return;
+                       }
+                       startTime = s.longValue();
+                       pingsSentTimes.removeKey(lid);
+                       long now = System.currentTimeMillis();
+                       pingAverage.report(now - startTime);
+                       Logger.minor(this, "Reporting ping time to "+this+" : 
"+(now - startTime));
                }
-//     double pSummaryFailure = pRejectOverload.currentValue();
-//     long hits = pRejectOverload.countReports();
-//     if(hits > 10) {
-//             double max = ((double) hits) / ((double) (hits+1));
-//             double denom = 1.0 - pSummaryFailure;
-//             if(denom == 0.0) denom = 0.000001;
-//             return denom;
-//     } else {
-//             return 1.0;
-//     }
        }

-       public void throttledSend(Message message, long maxWaitTime) throws 
NotConnectedException, ThrottledPacketLagException {
-               node.globalThrottle.sendPacket(message, this, maxWaitTime);
+       public double averagePingTime() {
+               return pingAverage.currentValue();
        }
-
-       public double getOtherBiasProbability() {
-               synchronized(biasLock) {
-                       double d = otherBiasValue / 100.0;
-                       if(d < 0) d = 0.0;
-                       d += 1.0;
-                       return 1.0 - (1.0 / d);
-               }
-       }
 }

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -74,7 +74,11 @@

         while(true) {

-            rs.waitUntilStatusChange();
+            if(rs.waitUntilStatusChange()) {
+               // Forward RejectedOverload
+               Message msg = DMT.createFNPRejectedOverload(uid, false);
+               source.sendAsync(msg, null);
+            }

             if(rs.transferStarted()) {
                 Message df = DMT.createFNPDataFound(uid, rs.getHeaders());
@@ -95,9 +99,11 @@
                     Message dnf = DMT.createFNPDataNotFound(uid);
                        source.sendAsync(dnf, null);
                        return;
-               case RequestSender.REJECTED_OVERLOAD:
+               case RequestSender.GENERATED_REJECTED_OVERLOAD:
+               case RequestSender.TIMED_OUT:
+                       // Locally generated.
                    // Propagate back to source who needs to reduce send rate
-                   Message reject = DMT.createFNPRejectedOverload(uid);
+                   Message reject = DMT.createFNPRejectedOverload(uid, true);
                        source.sendAsync(reject, null);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -40,7 +40,6 @@
     final long uid;
     final Node node;
     private double nearestLoc;
-    private final long startTime;
     /** The source of this request if any - purely so we can avoid routing to 
it */
     final PeerNode source;
     private PartiallyReceivedBlock prb = null;
@@ -54,10 +53,11 @@
     static final int NOT_FINISHED = -1;
     static final int SUCCESS = 0;
     static final int ROUTE_NOT_FOUND = 1;
-    static final int REJECTED_OVERLOAD = 2;
     static final int DATA_NOT_FOUND = 3;
     static final int TRANSFER_FAILED = 4;
     static final int VERIFY_FAILURE = 5;
+    static final int TIMED_OUT = 6;
+    static final int GENERATED_REJECTED_OVERLOAD = 7;



@@ -67,7 +67,6 @@

     public RequestSender(NodeCHK key, short htl, long uid, Node n, double 
nearestLoc, 
             PeerNode source) {
-       startTime = System.currentTimeMillis();
         this.key = key;
         this.htl = htl;
         this.uid = uid;
@@ -123,140 +122,182 @@

             Message req = DMT.createFNPDataRequest(uid, htl, key, nearestLoc);

-            /**
-             * What are we waiting for?
-             * FNPAccepted - continue
-             * FNPRejectedLoop - go to another node
-             * FNPRejectedOverload - fail (propagates back to source,
-             * then reduces source transmit rate)
-             */

-            MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
-            MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
-            MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
-
-            // mfRejectedOverload must be the last thing in the or
-            // So its or pointer remains null
-            // Otherwise we need to recreate it below
-            MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
-            
             next.send(req);
             sentRequest = true;

-            Message msg;
-            try {
-                msg = node.usm.waitFor(mf);
-            } catch (DisconnectedException e) {
-                Logger.normal(this, "Disconnected from "+next+" while waiting 
for Accepted on "+uid);
-                continue;
-            }
+            Message msg = null;

-            if(msg == null) {
-                // Timeout
-                // Treat as FNPRejectOverloadd
-                       next.rejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
+            while(true) {
+               
+                /**
+                 * What are we waiting for?
+                 * FNPAccepted - continue
+                 * FNPRejectedLoop - go to another node
+                 * FNPRejectedOverload - fail (propagates back to source,
+                 * then reduces source transmit rate)
+                 */
+                
+                MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+                MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+                MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+
+                // mfRejectedOverload must be the last thing in the or
+                // So its or pointer remains null
+                // Otherwise we need to recreate it below
+                MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
+                
+                try {
+                    msg = node.usm.waitFor(mf);
+                } catch (DisconnectedException e) {
+                    Logger.normal(this, "Disconnected from "+next+" while 
waiting for Accepted on "+uid);
+                    break;
+                }
+                
+               if(msg == null) {
+                       Logger.minor(this, "Timeout waiting for Accepted");
+                       // Timeout waiting for Accepted
+                       next.localRejectedOverload();
+                       forwardRejectedOverload();
+                       // Try next node
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRejectedLoop) {
+                       Logger.minor(this, "Rejected loop");
+                       next.successNotOverload();
+                       // Find another node to route to
+                       break;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRejectedOverload) {
+                       Logger.minor(this, "Rejected: overload");
+                                       // Non-fatal - probably still have time 
left
+                                       forwardRejectedOverload();
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               Logger.minor(this, "Is local");
+                                               next.localRejectedOverload();
+                                               Logger.minor(this, "Local 
RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       }
+                                       continue;
+               }
+               
+               if(msg.getSpec() != DMT.FNPAccepted) {
+                       Logger.error(this, "Unrecognized message: "+msg);
+                       continue;
+               }
+               
+               break;
             }

-            if(msg.getSpec() == DMT.FNPRejectedLoop) {
-                       next.didNotRejectOverload();
-                // Find another node to route to
-                continue;
+            if(msg == null || msg.getSpec() != DMT.FNPAccepted) {
+               // Try another node
+               continue;
             }
+
+            Logger.minor(this, "Got Accepted");

-            if(msg.getSpec() == DMT.FNPRejectedOverload) {
-                // Failed. Propagate back to source.
-                // Source will reduce send rate.
-                       next.rejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
-            
             // Otherwise, must be Accepted

             // So wait...

-            MessageFilter mfDNF = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataNotFound);
-            MessageFilter mfDF = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataFound);
-            MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
-            mfRejectedOverload = mfRejectedOverload.setTimeout(FETCH_TIMEOUT);
-            mf = mfDNF.or(mfDF.or(mfRouteNotFound.or(mfRejectedOverload)));
+            while(true) {
+               
+                MessageFilter mfDNF = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataNotFound);
+                MessageFilter mfDF = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPDataFound);
+                MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRouteNotFound);
+                MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(FETCH_TIMEOUT).setType(DMT.FNPRejectedOverload);
+                MessageFilter mf = 
mfDNF.or(mfDF.or(mfRouteNotFound.or(mfRejectedOverload)));

-            try {
-                msg = node.usm.waitFor(mf);
-            } catch (DisconnectedException e) {
-                Logger.normal(this, "Disconnected from "+next+" while waiting 
for data on "+uid);
-                continue;
-            }
-            
-            if(msg == null) {
-                // Timeout. Treat as FNPRejectOverload.
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
-            
-            if(msg.getSpec() == DMT.FNPDataNotFound) {
-                       next.didNotRejectOverload();
-                finish(DATA_NOT_FOUND, next);
-                return;
-            }
-            
-            if(msg.getSpec() == DMT.FNPRouteNotFound) {
-                // Backtrack within available hops
-                short newHtl = msg.getShort(DMT.HTL);
-                if(newHtl < htl) htl = newHtl;
-                       next.didNotRejectOverload();
-                continue;
-            }
-            
-            if(msg.getSpec() == DMT.FNPRejectedOverload) {
-                       next.rejectedOverload();
-                finish(REJECTED_OVERLOAD, next);
-                return;
-            }
+               try {
+                       msg = node.usm.waitFor(mf);
+               } catch (DisconnectedException e) {
+                       Logger.normal(this, "Disconnected from "+next+" while 
waiting for data on "+uid);
+                       continue;
+               }
+               
+               if(msg == null) {
+                       // Fatal timeout
+                       next.localRejectedOverload();
+                       forwardRejectedOverload();
+                       finish(TIMED_OUT, next);
+                       return;
+               }
+               
+               if(msg.getSpec() == DMT.FNPDataNotFound) {
+                       next.successNotOverload();
+                       finish(DATA_NOT_FOUND, next);
+                       return;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRouteNotFound) {
+                       // Backtrack within available hops
+                       short newHtl = msg.getShort(DMT.HTL);
+                       if(newHtl < htl) htl = newHtl;
+                       next.successNotOverload();
+                       continue;
+               }
+               
+               if(msg.getSpec() == DMT.FNPRejectedOverload) {
+                                       // Non-fatal - probably still have time 
left
+                                       forwardRejectedOverload();
+                                       if (msg.getBoolean(DMT.IS_LOCAL)) {
+                                               next.localRejectedOverload();
+                                               Logger.minor(this, "Local 
RejectedOverload, moving on to next peer");
+                                               // Give up on this one, try 
another
+                                               break;
+                                       }
+                                       continue; // Wait for any further 
response
+               }

-            // Found data
-               next.didNotRejectOverload();
-            
-            // First get headers
-            
-            headers = 
((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
-            
-            // FIXME: Validate headers
-    
-            node.addTransferringSender(key, this);
-            try {
-            
-                prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
-                
-                synchronized(this) {
-                    notifyAll();
-                }
-                
-                BlockReceiver br = new BlockReceiver(node.usm, next, uid, prb);
-                
-                try {
-                    byte[] data = br.receive();
-                    // Received data
-                    CHKBlock block;
-                    try {
-                        block = new CHKBlock(data, headers, key);
-                    } catch (CHKVerifyException e1) {
-                        Logger.normal(this, "Got data but verify failed: "+e1, 
e1);
-                        finish(VERIFY_FAILURE, next);
-                        return;
-                    }
-                    node.store(block);
-                    finish(SUCCESS, next);
-                    return;
-                } catch (RetrievalException e) {
-                    Logger.normal(this, "Transfer failed: "+e, e);
-                    finish(TRANSFER_FAILED, next);
-                    return;
-                }
-            } finally {
-                node.removeTransferringSender(key, this);
+               if(msg.getSpec() != DMT.FNPDataFound) {
+                       Logger.error(this, "Unexpected message: "+msg);
+               }
+               
+               // Found data
+               next.successNotOverload();
+               
+               // First get headers
+               
+               headers = 
((ShortBuffer)msg.getObject(DMT.BLOCK_HEADERS)).getData();
+               
+               // FIXME: Validate headers
+               
+               node.addTransferringSender(key, this);
+               try {
+                       
+                       prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
+                       
+                       synchronized(this) {
+                               notifyAll();
+                       }
+                       
+                       BlockReceiver br = new BlockReceiver(node.usm, next, 
uid, prb);
+                       
+                       try {
+                               byte[] data = br.receive();
+                               // Received data
+                               CHKBlock block;
+                               try {
+                                       block = new CHKBlock(data, headers, 
key);
+                               } catch (CHKVerifyException e1) {
+                                       Logger.normal(this, "Got data but 
verify failed: "+e1, e1);
+                                       finish(VERIFY_FAILURE, next);
+                                       return;
+                               }
+                               node.store(block);
+                               finish(SUCCESS, next);
+                               return;
+                       } catch (RetrievalException e) {
+                               Logger.normal(this, "Transfer failed: "+e, e);
+                               finish(TRANSFER_FAILED, next);
+                               return;
+                       }
+               } finally {
+                       node.removeTransferringSender(key, this);
+               }
             }
         }
         } catch (Throwable t) {
@@ -267,6 +308,15 @@
         }
     }

+    private volatile boolean hasForwardedRejectedOverload;
+    
+    /** Forward RejectedOverload to the request originator */
+    private synchronized void forwardRejectedOverload() {
+       if(hasForwardedRejectedOverload) return;
+       hasForwardedRejectedOverload = true;
+               notifyAll();
+       }
+    
     public PartiallyReceivedBlock getPRB() {
         return prb;
     }
@@ -275,14 +325,21 @@
         return prb != null;
     }

+    boolean hadROLastTimeWaited = false;
+    
     /**
      * Wait until either the transfer has started or we have a 
      * terminal status code.
+     * @return True if we got a RejectedOverload.
      */
-    public synchronized void waitUntilStatusChange() {
+    public synchronized boolean waitUntilStatusChange() {
         while(true) {
-            if(prb != null) return;
-            if(status != NOT_FINISHED) return;
+               if((!hadROLastTimeWaited) && hasForwardedRejectedOverload) {
+                       hadROLastTimeWaited = true;
+                       return true;
+               }
+            if(prb != null) return false;
+            if(status != NOT_FINISHED) return false;
             try {
                 wait(10000);
             } catch (InterruptedException e) {
@@ -311,14 +368,6 @@
                throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
         status = code;

-        if(sentRequest) {
-               if(status == REJECTED_OVERLOAD) {
-                       node.getRequestThrottle().requestRejectedOverload();
-               } else if(status == SUCCESS || status == ROUTE_NOT_FOUND || 
status == DATA_NOT_FOUND || status == VERIFY_FAILURE) {
-                       
node.getRequestThrottle().requestCompleted(System.currentTimeMillis() - 
startTime);
-               }
-        }
-        
         synchronized(this) {
             notifyAll();
         }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-29 00:36:32 UTC (rev 
7630)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-29 18:18:12 UTC (rev 
7631)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 244;
+       public static final int buildNumber = 245;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 244;
+       public static final int lastGoodBuild = 245;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;

Modified: trunk/freenet/src/freenet/support/LRUHashtable.java
===================================================================
--- trunk/freenet/src/freenet/support/LRUHashtable.java 2005-11-29 00:36:32 UTC 
(rev 7630)
+++ trunk/freenet/src/freenet/support/LRUHashtable.java 2005-11-29 18:18:12 UTC 
(rev 7631)
@@ -30,6 +30,7 @@
                insert.value = value;
             list.remove(insert);
         }
+        Logger.minor(this, "Pushed "+insert);

         list.unshift(insert);
     } 
@@ -109,8 +110,13 @@
         public Object obj;
         public Object value;

-        public QItem(Object obj, Object key) {
-            this.obj = obj;
+        public QItem(Object key, Object val) {
+            this.obj = key;
+            this.value = val;
         }
+        
+        public String toString() {
+               return super.toString()+": "+obj+" "+value;
+        }
     }
 }


Reply via email to