Author: toad
Date: 2006-07-07 20:20:09 +0000 (Fri, 07 Jul 2006)
New Revision: 9497

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/comm/LowLevelFilter.java
   trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/CHKInsertSender.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/LocationManager.java
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
   trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/BitArray.java
   trunk/freenet/src/freenet/support/DoubleTokenBucket.java
   trunk/freenet/src/freenet/support/TokenBucket.java
   trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
Log:
862: New bandwidth limiting algorithm, and related changes. Not fully tested 
yet.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2006-07-07 19:29:24 UTC (rev 
9496)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2006-07-07 20:20:09 UTC (rev 
9497)
@@ -172,6 +172,11 @@
                return msg;
        }

+       public static int packetTransmitSize(int size, int _packets) {
+               return size + 8 /* uid */ + 4 /* packet# */ + 
+                       BitArray.serializedLength(_packets) + 4 /* Message 
header */;
+       }
+       
        public static final MessageType allSent = new MessageType("allSent") {{
                addField(UID, Long.class);
        }};

Modified: trunk/freenet/src/freenet/io/comm/LowLevelFilter.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/LowLevelFilter.java       2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/comm/LowLevelFilter.java       2006-07-07 
20:20:09 UTC (rev 9497)
@@ -21,20 +21,9 @@
      */
     void process(byte[] buf, int offset, int length, Peer peer);

+    // Outgoing packets are handled elsewhere...
+    
     /**
-     * Process an outgoing packet. Takes a byte[], which is an
-     * encoded message. Then does whatever encryption or other
-     * things need doing, and calls UdpSocketManager.sendPacket(...)
-     * to send the processed data.
-     * @param buf The buffer to read from.
-     * @param offset The offset to start reading from.
-     * @param length The length in bytes to read.
-     * @param peer The PeerContext the messages will be sent to.
-     * @throws PacketSequenceException 
-     */
-    void processOutgoing(byte[] buf, int offset, int length, PeerContext peer) 
throws NotConnectedException, LowLevelFilterException;
-
-    /**
      * Is the given connection closed?
      */
     boolean isDisconnected(PeerContext context);

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2006-07-07 
20:20:09 UTC (rev 9497)
@@ -503,7 +503,7 @@
 //             } else {
 //                 sendPacket(blockToSend, destination.getPeer());
 //             }
-               ((PeerNode)destination).sendAsync(m, null);
+               ((PeerNode)destination).sendAsync(m, null, 0);
        }

        /**

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2006-07-07 
20:20:09 UTC (rev 9497)
@@ -29,8 +29,10 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.UdpSocketManager;
+import freenet.node.FNPPacketMangler;
 import freenet.node.PeerNode;
 import freenet.support.BitArray;
+import freenet.support.DoubleTokenBucket;
 import freenet.support.Logger;

 /**
@@ -52,116 +54,17 @@
        boolean failedByOverload = false;
        final PacketThrottle throttle;
        long timeAllSent = -1;
+       final DoubleTokenBucket _masterThrottle;
+       final int PACKET_SIZE;

-       // FIXME make this stuff non-static. Have a context object for limiting.
-       
-       // Static stuff for global bandwidth limiter
-       /** Synchronization object for bandwidth limiting */
-       static final Object lastPacketSendTimeSync = new Object();
-       // Use nanosecond long values for accuracy reasons
-       /** Time at which the last known packet is scheduled to be sent.
-        * We will not send another packet until at least minPacketDelay ms 
after this time. */
-       static long hardLastPacketSendTimeNSec = System.currentTimeMillis() * 
1000*1000;
-       /** Minimum interval between packet sends, for overall hard bandwidth 
limiter */
-       static int minPacketDelayNSec = 0;
-       /** Minimum average interval between packet sends, for averaged (soft) 
overall
-        * bandwidth usage limiter. */
-       static int minSoftDelayNSec = 0;
-       /** "Soft" equivalent to hardLastPacketSendTime. Can lag up to half the 
softLimitPeriod
-        * behind the current time. Otherwise is similar. This gives it 
flexibility; we can have
-        * spurts above the average limit, but over the softLimitPeriod, it 
will average out to 
-        * the target minSoftDelay. */
-       static long softLastPacketSendTimeNSec = System.currentTimeMillis() * 
1000*1000;
-       /** Period over which the soft limiter should work */
-       static long softLimitPeriodNSec;
-
-       /**
-        * Set the hard bandwidth limiter.
-        * @param bytesPerSecond The maximum number of bytes (of data blocks) 
to be sent in any
-        * one second.
-        */
-       public static void setHardBandwidthLimit(int bytesPerSecond) {
-               int newMinPacketDelayNS = 
convertBytesPerSecondToNanosPerPacket(bytesPerSecond);
-               synchronized(lastPacketSendTimeSync) {
-                       if(minPacketDelayNSec != newMinPacketDelayNS) {
-                               minPacketDelayNSec = newMinPacketDelayNS;
-                               hardLastPacketSendTimeNSec = 
System.currentTimeMillis() * 1000*1000;
-                       }
-               }
-       }
-
-       public static int convertBytesPerSecondToNanosPerPacket(int 
bytesPerSecond) {
-               if(bytesPerSecond <= 0)
-                       return 0; // no limits
-               
-               int packetSize = getPacketSize();
-               double minNanoSecondsBetweenPackets =
-                       ((1000.0*1000.0*1000.0) * packetSize) / ((double) 
bytesPerSecond);
-               int newMinPacketDelayNS = (int) minNanoSecondsBetweenPackets;
-               double inaccuracy = minNanoSecondsBetweenPackets - 
newMinPacketDelayNS;
-               double inaccuracyPercent = (inaccuracy / 
minNanoSecondsBetweenPackets) * 100.0;
-               Logger.minor(BlockTransmitter.class, "Quantization inaccuracy: 
"+inaccuracyPercent+"%");
-               return newMinPacketDelayNS;
-       }
-       
-       public static int convertBytesPerPeriodToNanosPerPacket(int 
bytesPerSecond, long periodLengthNanos) {
-               if(bytesPerSecond <= 0)
-                       return 0; // no limits
-               
-               int packetSize = getPacketSize();
-               double minNanoSecondsBetweenPackets =
-                       (periodLengthNanos * packetSize) / ((double) 
bytesPerSecond);
-               int newMinPacketDelayNS = (int) minNanoSecondsBetweenPackets;
-               double inaccuracy = minNanoSecondsBetweenPackets - 
newMinPacketDelayNS;
-               double inaccuracyPercent = (inaccuracy / 
minNanoSecondsBetweenPackets) * 100.0;
-               Logger.minor(BlockTransmitter.class, "Quantization inaccuracy: 
"+inaccuracyPercent+"%");
-               return newMinPacketDelayNS;
-       }
-       
-       public static int convertNanosPerPacketToBytesPerSecond(int delay) {
-               if(delay == 0) return 0;
-               return (int) (((1000.0*1000.0*1000.0) * getPacketSize()) / 
((double)delay));
-       }
-
-       /** @return The average packet size for a block sent by a 
BlockTransmitter, including all 
-        * headers and protocol overhead */
-       private static int getPacketSize() {
-               // FIXME make this more accurate!
-               return 1024 + 200;
-       }
-
-       public static int getHardBandwidthLimit() {
-               int delay;
-               synchronized(lastPacketSendTimeSync) {
-                       delay = minPacketDelayNSec;
-               }
-               return convertNanosPerPacketToBytesPerSecond(delay);
-       }
-
-       /**
-        * Set the long-term bandwidth limiter.
-        * @param bytes The number of bytes to allow at most over the period. 
(in data packets)
-        * @param period The length of time over which the limit should apply. 
(ms)
-        */
-       public static void setSoftBandwidthLimit(int bytes, long period) {
-               if(period > Long.MAX_VALUE / (1000*1000)) throw new 
IllegalArgumentException("Period too long");
-               int newSoftLimit = convertBytesPerPeriodToNanosPerPacket(bytes, 
period);
-               period = period * 1000 * 1000;
-               synchronized(lastPacketSendTimeSync) {
-                       minSoftDelayNSec = newSoftLimit;
-                       softLimitPeriodNSec = period;
-                       long nowNS = System.currentTimeMillis() * 1000 * 1000;
-                       if(nowNS - softLastPacketSendTimeNSec > period / 2) {
-                               softLastPacketSendTimeNSec = nowNS - (period / 
2);
-                       }
-               }
-       }
-       
-       public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source) {
+       public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle) {
                _usm = usm;
                _destination = destination;
                _uid = uid;
                _prb = source;
+               this._masterThrottle = masterThrottle;
+               PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize, 
_prb._packets)
+                       + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
                try {
                        _sentPackets = new BitArray(_prb.getNumPackets());
                } catch (AbortedException e) {
@@ -177,7 +80,7 @@
                                                long startCycleTime = 
System.currentTimeMillis();
                                                try {
                                                        while (true) {
-                                                               
synchronized(_unsent) {
+                                                               
synchronized(_senderThread) {
                                                                        
if(_unsent.size() != 0) break;
                                                                        // No 
unsent packets
                                                                        
if(getNumSent() == _prb.getNumPackets()) {
@@ -185,9 +88,7 @@
                                                                                
if(timeAllSent <= 0)
                                                                                
        timeAllSent = System.currentTimeMillis();
                                                                        }
-                                                               }
-                                                               
if(_sendComplete) return;
-                                                               synchronized 
(_senderThread) {
+                                                                       
if(_sendComplete) return;
                                                                        
_senderThread.wait(10*1000);
                                                                }
                                                        }
@@ -200,35 +101,25 @@
                                                        }
                                                        return;
                                                }
-                                               long startDelayTime = 
System.currentTimeMillis();
-                                               delay(startCycleTime);
                                                int packetNo;
                                                try {
-                                                       synchronized(_unsent) {
+                                                       
synchronized(_senderThread) {
                                                                packetNo = 
((Integer) _unsent.removeFirst()).intValue();
                                                        }
                                                } catch (NoSuchElementException 
nsee) {
                                                        // back up to the top 
to check for completion
                                                        continue;
                                                }
+                                               delay(startCycleTime);
                                                _sentPackets.setBit(packetNo, 
true);
                                                try {
-                                                       long endDelayTime = 
System.currentTimeMillis();
-                                                       long delayTime = 
endDelayTime - startDelayTime;
-                                                       
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
-                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null);
-                                                       // May have been delays 
in sending, so update to avoid sending more frequently than allowed
-                                                       long nowNS = 
System.currentTimeMillis() * 1000 * 1000;
-                                                       
synchronized(lastPacketSendTimeSync) {
-                                                               
if(hardLastPacketSendTimeNSec < nowNS)
-                                                                       
hardLastPacketSendTimeNSec = nowNS;
-                                                       }
+                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE);
                                                // We accelerate the ping rate 
during the transfer to keep a closer eye on round-trip-time
                                                sentSinceLastPing++;
                                                if (sentSinceLastPing >= 
PING_EVERY) {
                                                        sentSinceLastPing = 0;
                                                        
//_usm.send(BlockTransmitter.this._destination, DMT.createPing());
-                                                       
((PeerNode)_destination).sendAsync(DMT.createPing(), null);
+                                                       
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0);
                                                }
                                                } catch (NotConnectedException 
e) {
                                                    Logger.normal(this, 
"Terminating send: "+e);
@@ -247,101 +138,33 @@
                        }

                        /** @return True if _sendComplete */
-                       private boolean delay(long startCycleTime) {
+                       private void delay(long startCycleTime) {

                                // Get the current inter-packet delay
                                long delay = throttle.getDelay();
                                Logger.minor(this, "Throttle delay: "+delay);

-                               while(true) {
-                                       
-                                       long nowNS = System.currentTimeMillis() 
* 1000 * 1000;
-                                       
-                                       Logger.minor(this, 
"Now="+nowNS/(1000*1000));
-                                       
-                                       long endTime = -1;
-                                       
-                                       boolean thenSend = true;
-                                       
-                                       // Synchronize on the static lock, and 
update
-                                       synchronized(lastPacketSendTimeSync) {
-                                               
-                                               // Get the current time
-                                               nowNS = 
System.currentTimeMillis() * 1000 * 1000;
-                                               
-                                               // Update time if necessary to 
avoid spurts
-                                               if(hardLastPacketSendTimeNSec < 
(nowNS - minPacketDelayNSec)) {
-                                                       Logger.minor(this, 
"Updating hard limit counter - was "+hardLastPacketSendTimeNSec/1000);
-                                                       
hardLastPacketSendTimeNSec = nowNS - minPacketDelayNSec;
-                                                       Logger.minor(this, 
"Updated hard limit counter - now "+hardLastPacketSendTimeNSec/1000);
+                               long startThrottle = System.currentTimeMillis();
+                               _masterThrottle.blockingGrab(PACKET_SIZE);
+                               
+                               long now = System.currentTimeMillis();
+                               
+                               long delayTime = now - startThrottle;
+                               
+                               
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
+                               
+                               long end = startCycleTime + delay;
+                               
+                               if(now > end) return;
+                               while(now < end) {
+                                       long l = end - now;
+                                       int x = (int) (Math.min(l, 
Integer.MAX_VALUE));
+                                       if(x > 0)
+                                               try {
+                                                       Thread.sleep(x);
+                                               } catch (InterruptedException 
e) {
+                                                       // Ignore
                                                }
-                                               
-                                               // Wait until the next send 
window
-                                               long 
newHardLastPacketSendTimeNS =
-                                                       
hardLastPacketSendTimeNSec + minPacketDelayNSec;
-                                               
-                                               Logger.minor(this, "Waiting for 
"+minPacketDelayNSec+" until "+newHardLastPacketSendTimeNS/1000);
-                                               
-                                               long newHardLastPacketSendTime =
-                                                       
newHardLastPacketSendTimeNS / (1000 * 1000);
-                                               
-                                               long earliestSendTime = 
startCycleTime + delay;
-                                               
-                                               Logger.minor(this, "Earliest 
time by hard limit: "+newHardLastPacketSendTime);
-                                               Logger.minor(this, "Earliest 
time by throttle:   "+earliestSendTime);
-                                               
-                                               if(earliestSendTime > 
newHardLastPacketSendTime) {
-                                                       // Don't clog up other 
senders!
-                                                       thenSend = false;
-                                                       endTime = 
earliestSendTime;
-                                                       Logger.minor(this, 
"Looping");
-                                               } else {
-                                                       
hardLastPacketSendTimeNSec = newHardLastPacketSendTimeNS;
-                                                       endTime = 
hardLastPacketSendTimeNSec / (1000 * 1000);
-                                                       
-                                                       // What about the soft 
limit?
-                                                       
-                                                       // We can only 
accumulate burst traffic rights for a full period at most.
-                                                       // If we have a period 
of 1 hour, and we send no traffic in the first 30 minutes,
-                                                       // then we can use up 
our whole hour's quota in the next 30 minutes if we need to.
-                                                       // We could even use 
our entire quota in the last 5 minutes. After that, we can
-                                                       // only send at the 
limit (which may be very low), since we have no quota left.
-                                                       // However, after 1 
hour we forget our burst rights.
-                                                       if(nowNS - 
softLastPacketSendTimeNSec > softLimitPeriodNSec) {
-                                                               
softLastPacketSendTimeNSec = nowNS - (softLimitPeriodNSec);
-                                                               
Logger.minor(this, "Updating soft limit");
-                                                       }
-                                                       
-                                                       
softLastPacketSendTimeNSec += minSoftDelayNSec;
-                                                       
-                                                       
if(softLastPacketSendTimeNSec > hardLastPacketSendTimeNSec) {
-                                                               endTime = 
((hardLastPacketSendTimeNSec = softLastPacketSendTimeNSec) / (1000 * 1000));
-                                                               
Logger.minor(this, "endTime now "+endTime+" because of soft limit");
-                                                       }
-                                               }
-                                       }
-                                       
-                                       long now = nowNS / (1000 * 1000);
-                                       
-                                       while(now < endTime) {
-                                               synchronized(_senderThread) {
-                                                       if(_sendComplete)
-                                                               return true;
-                                                       try {
-                                                               
_senderThread.wait(endTime - now);
-                                                       } catch 
(InterruptedException e) {
-                                                               // Ignore
-                                                       }
-                                               }
-                                               if(_sendComplete)
-                                                       return true;
-                                               now = 
System.currentTimeMillis();
-                                       }
-                                       Logger.minor(this, "Completed wait at 
"+now);
-                                       
-                                       nowNS = now * 1000 * 1000;
-                                       
-                                       if(thenSend) return false;
                                }
                        }
                };
@@ -362,16 +185,16 @@
                _unsent = _prb.addListener(myListener = new 
PartiallyReceivedBlock.PacketReceivedListener() {;

                        public void packetReceived(int packetNo) {
-                               _unsent.addLast(new Integer(packetNo));
-                               _sentPackets.setBit(packetNo, false);
                                synchronized(_senderThread) {
+                                       _unsent.addLast(new Integer(packetNo));
+                                       _sentPackets.setBit(packetNo, false);
                                        _senderThread.notify();
                                }
                        }

                        public void receiveAborted(int reason, String 
description) {
                                try {
-                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason, 
description), null);
+                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason, 
description), null, 0);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Receive aborted and receiver is not 
connected");
                 }
@@ -427,13 +250,11 @@
                                for (Iterator i = missing.iterator(); 
i.hasNext();) {
                                        Integer packetNo = (Integer) i.next();
                                        if 
(_prb.isReceived(packetNo.intValue())) {
-                                               synchronized(_unsent) {
+                                               synchronized(_senderThread) {
                                                        
_unsent.addFirst(packetNo);
-                                               }
-                                           
_sentPackets.setBit(packetNo.intValue(), false);
-                                           synchronized(_senderThread) {
+                                                   
_sentPackets.setBit(packetNo.intValue(), false);
                                                _senderThread.notify();
-                                           }
+                                               }
                                        }
                                }
                        } else if (msg.getSpec().equals(DMT.allReceived)) {
@@ -508,45 +329,5 @@
        public PeerContext getDestination() {
                return _destination;
        }
-
-       public static boolean isUncontended() {
-               long nowNS = System.currentTimeMillis() * 1000 * 1000;
-               
-               // Synchronize on the static lock, and update
-               synchronized(lastPacketSendTimeSync) {
-                       
-                       // Get the current time
-                       nowNS = System.currentTimeMillis() * 1000 * 1000;
-                       
-                       // Update time if necessary to avoid spurts
-                       if(hardLastPacketSendTimeNSec < (nowNS - 
minPacketDelayNSec)) {
-                               // Can send immediately!
-                       } else {
-                               Logger.minor(BlockTransmitter.class, "Is 
contended (hard limit)");
-                               return false; // is contended.
-                       }
-                       
-                       // What about the soft limit?
-                       
-                       // We can only accumulate burst traffic rights for a 
full period at most.
-                       // If we have a period of 1 hour, and we send no 
traffic in the first 30 minutes,
-                       // then we can use up our whole hour's quota in the 
next 30 minutes if we need to.
-                       // We could even use our entire quota in the last 5 
minutes. After that, we can
-                       // only send at the limit (which may be very low), 
since we have no quota left.
-                       // However, after 1 hour we forget our burst rights.
-                       if(nowNS - softLastPacketSendTimeNSec > 
softLimitPeriodNSec) {
-                               softLastPacketSendTimeNSec = nowNS - 
(softLimitPeriodNSec);
-                       }
-                       
-                       //softLastPacketSendTimeNSec += minSoftDelayNSec;
-                       
-                       if(softLastPacketSendTimeNSec + minSoftDelayNSec > 
nowNS) {
-                               Logger.minor(BlockTransmitter.class, "Is 
contended (soft limit)");
-                               // Can't send immediately due to soft limit.
-                               return false;
-                       }
-               }
-               
-               return true;
-       }
+       
 }

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -66,7 +66,7 @@

                AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
                        this.pn = pn;
-                       bt = new BlockTransmitter(node.usm, pn, uid, prb);
+                       bt = new BlockTransmitter(node.usm, pn, uid, prb, 
node.outputThrottle);
                }

                void start() {

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-07-07 
20:20:09 UTC (rev 9497)
@@ -36,20 +36,18 @@
     static final EntropySource fnpTimingSource = new EntropySource();
     static final EntropySource myPacketDataSource = new EntropySource();
     static final int RANDOM_BYTES_LENGTH = 12;
-    final int HASH_LENGTH;
+    static final int HASH_LENGTH = 32;
+    /** Minimum headers overhead */
+       public static final int HEADERS_LENGTH_MINIMUM =
+               HASH_LENGTH + RANDOM_BYTES_LENGTH + 4 + 6;
+       /** Headers overhead if there is one message and no acks. */
+       public static final int HEADERS_LENGTH_ONE_MESSAGE = 
+               HEADERS_LENGTH_MINIMUM + 1; // 1 byte = length of message. rest 
is the same.

     public FNPPacketMangler(Node node) {
         this.node = node;
         this.pm = node.peers;
         this.usm = node.usm;
-        MessageDigest md;
-        try {
-            md = MessageDigest.getInstance("SHA-256");
-        } catch (NoSuchAlgorithmException e) {
-            throw new Error(e);
-        }
-        
-        HASH_LENGTH = md.getDigestLength();
     }

     /**
@@ -88,7 +86,7 @@

         if(opn != null) {
             Logger.minor(this, "Trying exact match");
-            if(length > HASH_LENGTH + RANDOM_BYTES_LENGTH + 4 + 6) {
+            if(length > HEADERS_LENGTH_MINIMUM) {
                 if(tryProcess(buf, offset, length, 
opn.getCurrentKeyTracker())) return;
                 // Try with old key
                 if(tryProcess(buf, offset, length, 
opn.getPreviousKeyTracker())) return;
@@ -387,14 +385,19 @@
         pcfb.blockEncipher(output, 0, output.length);
         System.arraycopy(output, 0, data, hash.length+iv.length+2, 
output.length);
         try {
-                       usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+               sendPacket(data, replyTo, pn, 0);
                } catch (LocalAddressException e) {
                        Logger.error(this, "Tried to send auth packet to local 
address: "+replyTo+" for "+pn);
                }
         Logger.minor(this, "Sending auth packet (long) to "+replyTo+" - size 
"+data.length+" data length: "+output.length);
      }

-    /**
+    private void sendPacket(byte[] data, Peer replyTo, PeerNode pn, int 
alreadyReportedBytes) throws LocalAddressException {
+       usm.sendPacket(data, replyTo, pn.allowLocalAddresses());
+       node.outputThrottle.forceGrab(data.length - alreadyReportedBytes);
+       }
+
+       /**
      * @param i
      * @param payload
      * @param pn
@@ -801,6 +804,7 @@
     public void processOutgoingOrRequeue(MessageItem[] messages, PeerNode pn, 
boolean neverWaitForPacketNumber, boolean dontRequeue) {
         Logger.minor(this, "processOutgoingOrRequeue "+messages.length+" 
messages for "+pn+" ("+neverWaitForPacketNumber+")");
         byte[][] messageData = new byte[messages.length][];
+        int[] alreadyReported = new int[messages.length];
         int length = 1;
         int callbacksCount = 0;
         int x = 0;
@@ -816,7 +820,7 @@
                         return;
                     }
                     int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
-                    this.processOutgoingPreformatted(buf, 0, buf.length, 
pn.getCurrentKeyTracker(), packetNumber, mi.cb);
+                    this.processOutgoingPreformatted(buf, 0, buf.length, 
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Caught "+e+" while sending messages, 
requeueing");
                     // Requeue
@@ -845,7 +849,9 @@
                 }
             } else {
                 byte[] data = mi.getData(this, pn);
-                messageData[x++] = data;
+                messageData[x] = data;
+                alreadyReported[x] = mi.alreadyReportedBytes;
+                x++;
                 if(mi.cb != null) callbacksCount += mi.cb.length;
                 Logger.minor(this, "Sending: "+mi+" length "+data.length+" cb 
"+mi.cb);
                 length += (data.length + 2);
@@ -858,9 +864,11 @@
         }
         AsyncMessageCallback callbacks[] = new 
AsyncMessageCallback[callbacksCount];
         x=0;
+        int alreadyReportedBytes = 0;
         for(int i=0;i<messages.length;i++) {
             if(messages[i].formatted) continue;
             if(messages[i].cb != null) {
+               alreadyReportedBytes += messages[i].alreadyReportedBytes;
                 System.arraycopy(messages[i].cb, 0, callbacks, x, 
messages[i].cb.length);
                 x += messages[i].cb.length;
             }
@@ -870,7 +878,7 @@
         if(length < node.usm.getMaxPacketSize() &&
                 messages.length < 256) {
             try {
-                innerProcessOutgoing(messageData, 0, messageData.length, 
length, pn, neverWaitForPacketNumber, callbacks);
+                innerProcessOutgoing(messageData, 0, messageData.length, 
length, pn, neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Caught "+e+" while sending messages, 
requeueing");
                 // Requeue
@@ -895,6 +903,7 @@
             length = 56;
             int count = 0;
             int lastIndex = 0;
+            alreadyReportedBytes = 0;
             for(int i=0;i<=messages.length;i++) {
                 int thisLength;
                 if(i == messages.length) thisLength = 0;
@@ -910,7 +919,7 @@
                     // e.g. lastIndex = 0, i = 1, we just send message 0
                     if(lastIndex != i) {
                         try {
-                            innerProcessOutgoing(messageData, lastIndex, 
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks);
+                            innerProcessOutgoing(messageData, lastIndex, 
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks, 
alreadyReportedBytes);
                         } catch (NotConnectedException e) {
                             Logger.normal(this, "Caught "+e+" while sending 
messages, requeueing remaining messages");
                             // Requeue
@@ -935,7 +944,10 @@
                     if(i != messages.length)
                         length = 1 + (messageData[i].length + 2);
                     count = 0;
-                } else length = newLength;
+                } else {
+                       length = newLength;
+                       alreadyReportedBytes += alreadyReported[i];
+                }
             }
         }
     }
@@ -949,7 +961,8 @@
      * @param pn Node to send the messages to.
      * @throws PacketSequenceException 
      */
-    private void innerProcessOutgoing(byte[][] messageData, int start, int 
length, int bufferLength, PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks) throws NotConnectedException, 
WouldBlockException, PacketSequenceException {
+    private void innerProcessOutgoing(byte[][] messageData, int start, int 
length, int bufferLength, 
+               PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks, int alreadyReportedBytes) throws 
NotConnectedException, WouldBlockException, PacketSequenceException {
         Logger.minor(this, 
"innerProcessOutgoing(...,"+start+","+length+","+bufferLength+")");
         byte[] buf = new byte[bufferLength];
         buf[0] = (byte)length;
@@ -962,7 +975,7 @@
             System.arraycopy(data, 0, buf, loc, len);
             loc += len;
         }
-        processOutgoingPreformatted(buf, 0, bufferLength, pn, 
neverWaitForPacketNumber, callbacks);
+        processOutgoingPreformatted(buf, 0, bufferLength, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
     }

     /**
@@ -971,13 +984,13 @@
      * @throws PacketSequenceException 
      * @throws WouldBlockException 
      */
-    public void processOutgoing(byte[] buf, int offset, int length, 
PeerContext peer) throws NotConnectedException, PacketSequenceException, 
WouldBlockException {
+    public void processOutgoing(byte[] buf, int offset, int length, 
PeerContext peer, int alreadyReportedBytes) throws NotConnectedException, 
PacketSequenceException, WouldBlockException {
        Logger.minor(this, "processOutgoing(buf, "+offset+", "+length+", 
"+peer.getPeer());
         if(!(peer instanceof PeerNode))
             throw new IllegalArgumentException();
         PeerNode pn = (PeerNode)peer;
         byte[] newBuf = preformat(buf, offset, length);
-        processOutgoingPreformatted(newBuf, 0, newBuf.length, pn, -1, null);
+        processOutgoingPreformatted(newBuf, 0, newBuf.length, pn, -1, null, 
alreadyReportedBytes);
     }


@@ -987,29 +1000,20 @@
      * @throws PacketSequenceException 
      * @throws WouldBlockException 
      */
-    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker) throws KeyChangedException, NotConnectedException, 
PacketSequenceException, WouldBlockException {
+    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker, int alreadyReportedBytes) throws KeyChangedException, 
NotConnectedException, PacketSequenceException, WouldBlockException {
         byte[] newBuf = preformat(buf, offset, length);
-        processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, -1, 
null);
+        processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, -1, 
null, alreadyReportedBytes);
     }


     /**
-     * Send a packet, with a packet number.
-     * @throws PacketSequenceException 
-     * @throws WouldBlockException If allocating a packet number would have 
blocked.
-     */
-    public void processOutgoing(byte[] buf, int offset, int length, KeyTracker 
tracker, int packetNo, AsyncMessageCallback[] callbacks) throws 
KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
-        byte[] newBuf = preformat(buf, offset, length);
-        processOutgoingPreformatted(newBuf, 0, newBuf.length, tracker, 
packetNo, callbacks);
-    }
-    
-    /**
      * Send a packet using the current key. Retry if it fails solely because
      * the key changes.
      * @throws PacketSequenceException 
      * @throws WouldBlockException 
      */
-    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, int k, AsyncMessageCallback[] callbacks) throws 
NotConnectedException, PacketSequenceException, WouldBlockException {
+    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, int k, AsyncMessageCallback[] callbacks, 
+               int alreadyReportedBytes) throws NotConnectedException, 
PacketSequenceException, WouldBlockException {
         while(true) {
             try {
                Logger.minor(this, "At beginning of processOutgoingPreformatted 
loop for "+peer.getPeer());
@@ -1018,7 +1022,7 @@
                     Logger.normal(this, "Dropping packet: Not connected yet");
                     throw new NotConnectedException();
                 }
-                processOutgoingPreformatted(buf, offset, length, tracker, k, 
callbacks);
+                processOutgoingPreformatted(buf, offset, length, tracker, k, 
callbacks, alreadyReportedBytes);
                 return;
             } catch (KeyChangedException e) {
                Logger.normal(this, "Key changed");
@@ -1032,7 +1036,7 @@
      * the key changes.
      * @throws PacketSequenceException 
      */
-    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks) throws NotConnectedException, WouldBlockException, 
PacketSequenceException {
+    void processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks, int alreadyReportedBytes) throws NotConnectedException, 
WouldBlockException, PacketSequenceException {
         while(true) {
             try {
                 KeyTracker tracker = peer.getCurrentKeyTracker();
@@ -1042,7 +1046,7 @@
                 }
                 int seqNo = neverWaitForPacketNumber ? 
tracker.allocateOutgoingPacketNumberNeverBlock() :
                     tracker.allocateOutgoingPacketNumber();
-                processOutgoingPreformatted(buf, offset, length, tracker, 
seqNo, callbacks);
+                processOutgoingPreformatted(buf, offset, length, tracker, 
seqNo, callbacks, alreadyReportedBytes);
                 return;
             } catch (KeyChangedException e) {
                 // Go around again
@@ -1086,7 +1090,7 @@
      * @throws PacketSequenceException 
      * @throws WouldBlockException If we cannot allocate a packet number 
because it would block.
      */
-    public void processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks) 
throws KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
+    public void processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks, 
int alreadyReportedBytes) throws KeyChangedException, NotConnectedException, 
PacketSequenceException, WouldBlockException {
         if(Logger.shouldLog(Logger.MINOR, this)) {
             String log = 
"processOutgoingPreformatted("+Fields.hashCode(buf)+", 
"+offset+","+length+","+tracker+","+packetNumber+",";
             if(callbacks == null) log += "null";
@@ -1229,7 +1233,7 @@

         Logger.minor(this, "Sending...");

-        processOutgoingFullyFormatted(plaintext, tracker, callbacks);
+        processOutgoingFullyFormatted(plaintext, tracker, callbacks, 
alreadyReportedBytes);
         Logger.minor(this, "Sent packet");
     }

@@ -1238,7 +1242,7 @@
      * @param plaintext The packet's plaintext, including all formatting,
      * including acks and resend requests. Is clobbered.
      */
-    private void processOutgoingFullyFormatted(byte[] plaintext, KeyTracker 
kt, AsyncMessageCallback[] callbacks) {
+    private void processOutgoingFullyFormatted(byte[] plaintext, KeyTracker 
kt, AsyncMessageCallback[] callbacks, int alreadyReportedBytes) {
         BlockCipher sessionCipher = kt.sessionCipher;
         Logger.minor(this, "Encrypting with 
"+HexUtil.bytesToHex(kt.sessionKey));
         if(sessionCipher == null) {
@@ -1298,7 +1302,7 @@

         // pn.getPeer() cannot be null
         try {
-                       usm.sendPacket(output, kt.pn.getPeer(), 
kt.pn.allowLocalAddresses());
+               sendPacket(output, kt.pn.getPeer(), kt.pn, 
alreadyReportedBytes);
                } catch (LocalAddressException e) {
                        Logger.error(this, "Tried to send data packet to local 
address: "+kt.pn.getPeer()+" for "+kt.pn.allowLocalAddresses());
                }

Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -106,9 +106,9 @@
                        if(source.isConnected() && startTime > 
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
                                Logger.error(this, "Did not receive DataInsert 
on "+uid+" from "+source+" !");
                        Message tooSlow = DMT.createFNPRejectedTimeout(uid);
-                       source.sendAsync(tooSlow, null);
+                       source.sendAsync(tooSlow, null, 0);
                        Message m = DMT.createFNPInsertTransfersCompleted(uid, 
true);
-                       source.sendAsync(m, null);
+                       source.sendAsync(m, null, 0);
                        prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
                        br = new BlockReceiver(node.usm, source, uid, prb);
                        prb.abort(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
@@ -314,7 +314,7 @@
                boolean failed = sender.anyTransfersFailed();
                Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
                try {
-                       source.sendAsync(m, null);
+                       source.sendAsync(m, null, 0);
                        Logger.minor(this, "Sent completion: "+failed+" for 
"+this);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Not connected: "+source+" for 
"+this);
@@ -344,7 +344,7 @@
         }
         if(toSend != null) {
             try {
-                source.sendAsync(toSend, null);
+                source.sendAsync(toSend, null, 0);
             } catch (NotConnectedException e) {
                 // :(
                 Logger.minor(this, "Lost connection in "+this+" when sending 
FNPDataInsertRejected");

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -842,7 +842,7 @@
             AsyncMessageCallback[] callbacks = element.callbacks;
             // Ignore packet#
             Logger.minor(this, "Queueing resend of what was once 
"+element.packetNumber);
-            messages[i] = new MessageItem(buf, callbacks, true);
+            messages[i] = new MessageItem(buf, callbacks, true, 0);
         }
         pn.requeueMessageItems(messages, 0, messages.length, true);
         pn.node.ps.queuedResendPacket();

Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -675,7 +675,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(uid);
             try {
-                pn.sendAsync(reject, null);
+                pn.sendAsync(reject, null, 0);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection to "+pn+" rejecting 
SwapRequest");
             }
@@ -687,7 +687,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(uid);
             try {
-                pn.sendAsync(reject, null);
+                pn.sendAsync(reject, null, 0);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting SwapRequest from 
"+pn);
             }
@@ -705,7 +705,7 @@
                 // Reject
                 Message reject = DMT.createFNPSwapRejected(uid);
                 try {
-                    pn.sendAsync(reject, null);
+                    pn.sendAsync(reject, null, 0);
                 } catch (NotConnectedException e1) {
                     Logger.minor(this, "Lost connection rejecting SwapRequest 
(locked) from "+pn);
                 }
@@ -740,7 +740,7 @@
                     Logger.minor(this, "Late reject "+uid);
                     Message reject = DMT.createFNPSwapRejected(uid);
                     try {
-                        pn.sendAsync(reject, null);
+                        pn.sendAsync(reject, null, 0);
                     } catch (NotConnectedException e1) {
                         Logger.normal(this, "Late reject but disconnected from 
sender: "+pn);
                     }
@@ -754,7 +754,7 @@
                     // Forward the request.
                     // Note that we MUST NOT send this blocking as we are on 
the
                     // receiver thread.
-                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(uid), pn, item));
+                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0);
                 } catch (NotConnectedException e) {
                     // Try a different node
                     continue;
@@ -801,7 +801,7 @@
         m.set(DMT.UID, item.incomingID);
         Logger.minor(this, "Forwarding SwapReply "+uid+" from 
"+m.getSource()+" to "+item.requestSender);
         try {
-            item.requestSender.sendAsync(m, null);
+            item.requestSender.sendAsync(m, null, 0);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapReply "+uid+" 
to "+item.requestSender);
         }
@@ -833,7 +833,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null);
+            item.requestSender.sendAsync(m, null, 0);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapRejected 
"+uid+" to "+item.requestSender);
         }
@@ -860,7 +860,7 @@
         // Sending onwards - use outgoing ID
         m.set(DMT.UID, item.outgoingID);
         try {
-            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender));
+            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender), 0);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapCommit "+uid+" 
to "+item.routedTo);
         }
@@ -897,7 +897,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null);
+            item.requestSender.sendAsync(m, null, 0);
         } catch (NotConnectedException e) {
             Logger.normal(this, "Lost connection forwarding SwapComplete 
"+uid+" to "+item.requestSender);
         }
@@ -943,7 +943,7 @@
             Message msg = DMT.createFNPSwapRejected(item.incomingID);
             Logger.minor(this, "Rejecting in lostOrRestartedNode: 
"+item.incomingID+ " from "+item.requestSender);
             try {
-                item.requestSender.sendAsync(msg, null);
+                item.requestSender.sendAsync(msg, null, 0);
             } catch (NotConnectedException e1) {
                 Logger.normal(this, "Both sender and receiver disconnected for 
"+item);
             }

Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -9,12 +9,14 @@
     byte[] buf;
     final AsyncMessageCallback[] cb;
     final long submitted;
+    final int alreadyReportedBytes;
     /** If true, the buffer may contain several messages, and is formatted
      * for sending as a single packet.
      */
     final boolean formatted;

-    public MessageItem(Message msg2, AsyncMessageCallback[] cb2) {
+    public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int 
alreadyReportedBytes) {
+       this.alreadyReportedBytes = alreadyReportedBytes;
         this.msg = msg2;
         this.cb = cb2;
         buf = null;
@@ -22,7 +24,8 @@
         this.submitted = System.currentTimeMillis();
     }

-    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted) {
+    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes) {
+       this.alreadyReportedBytes = alreadyReportedBytes;
         this.cb = cb2;
         this.msg = null;
         this.buf = data;

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-07-07 19:29:24 UTC (rev 
9496)
+++ trunk/freenet/src/freenet/node/Node.java    2006-07-07 20:20:09 UTC (rev 
9497)
@@ -73,7 +73,6 @@
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.UdpSocketManager;
 import freenet.io.xfer.AbortedException;
-import freenet.io.xfer.BlockTransmitter;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKVerifyException;
@@ -109,6 +108,7 @@
 import freenet.support.Base64;
 import freenet.support.Bucket;
 import freenet.support.BucketFactory;
+import freenet.support.DoubleTokenBucket;
 import freenet.support.Fields;
 import freenet.support.FileLoggerHook;
 import freenet.support.HexUtil;
@@ -561,6 +561,7 @@
        final boolean testnetEnabled;
        final TestnetHandler testnetHandler;
        final StaticSwapRequestInterval swapInterval;
+       public final DoubleTokenBucket outputThrottle;
        static short MAX_HTL = 10;
        static final int EXIT_STORE_FILE_NOT_FOUND = 1;
        static final int EXIT_STORE_IOEXCEPTION = 2;
@@ -610,7 +611,7 @@
        final MyRequestThrottle sskInsertThrottle;
        final RequestStarter sskInsertStarter;
        public final UserAlertManager alerts;
-       final RunningAverage throttledPacketSendAverage;
+       final TimeDecayingRunningAverage throttledPacketSendAverage;
        /** Must be included as a hidden field in order for any dangerous HTTP 
operation to complete successfully. */
        public final String formPassword;

@@ -1013,7 +1014,7 @@
                bootID = random.nextLong();
                throttledPacketSendAverage =
                        new TimeDecayingRunningAverage(1, 10*60*1000 /* should 
be significantly longer than a typical transfer */, 0, Long.MAX_VALUE);
-
+               
                buildOldAgeUserAlert = new BuildOldAgeUserAlert();

                primaryIPUndetectedAlert = new IPUndetectedUserAlert();
@@ -1204,19 +1205,19 @@
                                "Output bandwidth limit (bytes per second)", 
"Hard output bandwidth limit (bytes/sec); the node should almost never exceed 
this", 
                                new IntCallback() {
                                        public int get() {
-                                               return 
BlockTransmitter.getHardBandwidthLimit();
+                                               //return 
BlockTransmitter.getHardBandwidthLimit();
+                                               return (int) ((1000L * 1000L * 
1000L) / outputThrottle.getNanosPerTick());
                                        }
                                        public void set(int val) throws 
InvalidConfigValueException {
-                                               
BlockTransmitter.setHardBandwidthLimit(val);
+                                               if(val <= 0) throw new 
InvalidConfigValueException("Bandwidth limit must be positive");
+                                               
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / val, val, 
(val * 4) / 5);
                                        }
                });

                int obwLimit = nodeConfig.getInt("outputBandwidthLimit");
-               BlockTransmitter.setHardBandwidthLimit(obwLimit);
+               this.outputThrottle = new DoubleTokenBucket(obwLimit/2, 
(1000L*1000L*1000L) /  obwLimit, obwLimit, (obwLimit * 4) / 5);
+               
                // FIXME add an averaging/long-term/soft bandwidth limit. (bug 
76)
-               // There is already untested support for this in 
BlockTransmitter.
-               // No long-term limit for now.
-               BlockTransmitter.setSoftBandwidthLimit(0, 0);

                // SwapRequestInterval

@@ -2168,19 +2169,23 @@

        long lastCheckedUncontended = -1;

+       static final int ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET = 
+               1024 + DMT.packetTransmitSize(1024, 32)
+               + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
+       
     /* return reject reason as string if should reject, otherwise return null 
*/
        public synchronized String shouldRejectRequest(boolean canAcceptAnyway) 
{
                long now = System.currentTimeMillis();

-               if(now - lastCheckedUncontended > 1000) {
-                       lastCheckedUncontended = now;
-                       if(BlockTransmitter.isUncontended()) {
-                               Logger.minor(this, "Reporting 0 because 
throttle uncontended: now "+throttledPacketSendAverage.currentValue());
-                               throttledPacketSendAverage.report(0);
-                               Logger.minor(this, "New average: 
"+throttledPacketSendAverage.currentValue());
-                               Logger.minor(this, "Average: 
"+throttledPacketSendAverage.toString());
-                       } else
-                               Logger.minor(this, "Not uncontended");
+               double bwlimitDelayTime = 
throttledPacketSendAverage.currentValue();
+               
+               if(throttledPacketSendAverage.lastReportTime() < 
System.currentTimeMillis() - 5000) {
+                       
outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+                       
outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
+                       long after = System.currentTimeMillis();
+                       throttledPacketSendAverage.report(after - now);
+                       now = after;
+                       bwlimitDelayTime = 
throttledPacketSendAverage.currentValue();
                }

                // Round trip time
@@ -2202,7 +2207,6 @@

                // Bandwidth limited packets

-               double bwlimitDelayTime = 
this.throttledPacketSendAverage.currentValue();
                Logger.minor(this, "bwlimitDelayTime = "+bwlimitDelayTime);
                if(bwlimitDelayTime > MAX_THROTTLE_DELAY) {
                        if(now - lastAcceptedRequest > MAX_INTERREQUEST_TIME && 
canAcceptAnyway) {

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -45,7 +45,7 @@
             // Send an FNPPong
             Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
             try {
-                ((PeerNode)m.getSource()).sendAsync(reply, null); // nothing 
we can do if can't contact source
+                ((PeerNode)m.getSource()).sendAsync(reply, null, 0); // 
nothing we can do if can't contact source
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection replying to "+m);
             }
@@ -83,7 +83,7 @@
                long id = m.getLong(DMT.PING_SEQNO);
                Message msg = DMT.createFNPLinkPong(id);
                try {
-                               source.sendAsync(msg, null);
+                               source.sendAsync(msg, null, 0);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -113,7 +113,7 @@
         if(node.recentlyCompleted(id)) {
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting data request (loop, finished): 
"+e);
             }
@@ -125,7 +125,7 @@
                Logger.normal(this, "Rejecting request from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
                Message rejected = DMT.createFNPRejectedOverload(id, true);
                try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting (overload) data request from 
"+m.getSource().getPeer()+": "+e);
                }
@@ -136,7 +136,7 @@
             Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already 
running)");
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
@@ -158,7 +158,7 @@
         if(node.recentlyCompleted(id)) {
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
@@ -170,7 +170,7 @@
                Logger.normal(this, "Rejecting insert from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
                Message rejected = DMT.createFNPRejectedOverload(id, true);
                try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting (overload) insert request from 
"+m.getSource().getPeer()+": "+e);
                }
@@ -181,7 +181,7 @@
             Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already 
running)");
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
@@ -269,7 +269,7 @@
         ctx = (RoutedContext)routedContexts.get(lid);
         if(ctx != null) {
             try {
-                
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null);
+                
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null, 0);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting "+m);
             }
@@ -289,7 +289,7 @@
         } else if(htl == 0) {
             Message reject = DMT.createFNPRoutedRejected(id, (short)0);
             if(pn != null) try {
-                pn.sendAsync(reject, null);
+                pn.sendAsync(reject, null, 0);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting "+m);
             }
@@ -311,7 +311,7 @@
         PeerNode pn = ctx.source;
         if(pn == null) return false;
         try {
-            pn.sendAsync(m, null);
+            pn.sendAsync(m, null, 0);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
         }
@@ -330,7 +330,7 @@
                 Logger.minor(this, "Forwarding "+m.getSpec()+" to 
"+next.getPeer().getPort());
                 ctx.addSent(next);
                 try {
-                    next.sendAsync(m, null);
+                    next.sendAsync(m, null, 0);
                 } catch (NotConnectedException e) {
                     continue;
                 }
@@ -339,7 +339,7 @@
                 // Reached a dead end...
                 Message reject = DMT.createFNPRoutedRejected(id, htl);
                 if(pn != null) try {
-                    pn.sendAsync(reject, null);
+                    pn.sendAsync(reject, null, 0);
                 } catch (NotConnectedException e) {
                     Logger.error(this, "Cannot send reject message back to 
source "+pn);
                     return true;
@@ -374,7 +374,7 @@
             int x = m.getInt(DMT.COUNTER);
             Message reply = DMT.createFNPRoutedPong(id, x);
             try {
-                src.sendAsync(reply, null);
+                src.sendAsync(reply, null, 0);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection replying to "+m+" in 
dispatchRoutedMessage");
             }

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -196,7 +196,7 @@
                         if(item == null) continue;
                         try {
                             Logger.minor(this, "Resending 
"+item.packetNumber+" to "+item.kt);
-                            
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks);
+                            
node.packetMangler.processOutgoingPreformatted(item.buf, 0, item.buf.length, 
item.kt, item.packetNumber, item.callbacks, 0);
                         } catch (KeyChangedException e) {
                             Logger.error(this, "Caught "+e+" resending packets 
to "+kt);
                             pn.requeueResendItems(resendItems);
@@ -250,7 +250,7 @@
                        // Force packet to have a sequence number.
                        Message m = DMT.createFNPVoid();
                        pn.addToLocalNodeSentMessagesToStatistic(m);
-                       node.packetMangler.processOutgoingOrRequeue(new 
MessageItem[] { new MessageItem(m, null) }, pn, true, true);
+                       node.packetMangler.processOutgoingOrRequeue(new 
MessageItem[] { new MessageItem(m, null, 0) }, pn, true, true);
                 }
             } else {
                 // Not connected

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -328,7 +328,7 @@
         PeerNode[] peers = connectedPeers; // avoid synchronization
         for(int i=0;i<peers.length;i++) {
             if(peers[i].isConnected()) try {
-                peers[i].sendAsync(msg, null);
+                peers[i].sendAsync(msg, null, 0);
             } catch (NotConnectedException e) {
                 // Ignore
             }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -687,11 +687,15 @@
     /**
      * Send a message, off-thread, to this node.
      * @param msg The message to be sent.
+     * @param cb The callback to be called when the packet has been sent, or 
null.
+     * @param alreadyReportedBytes The number of bytes already reported to the 
throttle
+     * relating to this packet (normally set when we have delayed a packet in 
order to
+     * throttle it).
      */
-    public void sendAsync(Message msg, AsyncMessageCallback cb) throws 
NotConnectedException {
+    public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes) throws NotConnectedException {
         Logger.minor(this, "Sending async: "+msg+" : "+cb+" on "+this);
         if(!isConnected) throw new NotConnectedException();
-        MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[] {cb});
+        MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[] {cb}, alreadyReportedBytes);
        synchronized(routingBackoffSync) {
                reportBackoffStatus(System.currentTimeMillis());
        }
@@ -1203,8 +1207,8 @@
         Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);

         try {
-            sendAsync(locMsg, null);
-            sendAsync(ipMsg, null);
+            sendAsync(locMsg, null, 0);
+            sendAsync(ipMsg, null, 0);
         } catch (NotConnectedException e) {
             Logger.error(this, "Completed handshake with "+getPeer()+" but 
disconnected!!!", new Exception("error"));
         }
@@ -1419,7 +1423,7 @@
             long t = tracker.getNextUrgentTime();
             if(t < now) {
                 try {
-                    node.packetMangler.processOutgoing(null, 0, 0, tracker);
+                    node.packetMangler.processOutgoing(null, 0, 0, tracker, 0);
                 } catch (NotConnectedException e) {
                     // Ignore
                 } catch (KeyChangedException e) {
@@ -1436,7 +1440,7 @@
             long t = tracker.getNextUrgentTime();
             if(t < now) {
                 try {
-                    node.packetMangler.processOutgoing(null, 0, 0, tracker);
+                    node.packetMangler.processOutgoing(null, 0, 0, tracker, 0);
                 } catch (NotConnectedException e) {
                     // Ignore
                 } catch (KeyChangedException e) {
@@ -1597,7 +1601,7 @@
                 Logger.error(this, "No tracker to resend packet 
"+item.packetNumber+" on");
                 continue;
             }
-            MessageItem mi = new MessageItem(item.buf, item.callbacks, true);
+            MessageItem mi = new MessageItem(item.buf, item.callbacks, true, 
0);
             requeueMessageItems(new MessageItem[] {mi}, 0, 1, true);
         }
     }
@@ -1747,7 +1751,7 @@
                }
                Message msg = DMT.createFNPLinkPing(pingNo);
                try {
-                       sendAsync(msg, null);
+                       sendAsync(msg, null, 0);
                } catch (NotConnectedException e) {
                        synchronized(pingSync) {
                                pingsSentTimes.removeKey(lPingNo);
@@ -1782,7 +1786,7 @@
                node.throttledPacketSendAverage.report(timeDiff);
                Logger.minor(this, "Reporting throttled packet send time: 
"+timeDiff+" to "+getPeer());
        }
-
+       
        public void setRemoteDetectedPeer(Peer p) {
                this.remoteDetectedPeer = p;
        }

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -75,7 +75,7 @@
                PartiallyReceivedBlock prb =
                        new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
                BlockTransmitter bt =
-                       new BlockTransmitter(node.usm, source, uid, prb);
+                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle);
                bt.send();
             }
             return;
@@ -95,7 +95,7 @@
             if(rs.waitUntilStatusChange()) {
                // Forward RejectedOverload
                Message msg = DMT.createFNPRejectedOverload(uid, false);
-               source.sendAsync(msg, null);
+               source.sendAsync(msg, null, 0);
             }

             if(rs.transferStarted()) {
@@ -104,7 +104,7 @@
                 source.send(df);
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
-                   new BlockTransmitter(node.usm, source, uid, prb);
+                   new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle);
                bt.send(); // either fails or succeeds; other side will see, we 
don't care
                    return;
             }
@@ -116,7 +116,7 @@
                    continue;
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.sendAsync(dnf, null);
+                       source.sendAsync(dnf, null, 0);
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -124,12 +124,12 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null);
+                       source.sendAsync(reject, null, 0);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.sendAsync(rnf, null);
+                       source.sendAsync(rnf, null, 0);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
@@ -151,7 +151,7 @@
                                continue; // should have started transfer
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null);
+                       source.sendAsync(reject, null, 0);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-07 
20:20:09 UTC (rev 9497)
@@ -109,7 +109,7 @@
                                        Logger.minor(this, "Got pubkey on 
"+uid+" : "+pubKey);
                                        Message confirm = 
DMT.createFNPSSKPubKeyAccepted(uid);
                                        try {
-                                               source.sendAsync(confirm, null);
+                                               source.sendAsync(confirm, null, 
0);
                                        } catch (NotConnectedException e) {
                                                Logger.minor(this, "Lost 
connection to source on "+uid);
                                                return;

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -172,7 +172,7 @@
             // Send to next node

             try {
-                               next.sendAsync(req, null);
+                               next.sendAsync(req, null, 0);
                        } catch (NotConnectedException e1) {
                                Logger.minor(this, "Not connected to "+next);
                                continue;
@@ -245,7 +245,7 @@
             if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
                Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey.asBytes());
                try {
-                       next.sendAsync(pkMsg, null);
+                       next.sendAsync(pkMsg, null, 0);
                } catch (NotConnectedException e) {
                        Logger.minor(this, "Node disconnected while sending 
pubkey: "+next);
                        continue;

Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2006-07-07 19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2006-07-07 20:20:09 UTC (rev 9497)
@@ -34,7 +34,7 @@
     public void disconnected() {
         Logger.minor(this, "Disconnect trigger: "+this);
         try {
-            dest.sendAsync(msg, null);
+            dest.sendAsync(msg, null, 0);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Both source and destination disconnected: 
"+msg+" for "+this);
         }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-07 19:29:24 UTC (rev 
9496)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-07 20:20:09 UTC (rev 
9497)
@@ -18,7 +18,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 861;
+       private static final int buildNumber = 862;

        /** Oldest build of Fred we will talk to */
        private static final int oldLastGoodBuild = 839;

Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java     2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/support/BitArray.java     2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -78,6 +78,10 @@
                dos.write(_bits);
        }

+       public static int serializedLength(int size) {
+               return ((size / 8) + (size % 8 == 0 ? 0 : 1)) + 4;
+       }
+
        public int getSize() {
                return _size;
        }

Modified: trunk/freenet/src/freenet/support/DoubleTokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/DoubleTokenBucket.java    2006-07-07 
19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/support/DoubleTokenBucket.java    2006-07-07 
20:20:09 UTC (rev 9497)
@@ -28,6 +28,7 @@
         */
        public DoubleTokenBucket(long max, long nanosPerTick, long 
initialValue, long maxForced) {
                super(max, nanosPerTick, initialValue);
+               Logger.minor(this, "Max: "+max+" nanosPerTick: "+nanosPerTick+" 
initialValue: "+initialValue+" maxForced: "+maxForced);
                this.maxForced = maxForced;
                this.curForced = 0;
        }
@@ -39,11 +40,16 @@
        public synchronized void forceGrab(long tokens) {
                addTokens();
                long thisMax = maxForced - curForced;
-               if(tokens > thisMax) tokens = thisMax;
+               if(tokens > thisMax) {
+                       Logger.minor(this, "Limiting force-grab to "+thisMax+" 
tokens was "+tokens);
+                       tokens = thisMax;
+               }
                curForced += tokens;
                current -= tokens;
-               if(current > max) current = max;
-               if(curForced > maxForced) curForced = maxForced;
+               if(curForced > maxForced) {
+                       curForced = maxForced;
+               }
+               Logger.minor(this, "Force-Grabbed "+tokens+" 
current="+current+" forced="+curForced);
        }

        // blockingGrab is unchanged
@@ -54,12 +60,37 @@
                if(curForced > maxForced) curForced = maxForced;
        }

-       public synchronized void addTokens() {
+       public synchronized void changeNanosAndBucketSizes(long nanos, long 
newMax, long newMaxForced) {
+               // FIXME maybe should be combined
+               changeSizeOfBuckets(newMax, newMaxForced);
+               changeNanosPerTick(nanos);
+       }
+       
+       public synchronized void addTokensNoClip() {
                long add = tokensToAdd();
                current += add;
                curForced -= add;
                if(curForced < 0) curForced = 0;
                timeLastTick += add * nanosPerTick;
+               Logger.minor(this, "Added "+add+" tokens current="+current+" 
forced="+curForced);
        }
+
+       public synchronized void addTokens() {
+               addTokensNoClip();
+               if(curForced > maxForced) curForced = maxForced;
+               if(current > max) current = max;
+       }

+       public synchronized long getNanosPerTick() {
+               return nanosPerTick;
+       }
+
+       public synchronized long getSize() {
+               return max;
+       }
+
+       protected long offset() {
+               return curForced;
+       }
+       
 }

Modified: trunk/freenet/src/freenet/support/TokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/TokenBucket.java  2006-07-07 19:29:24 UTC 
(rev 9496)
+++ trunk/freenet/src/freenet/support/TokenBucket.java  2006-07-07 20:20:09 UTC 
(rev 9497)
@@ -10,6 +10,7 @@
        protected long max;
        protected long timeLastTick;
        protected long nanosPerTick;
+       protected long nextWake;

        /**
         * Create a token bucket.
@@ -20,7 +21,9 @@
                this.max = max;
                this.current = initialValue;
                this.nanosPerTick = nanosPerTick;
-               this.timeLastTick = System.currentTimeMillis();
+               long now = System.currentTimeMillis();
+               this.timeLastTick = now * (1000 * 1000);
+               nextWake = now;
        }

        /**
@@ -56,34 +59,72 @@
        public synchronized long count() {
                return current;
        }
+
+       protected long offset() {
+               return 0;
+       }

+       public synchronized void blockingGrab(long tokens) {
+               if(tokens < max)
+                       innerBlockingGrab(tokens);
+               else {
+                       for(int i=0;i<tokens;i+=max) {
+                               innerBlockingGrab(Math.min(tokens, max));
+                       }
+               }
+       }
+       
        /**
         * Grab a bunch of tokens. Block if necessary.
         * @param tokens The number of tokens to grab.
         */
-       public synchronized void blockingGrab(long tokens) {
+       public synchronized void innerBlockingGrab(long tokens) {
+               Logger.minor(this, "Blocking grab: "+tokens);
+               addTokens();
+               if(current > max) current = max;
+               Logger.minor(this, "current="+current);
+               if(current > tokens) {
+                       current -= tokens;
+                       return;
+               }
+               long extra = 0;
+               if(current > 0) {
+                       tokens -= current;
+                       current = 0;
+               } else if(current < 0) {
+                       extra = -current;
+                       current = 0;
+               }
+               long minDelayNS = nanosPerTick * (tokens + extra);
+               long minDelayMS = minDelayNS / (1000*1000) + (minDelayNS % 
(1000*1000) == 0 ? 0 : 1);
+               long now = System.currentTimeMillis();
+               
+               // Schedule between the blockingGrab's.
+               
+               if(nextWake < now) nextWake = now;
+               long wakeAt = (nextWake += minDelayMS);
                while(true) {
-                       addTokens();
-                       if(current > tokens) {
-                               current -= tokens;
-                               if(current > max) current = max;
-                               return;
-                       } else {
-                               if(current > 0) {
-                                       tokens -= current;
-                                       current = 0;
-                               }
-                       }
-                       long minDelayNS = nanosPerTick * Math.min(tokens, 
max/2);
-                       long minDelayMS = minDelayNS / 1000 + (minDelayNS % 
1000 == 0 ? 0 : 1);
+                       now = System.currentTimeMillis();
+                       int delay = (int) Math.min(Integer.MAX_VALUE, wakeAt - 
now);
+                       if(delay <= 0) break;
+                       Logger.minor(this, "Waiting "+delay+"ms");
                        try {
-                               wait(minDelayMS);
+                               wait(delay);
                        } catch (InterruptedException e) {
                                // Go around the loop again.
                        }
                }
+               // Remove the tokens, even if we have built up a debt due to 
forceGrab()s and
+               // will therefore go negative. We have paid off the initial 
debt, and we have
+               // paid off the tokens, any more debt is a problem for future 
blockingGrab's!
+               current -= tokens;
        }

+       public synchronized void recycle(long tokens) {
+               current += tokens;
+               if(current > max) current = max;
+       }
+       
        /**
         * Change the number of nanos per tick.
         * @param nanosPerTick The new number of nanos per tick.
@@ -99,7 +140,7 @@

        public synchronized void changeBucketSize(long newMax) {
                if(newMax <= 0) throw new IllegalArgumentException();
-               addTokens();
+               addTokensNoClip();
                max = newMax;
                if(current > max) current = max;
        }
@@ -108,7 +149,7 @@
                if(nanosPerTick <= 0) throw new IllegalArgumentException();
                if(newMax <= 0) throw new IllegalArgumentException();
                // Synchronize up first, using the old nanosPerTick.
-               addTokens();
+               addTokensNoClip();
                if(nanosPerTick < this.nanosPerTick)
                        notifyAll();
                this.nanosPerTick = nanosPerTick;
@@ -116,10 +157,15 @@
                if(current > max) current = max;
        }

+       public synchronized void addTokens() {
+               addTokensNoClip();
+               if(current > max) current = max;
+       }
+       
        /**
         * Update the number of tokens according to elapsed time.
         */
-       public synchronized void addTokens() {
+       public synchronized void addTokensNoClip() {
                long add = tokensToAdd();
                current += add;
                timeLastTick += add * nanosPerTick;
@@ -127,9 +173,11 @@
        }

        synchronized long tokensToAdd() {
-               long nowNS = System.currentTimeMillis() * 1000;
+               long nowNS = System.currentTimeMillis() * (1000 * 1000);
                long nextTick = timeLastTick + nanosPerTick;
-               if(nextTick < nowNS) return 0;
+               if(nextTick > nowNS) {
+                       return 0;
+               }
                if(nextTick + nanosPerTick > nowNS) {
                        timeLastTick = nextTick;
                        return 1;

Modified: trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java
===================================================================
--- trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java      
2006-07-07 19:29:24 UTC (rev 9496)
+++ trunk/freenet/src/freenet/support/math/TimeDecayingRunningAverage.java      
2006-07-07 20:20:09 UTC (rev 9497)
@@ -156,4 +156,8 @@
     public long countReports() {
         return totalReports;
     }
+
+       public long lastReportTime() {
+               return lastReportTime;
+       }
 }


Reply via email to