Author: toad
Date: 2005-11-17 20:59:49 +0000 (Thu, 17 Nov 2005)
New Revision: 7547

Added:
   trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
   trunk/freenet/src/freenet/node/QueuedDataRequest.java
   trunk/freenet/src/freenet/node/QueuedInsertRequest.java
   trunk/freenet/src/freenet/node/QueuedRequest.java
   trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
   trunk/freenet/src/freenet/node/RequestStarter.java
   trunk/freenet/src/freenet/node/RequestStarterClient.java
   trunk/freenet/src/freenet/node/RequestThrottle.java
Modified:
   trunk/freenet/src/freenet/client/Fetcher.java
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/FileInserter.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/InserterContext.java
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/comm/RetrievalException.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/InsertSender.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
   trunk/freenet/src/freenet/node/TextModeClientInterface.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
Log:
184 (mandatory):
Implement client-node-level load limiting.
Re-enable FEC.


Modified: trunk/freenet/src/freenet/client/Fetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/Fetcher.java       2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/client/Fetcher.java       2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -109,7 +109,7 @@
                // Do the fetch
                KeyBlock block;
                try {
-                       block = ctx.client.getKey(key, ctx.localRequestOnly);
+                       block = ctx.client.getKey(key, ctx.localRequestOnly, 
ctx.starterClient);
                } catch (LowLevelGetException e) {
                        switch(e.code) {
                        case LowLevelGetException.DATA_NOT_FOUND:

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2005-11-17 
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@

 import freenet.client.events.ClientEventProducer;
 import freenet.crypt.RandomSource;
+import freenet.node.RequestStarterClient;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.BucketFactory;

@@ -34,6 +35,7 @@
        final int maxMetadataSize;
        final int maxDataBlocksPerSegment;
        final int maxCheckBlocksPerSegment;
+       final RequestStarterClient starterClient;


        public FetcherContext(SimpleLowLevelClient client, long curMaxLength, 
@@ -43,7 +45,7 @@
                        boolean allowSplitfiles, boolean followRedirects, 
boolean localRequestOnly,
                        int maxDataBlocksPerSegment, int 
maxCheckBlocksPerSegment,
                        RandomSource random, ArchiveManager archiveManager, 
BucketFactory bucketFactory,
-                       ClientEventProducer producer) {
+                       ClientEventProducer producer, RequestStarterClient 
starter) {
                this.client = client;
                this.maxOutputLength = curMaxLength;
                this.maxTempLength = curMaxTempLength;
@@ -64,6 +66,7 @@
                this.eventProducer = producer;
                this.maxDataBlocksPerSegment = maxDataBlocksPerSegment;
                this.maxCheckBlocksPerSegment = maxCheckBlocksPerSegment;
+               this.starterClient = starter;
        }

        public FetcherContext(FetcherContext ctx, int maskID) {
@@ -88,6 +91,7 @@
                        this.eventProducer = ctx.eventProducer;
                        this.maxDataBlocksPerSegment = 0;
                        this.maxCheckBlocksPerSegment = 0;
+                       this.starterClient = ctx.starterClient;
                } else if(maskID == SPLITFILE_DEFAULT_MASK) {
                        this.client = ctx.client;
                        this.maxOutputLength = ctx.maxOutputLength;
@@ -109,6 +113,7 @@
                        this.eventProducer = ctx.eventProducer;
                        this.maxDataBlocksPerSegment = 
ctx.maxDataBlocksPerSegment;
                        this.maxCheckBlocksPerSegment = 
ctx.maxCheckBlocksPerSegment;
+                       this.starterClient = ctx.starterClient;
                } else if(maskID == SPLITFILE_USE_LENGTHS_MASK) {
                        this.client = ctx.client;
                        this.maxOutputLength = ctx.maxOutputLength;
@@ -130,6 +135,7 @@
                        this.eventProducer = ctx.eventProducer;
                        this.maxDataBlocksPerSegment = 
ctx.maxDataBlocksPerSegment;
                        this.maxCheckBlocksPerSegment = 
ctx.maxCheckBlocksPerSegment;
+                       this.starterClient = ctx.starterClient;
                } else throw new IllegalArgumentException();
        }


Modified: trunk/freenet/src/freenet/client/FileInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/FileInserter.java  2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/client/FileInserter.java  2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -122,7 +122,7 @@
                try {
                        ctx.eventProducer.produceEvent(new 
SimpleBlockPutEvent(chk.getClientKey()));
                        if(!getCHKOnly)
-                               ctx.client.putCHK(chk);
+                               ctx.client.putCHK(chk, ctx.starterClient);
                } catch (LowLevelPutException e) {
                        le = e;
                }

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClient.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClient.java 2005-11-17 
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@

 import freenet.client.events.ClientEventListener;
 import freenet.keys.FreenetURI;
+import freenet.node.RequestStarterClient;

 public interface HighLevelSimpleClient {


Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-17 20:59:49 UTC (rev 7547)
@@ -6,6 +6,7 @@
 import freenet.client.events.SimpleEventProducer;
 import freenet.crypt.RandomSource;
 import freenet.keys.FreenetURI;
+import freenet.node.RequestStarterClient;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.BucketFactory;
 import freenet.support.Logger;
@@ -21,6 +22,8 @@
        private long curMaxTempLength;
        private int curMaxMetadataLength;
        private final RandomSource random;
+       private final RequestStarterClient requestStarter;
+       private final RequestStarterClient insertStarter;
        static final int MAX_RECURSION = 10;
        static final int MAX_ARCHIVE_RESTARTS = 2;
        static final boolean DONT_ENTER_IMPLICIT_ARCHIVES = true;
@@ -50,7 +53,7 @@
        static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 192;


-       public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r) {
+       public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r, RequestStarterClient 
requestStarterClient, RequestStarterClient insertStarterClient) {
                this.client = client;
                archiveManager = mgr;
                bucketFactory = bf;
@@ -60,6 +63,8 @@
                curMaxLength = Long.MAX_VALUE;
                curMaxTempLength = Long.MAX_VALUE;
                curMaxMetadataLength = 1024 * 1024;
+               this.requestStarter = requestStarterClient;
+               this.insertStarter = insertStarterClient;
        }

        public void setMaxLength(long maxLength) {
@@ -80,14 +85,14 @@
                                SPLITFILE_THREADS, SPLITFILE_BLOCK_RETRIES, 
NON_SPLITFILE_RETRIES,
                                FETCH_SPLITFILES, FOLLOW_REDIRECTS, 
LOCAL_REQUESTS_ONLY,
                                MAX_SPLITFILE_BLOCKS_PER_SEGMENT, 
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
-                               random, archiveManager, bucketFactory, 
globalEventProducer);
+                               random, archiveManager, bucketFactory, 
globalEventProducer, requestStarter);
                Fetcher f = new Fetcher(uri, context);
                return f.run();
        }

        public FreenetURI insert(InsertBlock insert, boolean getCHKOnly) throws 
InserterException {
                InserterContext context = new InserterContext(client, 
bucketFactory, random, SPLITFILE_INSERT_RETRIES, 
-                               SPLITFILE_INSERT_THREADS, 
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT, 
globalEventProducer);
+                               SPLITFILE_INSERT_THREADS, 
SPLITFILE_BLOCKS_PER_SEGMENT, SPLITFILE_CHECK_BLOCKS_PER_SEGMENT, 
globalEventProducer, insertStarter);
                FileInserter i = new FileInserter(context);
                return i.run(insert, false, getCHKOnly);
        }

Modified: trunk/freenet/src/freenet/client/InserterContext.java
===================================================================
--- trunk/freenet/src/freenet/client/InserterContext.java       2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/client/InserterContext.java       2005-11-17 
20:59:49 UTC (rev 7547)
@@ -2,6 +2,7 @@

 import freenet.client.events.ClientEventProducer;
 import freenet.crypt.RandomSource;
+import freenet.node.RequestStarterClient;
 import freenet.node.SimpleLowLevelClient;
 import freenet.support.BucketFactory;

@@ -19,21 +20,22 @@
        final int splitfileSegmentDataBlocks;
        final int splitfileSegmentCheckBlocks;
        final ClientEventProducer eventProducer;
+       final RequestStarterClient starterClient;

        public InserterContext(SimpleLowLevelClient client, BucketFactory bf, 
RandomSource random,
                        int maxRetries, int maxThreads, int 
splitfileSegmentDataBlocks, int splitfileSegmentCheckBlocks,
-                       ClientEventProducer eventProducer) {
+                       ClientEventProducer eventProducer, RequestStarterClient 
sctx) {
                this.client = client;
                this.bf = bf;
                this.random = random;
                dontCompress = false;
-               //splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
-               splitfileAlgorithm = Metadata.SPLITFILE_NONREDUNDANT;
+               splitfileAlgorithm = Metadata.SPLITFILE_ONION_STANDARD;
                this.maxInsertBlockRetries = maxRetries;
                this.maxSplitInsertThreads = maxThreads;
                this.eventProducer = eventProducer;
                this.splitfileSegmentDataBlocks = splitfileSegmentDataBlocks;
                this.splitfileSegmentCheckBlocks = splitfileSegmentCheckBlocks;
+               this.starterClient = sctx;
        }

 }

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2005-11-16 19:29:35 UTC (rev 
7546)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2005-11-17 20:59:49 UTC (rev 
7547)
@@ -354,13 +354,15 @@

        public static final MessageType sendAborted = new 
MessageType("sendAborted") {{
                addField(UID, Long.class);
-               addField(REASON, String.class);
+               addField(DESCRIPTION, String.class);
+               addField(REASON, Integer.class);
        }};

-       public static final Message createSendAborted(long uid, String reason) {
+       public static final Message createSendAborted(long uid, int reason, 
String description) {
                Message msg = new Message(sendAborted);
                msg.set(UID, uid);
                msg.set(REASON, reason);
+               msg.set(DESCRIPTION, description);
                return msg;
        }


Modified: trunk/freenet/src/freenet/io/comm/RetrievalException.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/RetrievalException.java   2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/comm/RetrievalException.java   2005-11-17 
20:59:49 UTC (rev 7547)
@@ -38,6 +38,7 @@
        public static final int TIMED_OUT = 4;
     public static final int ALREADY_CACHED = 6;
     public static final int SENDER_DISCONNECTED = 7;
+    public static final int NO_DATAINSERT = 8;

        int _reason;
        String _cause = null;

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-17 
20:59:49 UTC (rev 7547)
@@ -57,6 +57,10 @@
                _usm = usm;
        }

+       public void sendAborted(int reason, String desc) throws 
NotConnectedException {
+               _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc));
+       }
+       
        public byte[] receive() throws RetrievalException {
                int consecutiveMissingPacketReports = 0;
                try {

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-17 
20:59:49 UTC (rev 7547)
@@ -65,6 +65,10 @@
                }
        }

+       public void sendAborted(int reason, String desc) throws 
NotConnectedException {
+               _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc));
+       }
+       
        public boolean send() {
                final PacketThrottle throttle = 
PacketThrottle.getThrottle(_destination.getPeer(), _prb._packetSize);
                _receiverThread = Thread.currentThread();
@@ -133,7 +137,7 @@

                        public void receiveAborted(int reason, String 
description) {
                                try {
-                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(reason, description), 
null);
+                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason, 
description), null);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Receive aborted and receiver is not 
connected");
                 }

Added: trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/ChainedRequestThrottle.java  2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/ChainedRequestThrottle.java  2005-11-17 
20:59:49 UTC (rev 7547)
@@ -0,0 +1,22 @@
+package freenet.node;
+
+/**
+ * RequestThrottle which takes a second throttle, and never
+ * returns a delay less than that throttle's current delay. 
+ */
+public class ChainedRequestThrottle extends RequestThrottle {
+
+       private final RequestThrottle otherThrottle;
+       
+       public ChainedRequestThrottle(int rtt, float winsz, RequestThrottle 
other) {
+               super(rtt, winsz);
+               otherThrottle = other;
+       }
+       
+       public long getDelay() {
+               long delay = super.getDelay();
+               long otherDelay = otherThrottle.getDelay();
+               return Math.max(delay, otherDelay);
+       }
+       
+}

Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -90,6 +90,10 @@
                 Logger.error(this, "Did not receive DataInsert on "+uid+" from 
"+source+" !");
             Message tooSlow = DMT.createFNPRejectedTimeout(uid);
             source.sendAsync(tooSlow, null);
+            prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
+            br = new BlockReceiver(node.usm, source, uid, prb);
+            prb.abort(RetrievalException.NO_DATAINSERT, "No DataInsert");
+            br.sendAborted(RetrievalException.NO_DATAINSERT, "No DataInsert");
             return;
         }


Modified: trunk/freenet/src/freenet/node/InsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertSender.java    2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/InsertSender.java    2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -15,7 +15,14 @@

 public final class InsertSender implements Runnable {

-    InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
+    public class Sender implements Runnable {
+
+               public void run() {
+                       bt.send();
+               }
+       }
+
+       InsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
             PeerNode source, Node node, PartiallyReceivedBlock prb, boolean 
fromStore, double closestLocation) {
         this.myKey = myKey;
         this.target = myKey.toNormalizedDouble();
@@ -27,6 +34,7 @@
         this.prb = prb;
         this.fromStore = fromStore;
         this.closestLocation = closestLocation;
+        this.startTime = System.currentTimeMillis();
         Thread t = new Thread(this, "InsertSender for UID "+uid+" on 
"+node.portNumber+" at "+System.currentTimeMillis());
         t.setDaemon(true);
         t.start();
@@ -48,6 +56,10 @@
     final boolean fromStore;
     private boolean receiveFailed = false;
     final double closestLocation;
+    final long startTime;
+    private BlockTransmitter bt;
+    private Sender s;
+    private Thread senderThread;

     private int status = -1;
     static final int NOT_FINISHED = -1;
@@ -110,8 +122,8 @@
             // mfRejectedOverload must be the last thing in the or
             // So its or pointer remains null
             // Otherwise we need to recreate it below
+            mfRejectedOverload.clearOr();
             MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
-            mfRejectedOverload.clearOr();

             // Send to next node

@@ -127,9 +139,11 @@
             }
             if(receiveFailed) return; // don't need to set status as killed by 
InsertHandler

-            if(msg == null) {
-                // No response, move on
-                continue;
+            if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
+                // Overload... hmmmm - propagate error back to source
+                Logger.error(this, "Propagating "+msg+" back to source on 
"+this);
+                finish(REJECTED_OVERLOAD);
+                return;
             }

             if(msg.getSpec() == DMT.FNPRejectedLoop) {
@@ -137,19 +151,11 @@
                 continue;
             }

-            if(msg.getSpec() == DMT.FNPRejectedOverload) {
-                // Overload... hmmmm - propagate error back to source
-                Logger.error(this, "Propagating "+msg+" back to source on 
"+this);
-                finish(REJECTED_OVERLOAD);
-                return;
-            }
-            
             // Otherwise must be an Accepted

             // Send them the data.
             // Which might be the new data resulting from a collision...

-            BlockTransmitter bt;
             Message dataInsert;
             PartiallyReceivedBlock prbNow;
             prbNow = prb;
@@ -168,10 +174,12 @@
             MessageFilter mfRNF = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRouteNotFound);
             MessageFilter mfInsertReply = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPInsertReply);
             mfRejectedOverload.setTimeout(PUT_TIMEOUT);
+            mfRejectedOverload.clearOr();
             MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRouteNotFound);
             MessageFilter mfDataInsertRejected = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPDataInsertRejected);
+            MessageFilter mfTimeout = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(PUT_TIMEOUT).setType(DMT.FNPRejectedTimeout);

-            mf = 
mfRNF.or(mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfRejectedOverload.setTimeout(PUT_TIMEOUT)))));
+            mf = 
mfRNF.or(mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload)))));

             Logger.minor(this, "Sending DataInsert");
             if(receiveFailed) return;
@@ -179,8 +187,10 @@

             Logger.minor(this, "Sending data");
             if(receiveFailed) return;
-            bt.send();
-            Logger.minor(this, "Sent data");
+            s = new Sender();
+            senderThread = new Thread(s);
+            senderThread.setDaemon(true);
+            senderThread.start();

             if(receiveFailed) return;
             try {
@@ -200,7 +210,7 @@
                 return;
             }

-            if(msg.getSpec() == DMT.FNPRejectedOverload) {
+            if(msg.getSpec() == DMT.FNPRejectedOverload || msg.getSpec() == 
DMT.FNPRejectedTimeout) {
                 Logger.minor(this, "Rejected due to overload");
                 finish(REJECTED_OVERLOAD);
                 return;
@@ -255,14 +265,19 @@

             }

-            // Otherwise should be an InsertReply
+            if(msg.getSpec() != DMT.FNPInsertReply) {
+               Logger.error(this, "Unknown reply: "+msg);
+               finish(INTERNAL_ERROR);
+            }
+            
             // Our task is complete
             finish(SUCCESS);
             return;
         }
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
-            finish(INTERNAL_ERROR);
+            if(status == NOT_FINISHED)
+               finish(INTERNAL_ERROR);
         } finally {
             node.completed(uid);
                node.removeInsertSender(myKey, origHTL, this);
@@ -280,12 +295,30 @@
             } catch (InterruptedException e) {
                 // Ignore
             }
-        }            
+        }
     }

     private void finish(int code) {
         Logger.minor(this, "Finished: "+code+" on "+this, new 
Exception("debug"));
+        if(status != NOT_FINISHED)
+               throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
         status = code;
+        
+        if(senderThread != null) {
+               while(senderThread.isAlive()) {
+                       try {
+                               senderThread.join();
+                       } catch (InterruptedException e) {
+                               // Ignore
+                       }
+               }
+        }
+        
+        if(status == REJECTED_OVERLOAD)
+               node.getInsertThrottle().requestRejectedOverload();
+        else if(status == SUCCESS || status == ROUTE_NOT_FOUND)
+               
node.getInsertThrottle().requestCompleted(System.currentTimeMillis() - 
startTime);
+        
         synchronized(this) {
             notifyAll();
         }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-11-16 19:29:35 UTC (rev 
7546)
+++ trunk/freenet/src/freenet/node/Node.java    2005-11-17 20:59:49 UTC (rev 
7547)
@@ -27,6 +27,7 @@
 import freenet.client.ArchiveManager;
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
+import freenet.client.InsertBlock;
 import freenet.crypt.DiffieHellman;
 import freenet.crypt.RandomSource;
 import freenet.crypt.Yarrow;
@@ -61,7 +62,7 @@
 /**
  * @author amphibian
  */
-public class Node implements SimpleLowLevelClient {
+public class Node implements QueueingSimpleLowLevelClient {

        static final long serialVersionUID = -1;

@@ -134,6 +135,10 @@
     // Client stuff
     final ArchiveManager archiveManager;
     final BucketFactory tempBucketFactory;
+    final RequestThrottle requestThrottle;
+    final RequestStarter requestStarter;
+    final RequestThrottle insertThrottle;
+    final RequestStarter insertStarter;

     // Client stuff that needs to be configged - FIXME
     static final int MAX_ARCHIVE_HANDLERS = 200; // don't take up much RAM... 
FIXME
@@ -340,6 +345,12 @@
                }
                tempBucketFactory = new 
PaddedEphemerallyEncryptedBucketFactory(new 
TempBucketFactory(tempFilenameGenerator), random, 1024);
                archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS, 
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE, 
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
+               requestThrottle = new RequestThrottle(5000, 2.0F);
+               requestStarter = new RequestStarter(requestThrottle, "Request 
starter ("+portNumber+")");
+               //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
+               // FIXME reenable the above
+               insertThrottle = new RequestThrottle(10000, 2.0F);
+               insertStarter = new RequestStarter(insertThrottle, "Insert 
starter ("+portNumber+")");
     }

     void start(SwapRequestInterval interval) {
@@ -349,9 +360,16 @@
         usm.start();
     }

-    public KeyBlock getKey(ClientKey key, boolean localOnly) throws 
LowLevelGetException {
+    public KeyBlock getKey(ClientKey key, boolean localOnly, 
RequestStarterClient client) throws LowLevelGetException {
+       if(localOnly)
+               return realGetKey(key, localOnly);
+       else
+               return client.getKey(key, localOnly);
+    }
+    
+    public KeyBlock realGetKey(ClientKey key, boolean localOnly) throws 
LowLevelGetException {
        if(key instanceof ClientCHK)
-               return getCHK((ClientCHK)key, localOnly);
+               return realGetCHK((ClientCHK)key, localOnly);
        else
                throw new IllegalArgumentException("Not a CHK: "+key);
     }
@@ -360,7 +378,7 @@
      * Really trivially simple client interface.
      * Either it succeeds or it doesn't.
      */
-    public ClientCHKBlock getCHK(ClientCHK key, boolean localOnly) throws 
LowLevelGetException {
+    ClientCHKBlock realGetCHK(ClientCHK key, boolean localOnly) throws 
LowLevelGetException {
         Object o = makeRequestSender(key.getNodeCHK(), MAX_HTL, 
random.nextLong(), null, lm.loc.getValue(), localOnly);
         if(o instanceof CHKBlock) {
             try {
@@ -407,7 +425,11 @@
         }
     }

-    public void putCHK(ClientCHKBlock block) throws LowLevelPutException {
+    public void putCHK(ClientCHKBlock block, RequestStarterClient starter) 
throws LowLevelPutException {
+               starter.putCHK(block);
+    }
+    
+    public void realPutCHK(ClientCHKBlock block) throws LowLevelPutException {
         byte[] data = block.getData();
         byte[] headers = block.getHeader();
         PartiallyReceivedBlock prb = new 
PartiallyReceivedBlock(PACKETS_IN_BLOCK, PACKET_SIZE, data);
@@ -786,8 +808,8 @@
         writeNodeFile();
     }

-       public HighLevelSimpleClient makeClient() {
-               return new HighLevelSimpleClientImpl(this, archiveManager, 
tempBucketFactory, random);
+       public HighLevelSimpleClient makeClient(short prioClass, short prio) {
+               return new HighLevelSimpleClientImpl(this, archiveManager, 
tempBucketFactory, random, makeStarterClient(prioClass, prio, false), 
makeStarterClient(prioClass, prio, true));
        }

        private static class MemoryChecker implements Runnable {
@@ -804,4 +826,16 @@
                        }
                }
        }
+
+       public RequestThrottle getRequestThrottle() {
+               return requestThrottle;
+       }
+
+       public RequestThrottle getInsertThrottle() {
+               return insertThrottle;
+       }
+
+       public RequestStarterClient makeStarterClient(short prioClass, short 
prio, boolean inserts) {
+               return new RequestStarterClient(prioClass, prio, random, this, 
inserts ? insertStarter : requestStarter);
+       }
 }

Added: trunk/freenet/src/freenet/node/QueuedDataRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedDataRequest.java       2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedDataRequest.java       2005-11-17 
20:59:49 UTC (rev 7547)
@@ -0,0 +1,23 @@
+package freenet.node;
+
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+
+public class QueuedDataRequest extends QueuedRequest {
+
+       private final ClientKey key;
+       private final boolean localOnly;
+       private QueueingSimpleLowLevelClient client;
+       
+       public QueuedDataRequest(ClientKey key, boolean localOnly, 
QueueingSimpleLowLevelClient client) {
+               this.key = key;
+               this.localOnly = localOnly;
+               this.client = client;
+       }
+
+       public KeyBlock waitAndFetch() throws LowLevelGetException {
+               waitForSendClearance();
+               return client.realGetKey(key, localOnly);
+       }
+
+}

Added: trunk/freenet/src/freenet/node/QueuedInsertRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedInsertRequest.java     2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedInsertRequest.java     2005-11-17 
20:59:49 UTC (rev 7547)
@@ -0,0 +1,19 @@
+package freenet.node;
+
+import freenet.keys.ClientCHKBlock;
+
+public class QueuedInsertRequest extends QueuedRequest {
+
+       private final ClientCHKBlock block;
+       private QueueingSimpleLowLevelClient client;
+       
+       public QueuedInsertRequest(ClientCHKBlock block, 
QueueingSimpleLowLevelClient client) {
+               this.block = block;
+               this.client = client;
+       }
+
+       public void waitAndPut() throws LowLevelPutException {
+               waitForSendClearance();
+               client.realPutCHK(block);
+       }
+}

Added: trunk/freenet/src/freenet/node/QueuedRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/QueuedRequest.java   2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/QueuedRequest.java   2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -0,0 +1,32 @@
+package freenet.node;
+
+/**
+ * A request (including both DataRequest's and InsertRequest's) which can be 
queued
+ * by a RequestStarter.
+ */
+public abstract class QueuedRequest {
+
+       private boolean clearToSend = false;
+       
+       /**
+        * Shell for sending the request.
+        */
+       public final void clearToSend() {
+               synchronized(this) {
+                       clearToSend = true;
+                       notifyAll();
+               }
+       }
+
+       protected void waitForSendClearance() {
+               synchronized(this) {
+                       while(!clearToSend) {
+                               try {
+                                       wait(10*1000);
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
+       }
+}

Added: trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java    
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/QueueingSimpleLowLevelClient.java    
2005-11-17 20:59:49 UTC (rev 7547)
@@ -0,0 +1,16 @@
+package freenet.node;
+
+import freenet.client.InsertBlock;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+
+interface QueueingSimpleLowLevelClient extends SimpleLowLevelClient {
+
+       /** Unqueued version. Only call from QueuedDataRequest ! */
+       KeyBlock realGetKey(ClientKey key, boolean localOnly) throws 
LowLevelGetException;
+
+       /** Ditto */
+       void realPutCHK(ClientCHKBlock block) throws LowLevelPutException;
+
+}

Modified: trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java
===================================================================
--- trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/RealNodeRequestInsertTest.java       
2005-11-17 20:59:49 UTC (rev 7547)
@@ -9,6 +9,7 @@
 import freenet.keys.CHKEncodeException;
 import freenet.keys.ClientCHK;
 import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
 import freenet.node.PeerNode;
 import freenet.support.Fields;
 import freenet.support.HexUtil;
@@ -70,6 +71,10 @@
             a.peers.connect(b.exportFieldSet());
             b.peers.connect(a.exportFieldSet());
         }
+        
+        RequestStarterClient[] starters = new 
RequestStarterClient[NUMBER_OF_NODES];
+        for(int i=0;i<starters.length;i++)
+               starters[i] = 
nodes[i].makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, 
false); // pretend are all requests

         Logger.normal(RealNodeRoutingTest.class, "Added random links");

@@ -164,7 +169,7 @@
                 String dataString = baseString + requestNumber;
                 // Pick random node to insert to
                 int node1 = random.nextInt(NUMBER_OF_NODES);
-                Node randomNode = nodes[random.nextInt(NUMBER_OF_NODES)];
+                Node randomNode = nodes[node1];
                 Logger.error(RealNodeRequestInsertTest.class,"Inserting: 
\""+dataString+"\" to "+node1);
                 byte[] data = dataString.getBytes();
                 ClientCHKBlock block;
@@ -176,7 +181,7 @@
                 Logger.error(RealNodeRequestInsertTest.class, "Decoded: "+new 
String(newBlock.memoryDecode(chk)));
                 Logger.error(RealNodeRequestInsertTest.class,"CHK: 
"+chk.getURI());
                 Logger.error(RealNodeRequestInsertTest.class,"Headers: 
"+HexUtil.bytesToHex(block.getHeader()));
-                randomNode.putCHK(block);
+                randomNode.putCHK(block, starters[node1]);
                 Logger.error(RealNodeRequestInsertTest.class, "Inserted to 
"+node1);
                 Logger.error(RealNodeRequestInsertTest.class, "Data: 
"+Fields.hashCode(encData)+", Headers: "+Fields.hashCode(encHeaders));
                 // Pick random node to request from
@@ -185,7 +190,7 @@
                     node2 = random.nextInt(NUMBER_OF_NODES);
                 } while(node2 == node1);
                 Node fetchNode = nodes[node2];
-                block = fetchNode.getCHK(chk, false);
+                block = (ClientCHKBlock) fetchNode.getKey((ClientKey) chk, 
false, starters[node2]);
                 if(block == null) {
                     Logger.error(RealNodeRequestInsertTest.class, "Fetch 
FAILED from "+node2);
                     requestsAvg.report(0.0);

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -40,6 +40,7 @@
     final long uid;
     final Node node;
     private double nearestLoc;
+    private final long startTime;
     /** The source of this request if any - purely so we can avoid routing to 
it */
     final PeerNode source;
     private PartiallyReceivedBlock prb = null;
@@ -65,6 +66,7 @@

     public RequestSender(NodeCHK key, short htl, long uid, Node n, double 
nearestLoc, 
             PeerNode source) {
+       startTime = System.currentTimeMillis();
         this.key = key;
         this.htl = htl;
         this.uid = uid;
@@ -295,8 +297,15 @@

     private void finish(int code) {
         Logger.minor(this, "finish("+code+")");
+        if(status != NOT_FINISHED)
+               throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
         status = code;
-
+        
+        if(status == REJECTED_OVERLOAD)
+               node.getRequestThrottle().requestRejectedOverload();
+        else if(status == SUCCESS || status == ROUTE_NOT_FOUND || status == 
DATA_NOT_FOUND || status == VERIFY_FAILURE)
+               
node.getRequestThrottle().requestCompleted(System.currentTimeMillis() - 
startTime);
+        
         synchronized(this) {
             notifyAll();
         }

Added: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java  2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestStarter.java  2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -0,0 +1,187 @@
+package freenet.node;
+
+import java.util.LinkedList;
+import java.util.Vector;
+
+import freenet.support.Logger;
+import freenet.support.UpdatableSortedLinkedList;
+import freenet.support.UpdatableSortedLinkedListKilledException;
+
+/**
+ * Starts requests.
+ * Nobody starts a request directly, you have to go through RequestStarter.
+ * And you have to provide a RequestStarterClient. We do round robin between 
+ * clients on the same priority level.
+ */
+public class RequestStarter implements Runnable {
+
+       /*
+        * Priority classes
+        */
+       /** Anything more important than fproxy */
+       public static final short MAXIMUM_PRIORITY_CLASS = 0;
+       /** Fproxy etc */
+       public static final short INTERACTIVE_PRIORITY_CLASS = 1;
+       /** Fproxy splitfile fetches */
+       public static final short IMMEDIATE_SPLITFILE_PRIORITY_CLASS = 2;
+       /** USK updates etc */
+       public static final short UPDATE_PRIORITY_CLASS = 3;
+       /** Bulk splitfile fetches */
+       public static final short BULK_SPLITFILE_PRIORITY_CLASS = 4;
+       /** Prefetch */
+       public static final short PREFETCH_PRIORITY_CLASS = 5;
+       /** Anything less important than prefetch (redundant??) */
+       public static final short MINIMUM_PRIORITY_CLASS = 6;
+       
+       // Clients registered
+       final Vector clientsByPriority;
+       final RequestThrottle throttle;
+       /*
+        * Clients which are ready.
+        * How do we do round-robin?
+        * Have a list of clients which are ready to go, in priority order, and
+        * haven't gone this cycle.
+        * Have a list of clients which are ready to go next cycle, in priority
+        * order.
+        * Have each client track the cycle number in which it was last sent.
+        */
+       final UpdatableSortedLinkedList clientsReadyThisCycle;
+       final UpdatableSortedLinkedList clientsReadyNextCycle;
+       /** Increment every time we go through the whole list */
+       long cycleNumber;
+       
+       public RequestStarter(RequestThrottle throttle, String name) {
+               clientsByPriority = new Vector();
+               clientsReadyThisCycle = new UpdatableSortedLinkedList();
+               clientsReadyNextCycle = new UpdatableSortedLinkedList();
+               cycleNumber = 0;
+               this.throttle = throttle;
+               this.name = name;
+               Thread t = new Thread(this, "Request starter");
+               t.setDaemon(true);
+               t.start();
+       }
+
+       final String name;
+       
+       public String toString() {
+               return name;
+       }
+       
+       public synchronized void registerClient(RequestStarterClient client) {
+               int p = client.priority;
+               LinkedList prio = makePriority(p);
+               prio.add(client);
+       }
+
+       public synchronized void notifyReady(RequestStarterClient client) {
+               Logger.minor(this, "notifyReady("+client+")");
+               try {
+                       if(client.getCycleLastSent() == cycleNumber) {
+                               clientsReadyNextCycle.addOrUpdate(client);
+                       } else {
+                               // Can send immediately
+                               clientsReadyThisCycle.addOrUpdate(client);
+                       }
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       throw new Error(e);
+               }
+               notifyAll();
+       }
+       
+       private synchronized LinkedList makePriority(int p) {
+               while(p >= clientsByPriority.size()) {
+                       clientsByPriority.add(new LinkedList());
+               }
+               return (LinkedList) clientsByPriority.get(p);
+       }
+
+       public void run() {
+               long sentRequestTime = System.currentTimeMillis();
+               while(true) {
+                       RequestStarterClient client;
+                       client = getNextClient();
+                       Logger.minor(this, "getNextClient() = "+client);
+                       if(client != null) {
+                               boolean success;
+                               try {
+                                       success = client.send(cycleNumber);
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t);
+                                       continue;
+                               }
+                               if(success) {
+                                       sentRequestTime = 
System.currentTimeMillis();
+                                       Logger.minor(this, "Sent");
+                                       if(client.isReady()) {
+                                               synchronized(this) {
+                                                       try {
+                                                               
clientsReadyNextCycle.addOrUpdate(client);
+                                                       } catch 
(UpdatableSortedLinkedListKilledException e) {
+                                                               // Impossible
+                                                               throw new 
Error(e);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       while(true) {
+                               long delay = throttle.getDelay();
+                               long sleepUntil = sentRequestTime + delay;
+                               long now = System.currentTimeMillis();
+                               if(sleepUntil < now) {
+                                       if(waitingClients()) break;
+                                       // Otherwise wait for notification
+                                       try {
+                                               synchronized(this) {
+                                                       wait(1000);
+                                               }
+                                       } catch (InterruptedException e) {
+                                               // Ignore
+                                       }
+                               } else {
+                                       Logger.minor(this, 
"delay="+delay+"("+throttle+") sleep for "+(sleepUntil-now)+" for "+this);
+                                       if(sleepUntil - now > 0)
+                                               try {
+                                                       synchronized(this) {
+                                                               // At most 
sleep 500ms, then recompute.
+                                                               
wait(Math.min(sleepUntil - now, 500));
+                                                       }
+                                               } catch (InterruptedException 
e) {
+                                                       // Ignore
+                                               }
+                               }
+                       }
+               }
+       }
+
+       private synchronized boolean waitingClients() {
+               return !(clientsReadyThisCycle.isEmpty() && 
clientsReadyNextCycle.isEmpty());
+       }
+
+       /**
+        * Get the next ready client.
+        */
+       private synchronized RequestStarterClient getNextClient() {
+               try {
+                       while(true) {
+                       if(clientsReadyThisCycle.isEmpty() && 
clientsReadyNextCycle.isEmpty())
+                               return null;
+                       if(clientsReadyThisCycle.isEmpty()) {
+                               cycleNumber++;
+                               
clientsReadyNextCycle.moveTo(clientsReadyThisCycle);
+                       }
+                       RequestStarterClient c = (RequestStarterClient) 
clientsReadyThisCycle.removeLowest();
+                       if(c.getCycleLastSent() == cycleNumber) {
+                               clientsReadyNextCycle.add(c);
+                               continue;
+                       } else {
+                               c.setCycleLastSet(cycleNumber);
+                               return c;
+                       }
+                       }
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       throw new Error(e);
+               }
+       }
+}

Added: trunk/freenet/src/freenet/node/RequestStarterClient.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterClient.java    2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/RequestStarterClient.java    2005-11-17 
20:59:49 UTC (rev 7547)
@@ -0,0 +1,116 @@
+package freenet.node;
+
+import java.util.Vector;
+
+import freenet.crypt.RandomSource;
+import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientKey;
+import freenet.keys.KeyBlock;
+import freenet.support.DoublyLinkedList;
+import freenet.support.UpdatableSortedLinkedListItemImpl;
+
+/**
+ * Interface to clients for starting a request.
+ * Also represents a single client for fairness purposes.
+ */
+public class RequestStarterClient extends UpdatableSortedLinkedListItemImpl {
+
+       final int priority;
+       private int random;
+       private long cycleLastSent;
+       private final Vector requests;
+       private final RandomSource rs;
+       private final QueueingSimpleLowLevelClient client;
+       private final RequestStarter starter;
+
+       public RequestStarterClient(short prioClass, short prio, RandomSource 
r, QueueingSimpleLowLevelClient c, RequestStarter starter) {
+               this((prioClass << 16) + prio, r, c, starter);
+       }
+       
+       private RequestStarterClient(int prio, RandomSource r, 
QueueingSimpleLowLevelClient c, RequestStarter starter) {
+               priority = prio;
+               this.random = r.nextInt();
+               this.starter = starter;
+               this.cycleLastSent = -1;
+               this.requests = new Vector();
+               this.rs = r;
+               this.client = c;
+               starter.registerClient(this);
+       }
+       
+       /**
+        * Blocking fetch of a key.
+        * @throws LowLevelGetException If the fetch failed for some reason.
+        */
+       public KeyBlock getKey(ClientKey key, boolean localOnly) throws 
LowLevelGetException {
+               QueuedDataRequest qdr = new QueuedDataRequest(key, localOnly, 
client);
+               addRequest(qdr);
+               return qdr.waitAndFetch();
+       }
+       
+       /**
+        * Blocking insert of a key.
+        * @throws LowLevelPutException If the fetch failed for some reason.
+        */
+       public void putCHK(ClientCHKBlock block) throws LowLevelPutException {
+               QueuedInsertRequest qir = new QueuedInsertRequest(block, 
client);
+               addRequest(qir);
+               qir.waitAndPut();
+       }
+       
+       void addRequest(QueuedRequest qr) {
+               synchronized(this) {
+                       requests.add(qr);
+               }
+               if(starter != null)
+                       starter.notifyReady(this);
+       }
+       
+       public long getCycleLastSent() {
+               return cycleLastSent;
+       }
+
+       private DoublyLinkedList parentList;
+       
+       public DoublyLinkedList getParent() {
+               return parentList;
+       }
+
+       public DoublyLinkedList setParent(DoublyLinkedList l) {
+               DoublyLinkedList oldList = parentList;
+               parentList = l;
+               return oldList;
+       }
+
+       public int compareTo(Object o) {
+               if(this == o) return 0;
+               RequestStarterClient c = (RequestStarterClient) o;
+               if(priority > c.priority) return 1;
+               if(priority < c.priority) return -1;
+               if(random > c.random) return 1;
+               if(random < c.random) return -1;
+               return 0;
+       }
+
+       public synchronized boolean isReady() {
+               return !requests.isEmpty();
+       }
+
+       public boolean send(long cycleNumber) {
+               QueuedRequest qr;
+               synchronized(this) {
+                       if(!requests.isEmpty()) {
+                               int x = rs.nextInt(requests.size());
+                               qr = (QueuedRequest) requests.remove(x);
+                       } else qr = null;
+               }
+               if(qr == null) return false;
+               qr.clearToSend();
+               return true;
+       }
+
+       public void setCycleLastSet(long cycleNumber) {
+               this.cycleLastSent = cycleNumber;
+       }
+
+}

Added: trunk/freenet/src/freenet/node/RequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestThrottle.java 2005-11-16 19:29:35 UTC 
(rev 7546)
+++ trunk/freenet/src/freenet/node/RequestThrottle.java 2005-11-17 20:59:49 UTC 
(rev 7547)
@@ -0,0 +1,69 @@
+package freenet.node;
+
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+
+/**
+ * Keeps track of the current request send rate.
+ */
+public class RequestThrottle {
+
+       protected static final float PACKET_DROP_DECREASE_MULTIPLE = 0.97f;
+       protected static final float PACKET_TRANSMIT_INCREMENT = (4 * (1 - 
(PACKET_DROP_DECREASE_MULTIPLE * PACKET_DROP_DECREASE_MULTIPLE))) / 3;
+       protected static final long MAX_DELAY = 5*60*1000;
+       protected static final long MIN_DELAY = 20;
+       public static final long DEFAULT_DELAY = 200;
+       private long _totalPackets = 0, _droppedPackets = 0;
+       private double _simulatedWindowSize = 2;
+       private final BootstrappingDecayingRunningAverage roundTripTime; 
+
+       RequestThrottle(long rtt, float winSize) {
+               _simulatedWindowSize = 2;
+               roundTripTime = new BootstrappingDecayingRunningAverage(rtt, 
10, 5*60*1000, 10);
+       }
+       
+       /**
+        * Get the current inter-request delay.
+        */
+       public synchronized long getDelay() {
+               double rtt = roundTripTime.currentValue();
+               double winSizeForMinPacketDelay = rtt / MIN_DELAY;
+               if (_simulatedWindowSize > winSizeForMinPacketDelay) {
+                       _simulatedWindowSize = winSizeForMinPacketDelay;
+               }
+               if (_simulatedWindowSize < 1.0) {
+                       _simulatedWindowSize = 1.0F;
+               }
+               // return (long) (_roundTripTime / _simulatedWindowSize);
+               return Math.max(MIN_DELAY, Math.min((long) (rtt / 
_simulatedWindowSize), MAX_DELAY));
+       }
+
+       /**
+        * Report that a request completed successfully, and the
+        * time it took.
+        */
+       public synchronized void requestCompleted(long time) {
+               setRoundTripTime(time);
+        _totalPackets++;
+        _simulatedWindowSize += PACKET_TRANSMIT_INCREMENT;
+       }
+
+       /**
+        * Report that a request got RejectedOverload.
+        * Do not report the time it took, because it is irrelevant.
+        */
+       public synchronized void requestRejectedOverload() {
+               _droppedPackets++;
+               _totalPackets++;
+               _simulatedWindowSize *= PACKET_DROP_DECREASE_MULTIPLE;
+       }
+       
+       private synchronized void setRoundTripTime(long rtt) {
+               roundTripTime.report(Math.max(rtt, 10));
+       }
+
+       public synchronized String toString() {
+               return  getDelay()+" ms, (w: "
+                               + _simulatedWindowSize + ", r:" + 
roundTripTime.currentValue() + ", d:"
+                               + (((float) _droppedPackets / (float) 
_totalPackets)) + "="+_droppedPackets+"/"+_totalPackets + ")";
+       }
+}

Modified: trunk/freenet/src/freenet/node/SimpleLowLevelClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleLowLevelClient.java    2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/SimpleLowLevelClient.java    2005-11-17 
20:59:49 UTC (rev 7547)
@@ -17,10 +17,10 @@
     /**
      * Fetch a key. Throws if it cannot fetch it.
      */
-    public KeyBlock getKey(ClientKey key, boolean localOnly) throws 
LowLevelGetException;
+    public KeyBlock getKey(ClientKey key, boolean localOnly, 
RequestStarterClient client) throws LowLevelGetException;

     /**
      * Insert a key.
      */
-    public void putCHK(ClientCHKBlock key) throws LowLevelPutException;
+    public void putCHK(ClientCHKBlock key, RequestStarterClient sctx) throws 
LowLevelPutException;
 }

Modified: trunk/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-11-16 
19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-11-17 
20:59:49 UTC (rev 7547)
@@ -46,14 +46,18 @@
     final Node n;
     final HighLevelSimpleClient client;
     final Hashtable streams;
+    final RequestStarterClient requestStarterClient;
+    final RequestStarterClient insertStarterClient;

     TextModeClientInterface(Node n) {
         this.n = n;
-        client = n.makeClient();
+        client = n.makeClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, 
(short)0);
         client.addGlobalHook(new EventDumper());
         this.r = n.random;
         streams = new Hashtable();
-        new Thread(this).start();
+        new Thread(this, "Text mode client interface").start();
+        this.requestStarterClient = 
n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, false);
+        this.insertStarterClient = 
n.makeStarterClient(RequestStarter.INTERACTIVE_PRIORITY_CLASS, (short)0, true);
     }

     /**

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-16 19:29:35 UTC (rev 
7546)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-17 20:59:49 UTC (rev 
7547)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 183;
+       public static final int buildNumber = 184;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 183;
+       public static final int lastGoodBuild = 184;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;

Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java    
2005-11-16 19:29:35 UTC (rev 7546)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java    
2005-11-17 20:59:49 UTC (rev 7547)
@@ -2,6 +2,8 @@

 import java.util.Enumeration;

+import freenet.node.RequestStarterClient;
+
 /**
  * @author amphibian
  * 
@@ -88,6 +90,14 @@
         return item;
     }

+       public synchronized void addOrUpdate(UpdatableSortedLinkedListItem 
item) throws UpdatableSortedLinkedListKilledException {
+               if(item.getParent() == list)
+                       update(item);
+               else if(item.getParent() == null)
+                       add(item);
+               else throw new IllegalStateException("Item "+item+" should be 
on our list: "+list+" or null, but is "+item.getParent());
+       }
+       
     public synchronized void update(UpdatableSortedLinkedListItem i) throws 
UpdatableSortedLinkedListKilledException {
        if(killed) throw new UpdatableSortedLinkedListKilledException();
         Logger.minor(this, "Update("+i+") on "+this);
@@ -223,4 +233,20 @@
        clear();
        killed = true;
     }
+
+       public synchronized UpdatableSortedLinkedListItem removeLowest() throws 
UpdatableSortedLinkedListKilledException {
+               if(isEmpty()) return null;
+               UpdatableSortedLinkedListItem i = getLowest();
+               remove(i);
+               return i;
+       }
+
+       public synchronized void moveTo(UpdatableSortedLinkedList dest) throws 
UpdatableSortedLinkedListKilledException {
+               Enumeration e = list.elements();
+               while(e.hasMoreElements()) {
+                       UpdatableSortedLinkedListItem item = 
(UpdatableSortedLinkedListItem) e.nextElement();
+                       remove(item);
+                       dest.add(item);
+               }
+       }
 }


Reply via email to