Author: toad
Date: 2006-07-08 01:42:43 +0000 (Sat, 08 Jul 2006)
New Revision: 9500

Added:
   trunk/freenet/src/freenet/node/ByteCounter.java
Modified:
   trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
   trunk/freenet/src/freenet/io/comm/Message.java
   trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/node/CHKInsertSender.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/LocationManager.java
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/RequestStarter.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
   trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/DoubleTokenBucket.java
   trunk/freenet/src/freenet/support/TokenBucket.java
Log:
863: Long-term bandwidth limiting by calculating bytes used per request of each 
type, and allocating accordingly from a token bucket (both input and output) 
when accepting requests.
HIGHLY EXPERIMENTAL.

Modified: trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java    2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/clients/http/N2NTMToadlet.java    2006-07-08 
01:42:43 UTC (rev 9500)
@@ -185,7 +185,7 @@
                                  buf.append("<div class=\"infobox-content\">");
                                  buf.append("Peer 
'"+HTMLEncoder.encode(pn.getName())+"' is \"backed off\".  N2NTM receipt may be 
significantly delayed.<br /><br />\n");

-                                 usm.send(pn, n2ntm);
+                                 usm.send(pn, n2ntm, null);
                                  Logger.normal(this, "Sent N2NTM to 
'"+pn.getName()+"': "+message);

                                  buf.append("Message should be on it's way:<hr 
/><br /><br />"+messageTextBuf2+"<br /><br />\n");
@@ -201,7 +201,7 @@
                                  buf.append("<div class=\"infobox-content\">");
                                  buf.append("Sending N2NTM to peer 
'"+HTMLEncoder.encode(pn.getName())+"'.<br /><br />\n");  

-                                 usm.send(pn, n2ntm);
+                                 usm.send(pn, n2ntm, null);
                                  Logger.normal(this, "Sent N2NTM to 
'"+pn.getName()+"': "+message);

                                  buf.append("Message should be on it's way:<hr 
/><br /><br />"+messageTextBuf2+"<br /><br />\n");

Modified: trunk/freenet/src/freenet/io/comm/Message.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/Message.java      2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/io/comm/Message.java      2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -39,8 +39,9 @@
        private final PeerContext _source;
        private final HashMap _payload = new HashMap();
        public final long localInstantiationTime;
+       final int _receivedByteCount;

-       public static Message decodeFromPacket(byte[] buf, int offset, int 
length, PeerContext peer) {
+       public static Message decodeFromPacket(byte[] buf, int offset, int 
length, PeerContext peer, int overhead) {
                DataInputStream dis
            = new DataInputStream(new ByteArrayInputStream(buf,
                offset, length));
@@ -56,7 +57,7 @@
                }
                if(mspec.isInternalOnly())
                    return null; // silently discard internal-only messages
-               Message m = new Message(mspec, peer);
+               Message m = new Message(mspec, peer, length + overhead);
                try {
                    for (Iterator i = mspec.getOrderedFields().iterator(); 
i.hasNext();) {
                        String name = (String) i.next();
@@ -78,13 +79,14 @@
        }

        public Message(MessageType spec) {
-               this(spec, null);
+               this(spec, null, 0);
        }

-       private Message(MessageType spec, PeerContext source) {
+       private Message(MessageType spec, PeerContext source, int 
recvByteCount) {
                localInstantiationTime = System.currentTimeMillis();
                _spec = spec;
                _source = source;
+               _receivedByteCount = recvByteCount;
        }

        public boolean getBoolean(String key) {

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2006-07-08 
01:42:43 UTC (rev 9500)
@@ -25,6 +25,7 @@
 import org.tanukisoftware.wrapper.WrapperManager;

 import freenet.io.comm.Peer.LocalAddressException;
+import freenet.node.ByteCounter;
 import freenet.node.Node;
 import freenet.node.PeerNode;
 import freenet.support.Logger;
@@ -207,7 +208,7 @@
                        } else {
                                // Create a bogus context since no filter
                                Message m = decodePacket(data, offset, length,
-                                               new DummyPeerContext(peer));
+                                               new DummyPeerContext(peer), 0);
                                if (m != null)
                                        checkFilters(m);
                        }
@@ -223,9 +224,9 @@
      * @param length
      * @param peer
      */
-    public Message decodePacket(byte[] data, int offset, int length, 
PeerContext peer) {
+    public Message decodePacket(byte[] data, int offset, int length, 
PeerContext peer, int overhead) {
         try {
-            return Message.decodeFromPacket(data, offset, length, peer);
+            return Message.decodeFromPacket(data, offset, length, peer, 
overhead);
         } catch (Throwable t) {
             Logger.error(this, "Could not decode packet: "+t, t);
             return null;
@@ -397,7 +398,7 @@
            }
        }

-       public Message waitFor(MessageFilter filter) throws 
DisconnectedException {
+       public Message waitFor(MessageFilter filter, ByteCounter ctr) throws 
DisconnectedException {
                Logger.debug(this, "Waiting for "+filter);
                long startTime = System.currentTimeMillis();
                Message ret = null;
@@ -471,6 +472,8 @@
 //             }
                long endTime = System.currentTimeMillis();
                Logger.debug(this, "Returning in "+(endTime-startTime)+"ms");
+               if(ctr != null && ret != null)
+                       ctr.receivedBytes(ret._receivedByteCount);
                return ret;
        }

@@ -478,7 +481,7 @@
         * Send a Message to a PeerContext.
         * @throws NotConnectedException If we are not currently connected to 
the node.
         */
-       public void send(PeerContext destination, Message m) throws 
NotConnectedException {
+       public void send(PeerContext destination, Message m, ByteCounter ctr) 
throws NotConnectedException {
            if(m.getSpec().isInternalOnly()) {
                Logger.error(this, "Trying to send internal-only message "+m+" 
of spec "+m.getSpec(), new Exception("debug"));
                return;
@@ -503,7 +506,7 @@
 //             } else {
 //                 sendPacket(blockToSend, destination.getPeer());
 //             }
-               ((PeerNode)destination).sendAsync(m, null, 0);
+               ((PeerNode)destination).sendAsync(m, null, 0, ctr);
        }

        /**

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2006-07-08 
01:42:43 UTC (rev 9500)
@@ -29,6 +29,7 @@
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.RetrievalException;
 import freenet.io.comm.UdpSocketManager;
+import freenet.node.ByteCounter;
 import freenet.support.BitArray;
 import freenet.support.Buffer;
 import freenet.support.Logger;
@@ -49,16 +50,18 @@
        UdpSocketManager _usm;
        /** packet : Integer -> reportTime : Long * */
        HashMap _recentlyReportedMissingPackets = new HashMap();
+       ByteCounter _ctr;

-       public BlockReceiver(UdpSocketManager usm, PeerContext sender, long 
uid, PartiallyReceivedBlock prb) {
+       public BlockReceiver(UdpSocketManager usm, PeerContext sender, long 
uid, PartiallyReceivedBlock prb, ByteCounter ctr) {
                _sender = sender;
                _prb = prb;
                _uid = uid;
                _usm = usm;
+               _ctr = ctr;
        }

        public void sendAborted(int reason, String desc) throws 
NotConnectedException {
-               _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc));
+               _usm.send(_sender, DMT.createSendAborted(_uid, reason, desc), 
_ctr);
        }

        public byte[] receive() throws RetrievalException {
@@ -70,7 +73,7 @@
                MessageFilter mfPacketTransmit = 
MessageFilter.create().setTimeout(RECEIPT_TIMEOUT).setType(DMT.packetTransmit).setField(DMT.UID,
 _uid).setSource(_sender);
                MessageFilter mfAllSent = 
MessageFilter.create().setType(DMT.allSent).setField(DMT.UID, 
_uid).setSource(_sender);
                MessageFilter mfSendAborted = 
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID, 
_uid).setSource(_sender);
-                m1 = 
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)));
+                m1 = 
_usm.waitFor(mfPacketTransmit.or(mfAllSent.or(mfSendAborted)), _ctr);
                 if(!_sender.isConnected()) throw new DisconnectedException();
             } catch (DisconnectedException e1) {
                 Logger.normal(this, "Disconnected during receive: "+_uid+" 
from "+_sender);
@@ -110,7 +113,7 @@
                                Logger.minor(this, "Missing: "+missing.size());
                                if (missing.size() > 0) {
                                        Message mn = 
DMT.createMissingPacketNotification(_uid, missing);
-                                       _usm.send(_sender, mn);
+                                       _usm.send(_sender, mn, _ctr);
                                        consecutiveMissingPacketReports++;
                                        if (missing.size() > 50) {
                                                Logger.normal(this, "Excessive 
packet loss : "+mn);
@@ -131,14 +134,14 @@
                                        }
                                }
                                Message mn = 
DMT.createMissingPacketNotification(_uid, missing);
-                               _usm.send(_sender, mn);
+                               _usm.send(_sender, mn, _ctr);
                                consecutiveMissingPacketReports++;
                                if (missing.size() > 50) {
                                        Logger.normal(this, "Sending large 
missingPacketNotification due to packet receiver timeout after 
"+RECEIPT_TIMEOUT+"ms");
                                }
                        }
                }
-               _usm.send(_sender, DMT.createAllReceived(_uid));
+               _usm.send(_sender, DMT.createAllReceived(_uid), _ctr);
                return _prb.getBlock();
                } catch(NotConnectedException e) {
                    throw new 
RetrievalException(RetrievalException.SENDER_DISCONNECTED);

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2006-07-08 
01:42:43 UTC (rev 9500)
@@ -29,6 +29,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.UdpSocketManager;
+import freenet.node.ByteCounter;
 import freenet.node.FNPPacketMangler;
 import freenet.node.PeerNode;
 import freenet.support.BitArray;
@@ -55,14 +56,16 @@
        final PacketThrottle throttle;
        long timeAllSent = -1;
        final DoubleTokenBucket _masterThrottle;
+       final ByteCounter _ctr;
        final int PACKET_SIZE;

-       public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle) {
+       public BlockTransmitter(UdpSocketManager usm, PeerContext destination, 
long uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle, 
ByteCounter ctr) {
                _usm = usm;
                _destination = destination;
                _uid = uid;
                _prb = source;
-               this._masterThrottle = masterThrottle;
+               _ctr = ctr;
+               _masterThrottle = masterThrottle;
                PACKET_SIZE = DMT.packetTransmitSize(_prb._packetSize, 
_prb._packets)
                        + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;
                try {
@@ -113,13 +116,13 @@
                                                delay(startCycleTime);
                                                _sentPackets.setBit(packetNo, 
true);
                                                try {
-                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE);
+                                                       
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), null, PACKET_SIZE, _ctr);
                                                // We accelerate the ping rate 
during the transfer to keep a closer eye on round-trip-time
                                                sentSinceLastPing++;
                                                if (sentSinceLastPing >= 
PING_EVERY) {
                                                        sentSinceLastPing = 0;
                                                        
//_usm.send(BlockTransmitter.this._destination, DMT.createPing());
-                                                       
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0);
+                                                       
((PeerNode)_destination).sendAsync(DMT.createPing(), null, 0, _ctr);
                                                }
                                                } catch (NotConnectedException 
e) {
                                                    Logger.normal(this, 
"Terminating send: "+e);
@@ -172,7 +175,7 @@
        }

        public void sendAborted(int reason, String desc) throws 
NotConnectedException {
-               _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc));
+               _usm.send(_destination, DMT.createSendAborted(_uid, reason, 
desc), _ctr);
        }

        public boolean send() {
@@ -194,7 +197,7 @@

                        public void receiveAborted(int reason, String 
description) {
                                try {
-                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason, 
description), null, 0);
+                                       
((PeerNode)_destination).sendAsync(DMT.createSendAborted(_uid, reason, 
description), null, 0, _ctr);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Receive aborted and receiver is not 
connected");
                 }
@@ -215,7 +218,7 @@
                                MessageFilter mfMissingPacketNotification = 
MessageFilter.create().setType(DMT.missingPacketNotification).setField(DMT.UID, 
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
                                MessageFilter mfAllReceived = 
MessageFilter.create().setType(DMT.allReceived).setField(DMT.UID, 
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
                                MessageFilter mfSendAborted = 
MessageFilter.create().setType(DMT.sendAborted).setField(DMT.UID, 
_uid).setTimeout(SEND_TIMEOUT).setSource(_destination);
-                msg = 
_usm.waitFor(mfMissingPacketNotification.or(mfAllReceived.or(mfSendAborted)));
+                msg = 
_usm.waitFor(mfMissingPacketNotification.or(mfAllReceived.or(mfSendAborted)), 
_ctr);
                 Logger.minor(this, "Got "+msg);
             } catch (DisconnectedException e) {
                // Ignore, see below

Added: trunk/freenet/src/freenet/node/ByteCounter.java
===================================================================
--- trunk/freenet/src/freenet/node/ByteCounter.java     2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/ByteCounter.java     2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -0,0 +1,12 @@
+package freenet.node;
+
+/**
+ * Interface for something which counts bytes.
+ */
+public interface ByteCounter {
+       
+       public void sentBytes(int x);
+       
+       public void receivedBytes(int x);
+
+}

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -16,7 +16,7 @@
 import freenet.keys.NodeCHK;
 import freenet.support.Logger;

-public final class CHKInsertSender implements Runnable, AnyInsertSender {
+public final class CHKInsertSender implements Runnable, AnyInsertSender, 
ByteCounter {

        private static class Sender implements Runnable {

@@ -66,7 +66,7 @@

                AwaitingCompletion(PeerNode pn, PartiallyReceivedBlock prb) {
                        this.pn = pn;
-                       bt = new BlockTransmitter(node.usm, pn, uid, prb, 
node.outputThrottle);
+                       bt = new BlockTransmitter(node.usm, pn, uid, prb, 
node.outputThrottle, CHKInsertSender.this);
                }

                void start() {
@@ -114,8 +114,6 @@
                }
        }

-   
-    
        CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
             PeerNode source, Node node, PartiallyReceivedBlock prb, boolean 
fromStore, double closestLocation) {
         this.myKey = myKey;
@@ -263,7 +261,7 @@
             // Send to next node

             try {
-                               next.send(req);
+                               next.send(req, this);
                        } catch (NotConnectedException e1) {
                                Logger.minor(this, "Not connected to "+next);
                                continue;
@@ -282,7 +280,7 @@
             while (true) {

                                try {
-                                       msg = node.usm.waitFor(mf);
+                                       msg = node.usm.waitFor(mf, this);
                                } catch (DisconnectedException e) {
                                        Logger.normal(this, "Disconnected from 
" + next
                                                        + " while waiting for 
Accepted");
@@ -363,7 +361,7 @@
             Logger.minor(this, "Sending DataInsert");
             if(receiveFailed) return;
             try {
-                               next.send(dataInsert);
+                               next.send(dataInsert, this);
                        } catch (NotConnectedException e1) {
                                Logger.minor(this, "Not connected sending 
DataInsert: "+next+" for "+uid);
                                continue;
@@ -385,7 +383,7 @@
                                        return;

                                try {
-                                       msg = node.usm.waitFor(mf);
+                                       msg = node.usm.waitFor(mf, this);
                                } catch (DisconnectedException e) {
                                        Logger.normal(this, "Disconnected from 
" + next
                                                        + " while waiting for 
InsertReply on " + this);
@@ -549,6 +547,15 @@
                notifyAll();
         }

+        if(code != TIMED_OUT && code != GENERATED_REJECTED_OVERLOAD && code != 
INTERNAL_ERROR
+                       && code != ROUTE_REALLY_NOT_FOUND) {
+               Logger.minor(this, "CHK insert cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
+               (source == null ? node.localChkInsertBytesSentAverage : 
node.remoteChkInsertBytesSentAverage)
+                               .report(getTotalSentBytes());
+               (source == null ? node.localChkInsertBytesReceivedAverage : 
node.remoteChkInsertBytesReceivedAverage)
+                               .report(getTotalReceivedBytes());
+        }
+        
         Logger.minor(this, "Returning from finish()");
     }

@@ -695,7 +702,7 @@
                        } else {
                                Message m;
                                try {
-                                       m = node.usm.waitFor(mf);
+                                       m = node.usm.waitFor(mf, 
CHKInsertSender.this);
                                } catch (DisconnectedException e) {
                                        // Which one? I have no idea.
                                        // Go around the loop again.
@@ -819,4 +826,33 @@
        public long getUID() {
                return uid;
        }
+
+       private final Object totalBytesSync = new Object();
+       private int totalBytesSent;
+       
+       public void sentBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesSent += x;
+               }
+       }
+       
+       public int getTotalSentBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesSent;
+               }
+       }
+       
+       private int totalBytesReceived;
+       
+       public void receivedBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesReceived += x;
+               }
+       }
+       
+       public int getTotalReceivedBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesReceived;
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2006-07-08 
01:42:43 UTC (rev 9500)
@@ -630,7 +630,7 @@
         node.random.acceptEntropyBytes(myPacketDataSource, packetHash, 0, 
md.getDigestLength(), 0.5);

         // Lots more to do yet!
-        processDecryptedData(plaintext, seqNumber, tracker);
+        processDecryptedData(plaintext, seqNumber, tracker, length - 
plaintext.length);
         return true;
     }

@@ -640,7 +640,7 @@
      * @param seqNumber The detected sequence number of the packet.
      * @param tracker The KeyTracker responsible for the key used to encrypt 
the packet.
      */
-    private void processDecryptedData(byte[] decrypted, int seqNumber, 
KeyTracker tracker) {
+    private void processDecryptedData(byte[] decrypted, int seqNumber, 
KeyTracker tracker, int overhead) {
         /**
          * Decoded format:
          * 1 byte - version number (0)
@@ -777,6 +777,8 @@

         int messages = decrypted[ptr++] & 0xff;

+        overhead += ptr;
+        
         for(int i=0;i<messages;i++) {
             if(ptr+1 >= decrypted.length) {
                 Logger.error(this, "Packet not long enough at byte "+ptr+" on 
"+tracker);
@@ -788,7 +790,7 @@
                 return;
             }
             Logger.minor(this, "Message "+i+" length "+length+", hash code: 
"+Fields.hashCode(decrypted, ptr, length));
-            Message m = usm.decodePacket(decrypted, ptr, length, tracker.pn);
+            Message m = usm.decodePacket(decrypted, ptr, length, tracker.pn, 1 
+ (overhead / messages));
             ptr+=length;
             if(m != null) {
                 //Logger.minor(this, "Dispatching packet: "+m);
@@ -805,6 +807,7 @@
         Logger.minor(this, "processOutgoingOrRequeue "+messages.length+" 
messages for "+pn+" ("+neverWaitForPacketNumber+")");
         byte[][] messageData = new byte[messages.length][];
         int[] alreadyReported = new int[messages.length];
+        MessageItem[] newMsgs = new MessageItem[messages.length];
         int length = 1;
         int callbacksCount = 0;
         int x = 0;
@@ -821,35 +824,45 @@
                     }
                     int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
                     this.processOutgoingPreformatted(buf, 0, buf.length, 
pn.getCurrentKeyTracker(), packetNumber, mi.cb, mi.alreadyReportedBytes);
+                    if(mi.ctrCallback != null)
+                       mi.ctrCallback.sentBytes(buf.length + 
HEADERS_LENGTH_ONE_MESSAGE);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Caught "+e+" while sending messages, 
requeueing");
                     // Requeue
-                    if(!dontRequeue)
+                    if(!dontRequeue) {
+                       pn.requeueMessageItems(messages, 0, x, false, 
"NotConnectedException");
                        pn.requeueMessageItems(messages, i, messages.length-i, 
false, "NotConnectedException");
+                    }
                     return;
                 } catch (WouldBlockException e) {
                     Logger.minor(this, "Caught "+e+" while sending messages, 
requeueing", e);
                     // Requeue
-                    if(!dontRequeue)
-                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "WouldBlockException");
+                    if(!dontRequeue) {
+                       pn.requeueMessageItems(messages, 0, x, false, 
"NotConnectedException");
+                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "NotConnectedException");
+                    }
                     return;
                 } catch (KeyChangedException e) {
                     Logger.minor(this, "Caught "+e+" while sending messages, 
requeueing");
                     // Requeue
-                    if(!dontRequeue)
-                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "KeyChangedException");
+                    if(!dontRequeue) {
+                       pn.requeueMessageItems(messages, 0, x, false, 
"NotConnectedException");
+                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "NotConnectedException");
+                    }
                     return;
                 } catch (Throwable e) {
                     Logger.error(this, "Caught "+e+" while sending messages, 
requeueing", e);
                     // Requeue
-                    if(!dontRequeue)
-                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "Throwable");
+                    if(!dontRequeue) {
+                       pn.requeueMessageItems(messages, 0, x, false, 
"NotConnectedException");
+                       pn.requeueMessageItems(messages, i, messages.length-i, 
false, "NotConnectedException");
+                    }
                     return;
-                    
                 }
             } else {
                 byte[] data = mi.getData(this, pn);
                 messageData[x] = data;
+                newMsgs[x] = mi;
                 alreadyReported[x] = mi.alreadyReportedBytes;
                 x++;
                 if(mi.cb != null) callbacksCount += mi.cb.length;
@@ -876,9 +889,17 @@
         if(x != callbacksCount) throw new IllegalStateException();

         if(length < node.usm.getMaxPacketSize() &&
-                messages.length < 256) {
+                messageData.length < 256) {
             try {
                 innerProcessOutgoing(messageData, 0, messageData.length, 
length, pn, neverWaitForPacketNumber, callbacks, alreadyReportedBytes);
+                for(int i=0;i<messageData.length;i++) {
+                       MessageItem mi = newMsgs[i];
+                       if(mi.ctrCallback != null) {
+                               mi.ctrCallback.sentBytes(messageData[i].length 
+ 
+                                               1 + (HEADERS_LENGTH_MINIMUM / 
messageData.length));
+                               // FIXME rounding issues
+                       }
+                }
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Caught "+e+" while sending messages, 
requeueing");
                 // Requeue
@@ -920,6 +941,14 @@
                     if(lastIndex != i) {
                         try {
                             innerProcessOutgoing(messageData, lastIndex, 
i-lastIndex, length, pn, neverWaitForPacketNumber, callbacks, 
alreadyReportedBytes);
+                            for(int j=lastIndex;j<i;j++) {
+                               MessageItem mi = newMsgs[j];
+                               if(mi.ctrCallback != null) {
+                                       
mi.ctrCallback.sentBytes(messageData[j].length + 
+                                                       1 + 
(HEADERS_LENGTH_MINIMUM / (i-lastIndex)));
+                                       // FIXME rounding issues
+                               }
+                            }
                         } catch (NotConnectedException e) {
                             Logger.normal(this, "Caught "+e+" while sending 
messages, requeueing remaining messages");
                             // Requeue
@@ -941,7 +970,7 @@
                         }
                     }
                     lastIndex = i;
-                    if(i != messages.length)
+                    if(i != messageData.length)
                         length = 1 + (messageData[i].length + 2);
                     count = 0;
                 } else {

Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -80,7 +80,7 @@
         // Send Accepted
         Message accepted = DMT.createFNPAccepted(uid);
         try {
-                       source.send(accepted);
+                       source.send(accepted, null);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -93,7 +93,7 @@

         Message msg;
         try {
-            msg = node.usm.waitFor(mf);
+            msg = node.usm.waitFor(mf, null);
         } catch (DisconnectedException e) {
             Logger.normal(this, "Disconnected while waiting for DataInsert on 
"+uid);
             return;
@@ -106,11 +106,11 @@
                        if(source.isConnected() && startTime > 
(source.timeLastConnected()+Node.HANDSHAKE_TIMEOUT*4))
                                Logger.error(this, "Did not receive DataInsert 
on "+uid+" from "+source+" !");
                        Message tooSlow = DMT.createFNPRejectedTimeout(uid);
-                       source.sendAsync(tooSlow, null, 0);
+                       source.sendAsync(tooSlow, null, 0, null);
                        Message m = DMT.createFNPInsertTransfersCompleted(uid, 
true);
-                       source.sendAsync(m, null, 0);
+                       source.sendAsync(m, null, 0, null);
                        prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
-                       br = new BlockReceiver(node.usm, source, uid, prb);
+                       br = new BlockReceiver(node.usm, source, uid, prb, 
null);
                        prb.abort(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        br.sendAborted(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
                        return;
@@ -132,7 +132,7 @@
         prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
         if(htl > 0)
             sender = node.makeInsertSender(key, htl, uid, source, headers, 
prb, false, closestLoc, true);
-        br = new BlockReceiver(node.usm, source, uid, prb);
+        br = new BlockReceiver(node.usm, source, uid, prb, null);

         // Receive the data, off thread

@@ -146,7 +146,7 @@
                msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -195,7 +195,7 @@
                // Forward it
                Message m = DMT.createFNPRejectedOverload(uid, false);
                try {
-                                       source.send(m);
+                                       source.send(m, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -225,7 +225,7 @@
                        status == CHKInsertSender.INTERNAL_ERROR) {
                 msg = DMT.createFNPRejectedOverload(uid, true);
                 try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -241,7 +241,7 @@
             if(status == CHKInsertSender.ROUTE_NOT_FOUND || status == 
CHKInsertSender.ROUTE_REALLY_NOT_FOUND) {
                 msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
                 try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -255,7 +255,7 @@
                msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -269,7 +269,7 @@
             Logger.error(this, "Unknown status code: 
"+sender.getStatusString());
             msg = DMT.createFNPRejectedOverload(uid, true);
             try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -314,7 +314,7 @@
                boolean failed = sender.anyTransfersFailed();
                Message m = DMT.createFNPInsertTransfersCompleted(uid, failed);
                try {
-                       source.sendAsync(m, null, 0);
+                       source.sendAsync(m, null, 0, null);
                        Logger.minor(this, "Sent completion: "+failed+" for 
"+this);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Not connected: "+source+" for 
"+this);
@@ -344,7 +344,7 @@
         }
         if(toSend != null) {
             try {
-                source.sendAsync(toSend, null, 0);
+                source.sendAsync(toSend, null, 0, null);
             } catch (NotConnectedException e) {
                 // :(
                 Logger.minor(this, "Lost connection in "+this+" when sending 
FNPDataInsertRejected");
@@ -368,7 +368,7 @@
                 runThread.interrupt();
                 Message msg = DMT.createFNPDataInsertRejected(uid, 
DMT.DATA_INSERT_REJECTED_RECEIVE_FAILED);
                 try {
-                    source.send(msg);
+                    source.send(msg, null);
                 } catch (NotConnectedException ex) {
                     Logger.error(this, "Can't send "+msg+" to "+source+": 
"+ex);
                 }

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -842,7 +842,7 @@
             AsyncMessageCallback[] callbacks = element.callbacks;
             // Ignore packet#
             Logger.minor(this, "Queueing resend of what was once 
"+element.packetNumber);
-            messages[i] = new MessageItem(buf, callbacks, true, 0);
+            messages[i] = new MessageItem(buf, callbacks, true, 0, null);
         }
         pn.requeueMessageItems(messages, 0, messages.length, true);
         pn.node.ps.queuedResendPacket();

Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -233,11 +233,11 @@
             MessageFilter filter =
                 
MessageFilter.create().setType(DMT.FNPSwapCommit).setField(DMT.UID, 
uid).setTimeout(TIMEOUT).setSource(pn);

-            node.usm.send(pn, m);
+            node.usm.send(pn, m, null);

             Message commit;
             try {
-                commit = node.usm.waitFor(filter);
+                commit = node.usm.waitFor(filter, null);
             } catch (DisconnectedException e) {
                 Logger.minor(this, "Disconnected from "+pn+" while waiting for 
SwapCommit");
                 return;
@@ -295,7 +295,7 @@

             Message confirm = DMT.createFNPSwapComplete(uid, myValue);

-            node.usm.send(pn, confirm);
+            node.usm.send(pn, confirm, null);

             if(shouldSwap(myLoc, friendLocs, hisLoc, hisFriendLocs, random ^ 
hisRandom)) {
                 timeLastSuccessfullySwapped = System.currentTimeMillis();
@@ -374,12 +374,12 @@
                 // 60 seconds
                 filter.setTimeout(TIMEOUT);

-                node.usm.send(pn, m);
+                node.usm.send(pn, m, null);

                 Logger.minor(this, "Waiting for SwapReply/SwapRejected on 
"+uid);
                 Message reply;
                 try {
-                    reply = node.usm.waitFor(filter);
+                    reply = node.usm.waitFor(filter, null);
                 } catch (DisconnectedException e) {
                     Logger.minor(this, "Disconnected while waiting for 
SwapReply/SwapRejected for "+uid);
                     return;
@@ -409,12 +409,12 @@
                 MessageFilter filter3 = 
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPSwapComplete).setTimeout(TIMEOUT).setSource(pn);
                 filter = filter1.or(filter3);

-                node.usm.send(pn, confirm);
+                node.usm.send(pn, confirm, null);

                 Logger.minor(this, "Waiting for SwapComplete: uid = "+uid);

                 try {
-                    reply = node.usm.waitFor(filter);
+                    reply = node.usm.waitFor(filter, null);
                 } catch (DisconnectedException e) {
                     Logger.minor(this, "Disconnected waiting for SwapComplete 
on "+uid);
                     return;
@@ -675,7 +675,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(uid);
             try {
-                pn.sendAsync(reject, null, 0);
+                pn.sendAsync(reject, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection to "+pn+" rejecting 
SwapRequest");
             }
@@ -687,7 +687,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(uid);
             try {
-                pn.sendAsync(reject, null, 0);
+                pn.sendAsync(reject, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting SwapRequest from 
"+pn);
             }
@@ -705,7 +705,7 @@
                 // Reject
                 Message reject = DMT.createFNPSwapRejected(uid);
                 try {
-                    pn.sendAsync(reject, null, 0);
+                    pn.sendAsync(reject, null, 0, null);
                 } catch (NotConnectedException e1) {
                     Logger.minor(this, "Lost connection rejecting SwapRequest 
(locked) from "+pn);
                 }
@@ -740,7 +740,7 @@
                     Logger.minor(this, "Late reject "+uid);
                     Message reject = DMT.createFNPSwapRejected(uid);
                     try {
-                        pn.sendAsync(reject, null, 0);
+                        pn.sendAsync(reject, null, 0, null);
                     } catch (NotConnectedException e1) {
                         Logger.normal(this, "Late reject but disconnected from 
sender: "+pn);
                     }
@@ -754,7 +754,7 @@
                     // Forward the request.
                     // Note that we MUST NOT send this blocking as we are on 
the
                     // receiver thread.
-                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0);
+                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(uid), pn, item), 0, null);
                 } catch (NotConnectedException e) {
                     // Try a different node
                     continue;
@@ -801,7 +801,7 @@
         m.set(DMT.UID, item.incomingID);
         Logger.minor(this, "Forwarding SwapReply "+uid+" from 
"+m.getSource()+" to "+item.requestSender);
         try {
-            item.requestSender.sendAsync(m, null, 0);
+            item.requestSender.sendAsync(m, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapReply "+uid+" 
to "+item.requestSender);
         }
@@ -833,7 +833,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null, 0);
+            item.requestSender.sendAsync(m, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapRejected 
"+uid+" to "+item.requestSender);
         }
@@ -860,7 +860,7 @@
         // Sending onwards - use outgoing ID
         m.set(DMT.UID, item.outgoingID);
         try {
-            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender), 0);
+            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender), 0, null);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding SwapCommit "+uid+" 
to "+item.routedTo);
         }
@@ -897,7 +897,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null, 0);
+            item.requestSender.sendAsync(m, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.normal(this, "Lost connection forwarding SwapComplete 
"+uid+" to "+item.requestSender);
         }
@@ -943,7 +943,7 @@
             Message msg = DMT.createFNPSwapRejected(item.incomingID);
             Logger.minor(this, "Rejecting in lostOrRestartedNode: 
"+item.incomingID+ " from "+item.requestSender);
             try {
-                item.requestSender.sendAsync(msg, null, 0);
+                item.requestSender.sendAsync(msg, null, 0, null);
             } catch (NotConnectedException e1) {
                 Logger.normal(this, "Both sender and receiver disconnected for 
"+item);
             }

Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -14,22 +14,25 @@
      * for sending as a single packet.
      */
     final boolean formatted;
+    final ByteCounter ctrCallback;

-    public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int 
alreadyReportedBytes) {
+    public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int 
alreadyReportedBytes, ByteCounter ctr) {
        this.alreadyReportedBytes = alreadyReportedBytes;
         this.msg = msg2;
         this.cb = cb2;
         buf = null;
         formatted = false;
+        this.ctrCallback = ctr;
         this.submitted = System.currentTimeMillis();
     }

-    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes) {
+    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes, ByteCounter ctr) {
        this.alreadyReportedBytes = alreadyReportedBytes;
         this.cb = cb2;
         this.msg = null;
         this.buf = data;
         this.formatted = formatted;
+        this.ctrCallback = ctr;
         this.submitted = System.currentTimeMillis();
     }


Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-07-08 01:05:24 UTC (rev 
9499)
+++ trunk/freenet/src/freenet/node/Node.java    2006-07-08 01:42:43 UTC (rev 
9500)
@@ -120,6 +120,7 @@
 import freenet.support.PaddedEphemerallyEncryptedBucketFactory;
 import freenet.support.SimpleFieldSet;
 import freenet.support.SimpleReadOnlyArrayBucket;
+import freenet.support.TokenBucket;
 import freenet.support.io.FilenameGenerator;
 import freenet.support.io.PersistentTempBucketFactory;
 import freenet.support.io.TempBucketFactory;
@@ -562,6 +563,9 @@
        final TestnetHandler testnetHandler;
        final StaticSwapRequestInterval swapInterval;
        public final DoubleTokenBucket outputThrottle;
+       final TokenBucket requestOutputThrottle;
+       final TokenBucket requestInputThrottle;
+       private boolean inputLimitDefault = false;
        static short MAX_HTL = 10;
        static final int EXIT_STORE_FILE_NOT_FOUND = 1;
        static final int EXIT_STORE_IOEXCEPTION = 2;
@@ -614,6 +618,22 @@
        final TimeDecayingRunningAverage throttledPacketSendAverage;
        /** Must be included as a hidden field in order for any dangerous HTTP 
operation to complete successfully. */
        public final String formPassword;
+       final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
+       final TimeDecayingRunningAverage remoteSskFetchBytesSentAverage;
+       final TimeDecayingRunningAverage remoteChkInsertBytesSentAverage;
+       final TimeDecayingRunningAverage remoteSskInsertBytesSentAverage;
+       final TimeDecayingRunningAverage remoteChkFetchBytesReceivedAverage;
+       final TimeDecayingRunningAverage remoteSskFetchBytesReceivedAverage;
+       final TimeDecayingRunningAverage remoteChkInsertBytesReceivedAverage;
+       final TimeDecayingRunningAverage remoteSskInsertBytesReceivedAverage;
+       final TimeDecayingRunningAverage localChkFetchBytesSentAverage;
+       final TimeDecayingRunningAverage localSskFetchBytesSentAverage;
+       final TimeDecayingRunningAverage localChkInsertBytesSentAverage;
+       final TimeDecayingRunningAverage localSskInsertBytesSentAverage;
+       final TimeDecayingRunningAverage localChkFetchBytesReceivedAverage;
+       final TimeDecayingRunningAverage localSskFetchBytesReceivedAverage;
+       final TimeDecayingRunningAverage localChkInsertBytesReceivedAverage;
+       final TimeDecayingRunningAverage localSskInsertBytesReceivedAverage;

        File downloadDir;
        public final ClientRequestScheduler chkFetchScheduler;
@@ -1208,15 +1228,44 @@
                                                //return 
BlockTransmitter.getHardBandwidthLimit();
                                                return (int) ((1000L * 1000L * 
1000L) / outputThrottle.getNanosPerTick());
                                        }
-                                       public void set(int val) throws 
InvalidConfigValueException {
-                                               if(val <= 0) throw new 
InvalidConfigValueException("Bandwidth limit must be positive");
-                                               
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / val, val, 
(val * 4) / 5);
+                                       public void set(int obwLimit) throws 
InvalidConfigValueException {
+                                               if(obwLimit <= 0) throw new 
InvalidConfigValueException("Bandwidth limit must be positive");
+                                               
outputThrottle.changeNanosAndBucketSizes((1000L * 1000L * 1000L) / obwLimit, 
obwLimit/2, (obwLimit * 2) / 5);
+                                               
requestOutputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) /  obwLimit, 
Math.max(obwLimit*60, 32768*20));
+                                               if(inputLimitDefault) {
+                                                       int ibwLimit = obwLimit 
* 4;
+                                                       
requestInputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) /  ibwLimit, 
Math.max(ibwLimit*60, 32768*20));
+                                               }
                                        }
                });

                int obwLimit = nodeConfig.getInt("outputBandwidthLimit");
-               this.outputThrottle = new DoubleTokenBucket(obwLimit/2, 
(1000L*1000L*1000L) /  obwLimit, obwLimit, (obwLimit * 4) / 5);
+               outputThrottle = new DoubleTokenBucket(obwLimit/2, 
(1000L*1000L*1000L) /  obwLimit, obwLimit, (obwLimit * 2) / 5);
+               requestOutputThrottle = 
+                       new TokenBucket(Math.max(obwLimit*60, 32768*20), 
(1000L*1000L*1000L) /  obwLimit, 0);

+               nodeConfig.register("inputBandwidthLimit", "-1", sortOrder++, 
false,
+                               "Input bandwidth limit (bytes per second)", 
"Input bandwidth limit (bytes/sec); the node will try not to exceed this; -1 = 
4x set outputBandwidthLimit",
+                               new IntCallback() {
+                                       public int get() {
+                                               if(inputLimitDefault) return -1;
+                                               return (int) ((1000L * 1000L * 
1000L) / requestInputThrottle.getNanosPerTick());
+                                       }
+                                       public void set(int ibwLimit) throws 
InvalidConfigValueException {
+                                               if(ibwLimit == -1) {
+                                                       inputLimitDefault = 
true;
+                                                       ibwLimit = (int) 
((1000L * 1000L * 1000L) / outputThrottle.getNanosPerTick()) * 4;
+                                               }
+                                               if(ibwLimit <= 0) throw new 
InvalidConfigValueException("Bandwidth limit must be positive or -1");
+                                               
requestInputThrottle.changeNanosAndBucketSize((1000L*1000L*1000L) /  ibwLimit, 
Math.max(ibwLimit*60, 32768*20));
+                                       }
+               });
+               
+               int ibwLimit = nodeConfig.getInt("inputBandwidthLimit");
+               requestInputThrottle = 
+                       new TokenBucket(Math.max(ibwLimit*60, 32768*20), 
(1000L*1000L*1000L) / ibwLimit, 0);
+               
+               
                // FIXME add an averaging/long-term/soft bandwidth limit. (bug 
76)

                // SwapRequestInterval
@@ -1522,35 +1571,54 @@

                // Select the request scheduler

+               localChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               localSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               localChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+               localSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+               localChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+               localSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+               localChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               localSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+
+               remoteChkFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskFetchBytesSentAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkInsertBytesSentAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskInsertBytesSentAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(32768, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskFetchBytesReceivedAverage = new 
TimeDecayingRunningAverage(2048, 180000, 0.0, Long.MAX_VALUE);
+               remoteChkInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+               remoteSskInsertBytesReceivedAverage = new 
TimeDecayingRunningAverage(0.0, 180000, 0.0, Long.MAX_VALUE);
+
                // FIXME make all the below arbitrary constants configurable!

                archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS, 
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE, 
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
                chkRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "CHK Request");
-               chkRequestStarter = new RequestStarter(this, 
chkRequestThrottle, "CHK Request starter ("+portNumber+")");
+               chkRequestStarter = new RequestStarter(this, 
chkRequestThrottle, "CHK Request starter ("+portNumber+")", 
requestOutputThrottle, requestInputThrottle, localChkFetchBytesSentAverage, 
localChkFetchBytesReceivedAverage);
                chkFetchScheduler = new ClientRequestScheduler(false, false, 
random, chkRequestStarter, this);
                chkRequestStarter.setScheduler(chkFetchScheduler);
                chkRequestStarter.start();
                //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
                // FIXME reenable the above
                chkInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "CHK Insert");
-               chkInsertStarter = new RequestStarter(this, chkInsertThrottle, 
"CHK Insert starter ("+portNumber+")");
+               chkInsertStarter = new RequestStarter(this, chkInsertThrottle, 
"CHK Insert starter ("+portNumber+")", requestOutputThrottle, 
requestInputThrottle, localChkInsertBytesSentAverage, 
localChkInsertBytesReceivedAverage);
                chkPutScheduler = new ClientRequestScheduler(true, false, 
random, chkInsertStarter, this);
                chkInsertStarter.setScheduler(chkPutScheduler);
                chkInsertStarter.start();

                sskRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "SSK Request");
-               sskRequestStarter = new RequestStarter(this, 
sskRequestThrottle, "SSK Request starter ("+portNumber+")");
+               sskRequestStarter = new RequestStarter(this, 
sskRequestThrottle, "SSK Request starter ("+portNumber+")", 
requestOutputThrottle, requestInputThrottle, localSskFetchBytesSentAverage, 
localSskFetchBytesReceivedAverage);
                sskFetchScheduler = new ClientRequestScheduler(false, true, 
random, sskRequestStarter, this);
                sskRequestStarter.setScheduler(sskFetchScheduler);
                sskRequestStarter.start();
                //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
                // FIXME reenable the above
                sskInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "SSK Insert");
-               sskInsertStarter = new RequestStarter(this, sskInsertThrottle, 
"SSK Insert starter ("+portNumber+")");
+               sskInsertStarter = new RequestStarter(this, sskInsertThrottle, 
"SSK Insert starter ("+portNumber+")", requestOutputThrottle, 
requestInputThrottle, localSskInsertBytesSentAverage, 
localSskFetchBytesReceivedAverage);
                sskPutScheduler = new ClientRequestScheduler(true, true, 
random, sskInsertStarter, this);
                sskInsertStarter.setScheduler(sskPutScheduler);
                sskInsertStarter.start();

+

                nodeConfig.finishedInitialization();
                writeNodeFile();
@@ -2174,11 +2242,29 @@
                + FNPPacketMangler.HEADERS_LENGTH_ONE_MESSAGE;

     /* return reject reason as string if should reject, otherwise return null 
*/
-       public synchronized String shouldRejectRequest(boolean canAcceptAnyway) 
{
+       public synchronized String shouldRejectRequest(boolean canAcceptAnyway, 
boolean isInsert, boolean isSSK) {
                long now = System.currentTimeMillis();
+
+               dumpByteCostAverages();

                double bwlimitDelayTime = 
throttledPacketSendAverage.currentValue();

+               // Do we have the bandwidth?
+               
+               double expected = 
+                       (isInsert ? (isSSK ? 
this.remoteSskInsertBytesSentAverage : this.remoteChkInsertBytesSentAverage)
+                                       : (isSSK ? 
this.remoteSskFetchBytesSentAverage : 
this.remoteChkFetchBytesSentAverage)).currentValue();
+               int e = (int)Math.max(expected, 0);
+               if(!requestOutputThrottle.instantGrab(e)) return "Insufficient 
output bandwidth";
+               expected = 
+                       (isInsert ? (isSSK ? 
this.remoteSskInsertBytesReceivedAverage : 
this.remoteChkInsertBytesReceivedAverage)
+                                       : (isSSK ? 
this.remoteSskFetchBytesReceivedAverage : 
this.remoteChkFetchBytesReceivedAverage)).currentValue();
+               e = (int)Math.max(expected, 0);
+               if(!requestInputThrottle.instantGrab(e)) return "Insufficient 
input bandwidth";
+
+               
+               // If no recent reports, no packets have been sent; correct the 
average downwards.
+               
                if(throttledPacketSendAverage.lastReportTime() < 
System.currentTimeMillis() - 5000) {
                        
outputThrottle.blockingGrab(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
                        
outputThrottle.recycle(ESTIMATED_SIZE_OF_ONE_THROTTLED_PACKET);
@@ -2229,6 +2315,19 @@
                return null;
        }

+       private void dumpByteCostAverages() {
+               Logger.minor(this, "Byte cost averages: REMOTE:"+
+                               " CHK insert 
"+remoteChkInsertBytesSentAverage.currentValue()+"/"+remoteChkInsertBytesReceivedAverage.currentValue()+
+                               " SSK insert 
"+remoteSskInsertBytesSentAverage.currentValue()+"/"+remoteSskInsertBytesReceivedAverage.currentValue()+
+                               " CHK fetch 
"+remoteChkFetchBytesSentAverage.currentValue()+"/"+remoteChkFetchBytesReceivedAverage.currentValue()+
+                               " SSK fetch 
"+remoteSskFetchBytesSentAverage.currentValue()+"/"+remoteSskFetchBytesReceivedAverage.currentValue());
+               Logger.minor(this, "Byte cost averages: LOCAL"+
+                               " CHK insert 
"+localChkInsertBytesSentAverage.currentValue()+"/"+localChkInsertBytesReceivedAverage.currentValue()+
+                               " SSK insert 
"+localSskInsertBytesSentAverage.currentValue()+"/"+localSskInsertBytesReceivedAverage.currentValue()+
+                               " CHK fetch 
"+localChkFetchBytesSentAverage.currentValue()+"/"+localChkFetchBytesReceivedAverage.currentValue()+
+                               " SSK fetch 
"+localSskFetchBytesSentAverage.currentValue()+"/"+localSskFetchBytesReceivedAverage.currentValue());
+       }
+
        public SimpleFieldSet exportPrivateFieldSet() {
                SimpleFieldSet fs = exportPublicFieldSet();
                fs.put("dsaPrivKey", myPrivKey.asFieldSet());
@@ -2387,7 +2486,7 @@
                try {
                        //MessageFilter mf2 = 
MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPRoutedRejected).setTimeout(5000);
                        // Ignore Rejected - let it be retried on other peers
-                       m = usm.waitFor(mf1/*.or(mf2)*/);
+                       m = usm.waitFor(mf1/*.or(mf2)*/, null);
                } catch (DisconnectedException e) {
                        Logger.normal(this, "Disconnected in waiting for pong");
                        return -1;

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -45,7 +45,7 @@
             // Send an FNPPong
             Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
             try {
-                ((PeerNode)m.getSource()).sendAsync(reply, null, 0); // 
nothing we can do if can't contact source
+                ((PeerNode)m.getSource()).sendAsync(reply, null, 0, null); // 
nothing we can do if can't contact source
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection replying to "+m);
             }
@@ -83,7 +83,7 @@
                long id = m.getLong(DMT.PING_SEQNO);
                Message msg = DMT.createFNPLinkPong(id);
                try {
-                               source.sendAsync(msg, null, 0);
+                               source.sendAsync(msg, null, 0, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -113,19 +113,19 @@
         if(node.recentlyCompleted(id)) {
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting data request (loop, finished): 
"+e);
             }
             return true;
         }
-        String rejectReason = node.shouldRejectRequest(!isSSK);
+        String rejectReason = node.shouldRejectRequest(!isSSK, false, isSSK);
         if(rejectReason != null) {
                // can accept 1 CHK request every so often, but not with SSKs 
because they aren't throttled so won't sort out bwlimitDelayTime, which was the 
whole reason for accepting them when overloaded...
                Logger.normal(this, "Rejecting request from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
                Message rejected = DMT.createFNPRejectedOverload(id, true);
                try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0);
+                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting (overload) data request from 
"+m.getSource().getPeer()+": "+e);
                }
@@ -136,7 +136,7 @@
             Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already 
running)");
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
@@ -158,19 +158,19 @@
         if(node.recentlyCompleted(id)) {
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
             return true;
         }
         // SSKs don't fix bwlimitDelayTime so shouldn't be accepted when 
overloaded.
-        String rejectReason = node.shouldRejectRequest(!isSSK);
+        String rejectReason = node.shouldRejectRequest(!isSSK, true, isSSK);
         if(rejectReason != null) {
                Logger.normal(this, "Rejecting insert from 
"+m.getSource().getPeer()+" preemptively because "+rejectReason);
                Message rejected = DMT.createFNPRejectedOverload(id, true);
                try {
-                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0);
+                       ((PeerNode)(m.getSource())).sendAsync(rejected, null, 
0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting (overload) insert request from 
"+m.getSource().getPeer()+": "+e);
                }
@@ -181,7 +181,7 @@
             Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already 
running)");
             Message rejected = DMT.createFNPRejectedLoop(id);
             try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0);
+                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.normal(this, "Rejecting insert request from 
"+m.getSource().getPeer()+": "+e);
             }
@@ -269,7 +269,7 @@
         ctx = (RoutedContext)routedContexts.get(lid);
         if(ctx != null) {
             try {
-                
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null, 0);
+                
((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, 
(short)(htl-1)), null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting "+m);
             }
@@ -289,7 +289,7 @@
         } else if(htl == 0) {
             Message reject = DMT.createFNPRoutedRejected(id, (short)0);
             if(pn != null) try {
-                pn.sendAsync(reject, null, 0);
+                pn.sendAsync(reject, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection rejecting "+m);
             }
@@ -311,7 +311,7 @@
         PeerNode pn = ctx.source;
         if(pn == null) return false;
         try {
-            pn.sendAsync(m, null, 0);
+            pn.sendAsync(m, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
         }
@@ -330,7 +330,7 @@
                 Logger.minor(this, "Forwarding "+m.getSpec()+" to 
"+next.getPeer().getPort());
                 ctx.addSent(next);
                 try {
-                    next.sendAsync(m, null, 0);
+                    next.sendAsync(m, null, 0, null);
                 } catch (NotConnectedException e) {
                     continue;
                 }
@@ -339,7 +339,7 @@
                 // Reached a dead end...
                 Message reject = DMT.createFNPRoutedRejected(id, htl);
                 if(pn != null) try {
-                    pn.sendAsync(reject, null, 0);
+                    pn.sendAsync(reject, null, 0, null);
                 } catch (NotConnectedException e) {
                     Logger.error(this, "Cannot send reject message back to 
source "+pn);
                     return true;
@@ -374,7 +374,7 @@
             int x = m.getInt(DMT.COUNTER);
             Message reply = DMT.createFNPRoutedPong(id, x);
             try {
-                src.sendAsync(reply, null, 0);
+                src.sendAsync(reply, null, 0, null);
             } catch (NotConnectedException e) {
                 Logger.minor(this, "Lost connection replying to "+m+" in 
dispatchRoutedMessage");
             }

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -250,7 +250,7 @@
                        // Force packet to have a sequence number.
                        Message m = DMT.createFNPVoid();
                        pn.addToLocalNodeSentMessagesToStatistic(m);
-                       node.packetMangler.processOutgoingOrRequeue(new 
MessageItem[] { new MessageItem(m, null, 0) }, pn, true, true);
+                       node.packetMangler.processOutgoingOrRequeue(new 
MessageItem[] { new MessageItem(m, null, 0, null) }, pn, true, true);
                 }
             } else {
                 // Not connected

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -328,7 +328,7 @@
         PeerNode[] peers = connectedPeers; // avoid synchronization
         for(int i=0;i<peers.length;i++) {
             if(peers[i].isConnected()) try {
-                peers[i].sendAsync(msg, null, 0);
+                peers[i].sendAsync(msg, null, 0, null);
             } catch (NotConnectedException e) {
                 // Ignore
             }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -692,13 +692,13 @@
      * relating to this packet (normally set when we have delayed a packet in 
order to
      * throttle it).
      */
-    public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes) throws NotConnectedException {
+    public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException {
         Logger.minor(this, "Sending async: "+msg+" : "+cb+" on "+this);
         if(!isConnected) throw new NotConnectedException();
-        MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[] {cb}, alreadyReportedBytes);
-       synchronized(routingBackoffSync) {
-               reportBackoffStatus(System.currentTimeMillis());
-       }
+        MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[] {cb}, alreadyReportedBytes, ctr);
+        synchronized(routingBackoffSync) {
+               reportBackoffStatus(System.currentTimeMillis());
+        }
         synchronized(messagesToSendNow) {
             messagesToSendNow.addLast(item);
         }
@@ -915,10 +915,10 @@
      */
     public boolean ping(int pingID) throws NotConnectedException {
         Message ping = DMT.createFNPPing(pingID);
-        node.usm.send(this, ping);
+        node.usm.send(this, ping, null);
         Message msg;
         try {
-            msg = 
node.usm.waitFor(MessageFilter.create().setTimeout(2000).setType(DMT.FNPPong).setField(DMT.PING_SEQNO,
 pingID));
+            msg = 
node.usm.waitFor(MessageFilter.create().setTimeout(2000).setType(DMT.FNPPong).setField(DMT.PING_SEQNO,
 pingID), null);
         } catch (DisconnectedException e) {
             throw new NotConnectedException("Disconnected while waiting for 
pong");
         }
@@ -949,14 +949,14 @@
     /**
      * Send a message, right now, on this thread, to this node.
      */
-    public void send(Message req) throws NotConnectedException {
+    public void send(Message req, ByteCounter ctr) throws 
NotConnectedException {
        synchronized (this) {
             if(!isConnected()) {
                 Logger.error(this, "Tried to send "+req+" but not connected to 
"+this, new Exception("debug"));
                 return;
             }
                }
-        node.usm.send(this, req);
+        node.usm.send(this, req, ctr);
     }

     /**
@@ -1207,8 +1207,8 @@
         Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);

         try {
-            sendAsync(locMsg, null, 0);
-            sendAsync(ipMsg, null, 0);
+            sendAsync(locMsg, null, 0, null);
+            sendAsync(ipMsg, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.error(this, "Completed handshake with "+getPeer()+" but 
disconnected!!!", new Exception("error"));
         }
@@ -1601,7 +1601,7 @@
                 Logger.error(this, "No tracker to resend packet 
"+item.packetNumber+" on");
                 continue;
             }
-            MessageItem mi = new MessageItem(item.buf, item.callbacks, true, 
0);
+            MessageItem mi = new MessageItem(item.buf, item.callbacks, true, 
0, null);
             requeueMessageItems(new MessageItem[] {mi}, 0, 1, true);
         }
     }
@@ -1751,7 +1751,7 @@
                }
                Message msg = DMT.createFNPLinkPing(pingNo);
                try {
-                       sendAsync(msg, null, 0);
+                       sendAsync(msg, null, 0, null);
                } catch (NotConnectedException e) {
                        synchronized(pingSync) {
                                pingsSentTimes.removeKey(lPingNo);

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -56,26 +56,26 @@
         htl = source.decrementHTL(htl);

         Message accepted = DMT.createFNPAccepted(uid);
-        source.send(accepted);
+        source.send(accepted, null);

         Object o = node.makeRequestSender(key, htl, uid, source, closestLoc, 
false, true, false);
         if(o instanceof KeyBlock) {
             KeyBlock block = (KeyBlock) o;
             Message df = createDataFound(block);
-            source.send(df);
+            source.send(df, null);
             if(key instanceof NodeSSK) {
                 if(needsPubKey) {
                        DSAPublicKey key = 
((NodeSSK)block.getKey()).getPubKey();
                        Message pk = DMT.createFNPSSKPubKey(uid, key.asBytes());
                        Logger.minor(this, "Sending PK: "+key+" 
"+key.writeAsField());
-                       source.send(pk);
+                       source.send(pk, null);
                 }
             }
             if(block instanceof CHKBlock) {
                PartiallyReceivedBlock prb =
                        new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
                BlockTransmitter bt =
-                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle);
+                       new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, null);
                bt.send();
             }
             return;
@@ -84,7 +84,7 @@

         if(rs == null) { // ran out of htl?
             Message dnf = DMT.createFNPDataNotFound(uid);
-            source.send(dnf);
+            source.send(dnf, null);
             return;
         }

@@ -95,16 +95,16 @@
             if(rs.waitUntilStatusChange()) {
                // Forward RejectedOverload
                Message msg = DMT.createFNPRejectedOverload(uid, false);
-               source.sendAsync(msg, null, 0);
+               source.sendAsync(msg, null, 0, null);
             }

             if(rs.transferStarted()) {
                // Is a CHK.
                 Message df = DMT.createFNPCHKDataFound(uid, rs.getHeaders());
-                source.send(df);
+                source.send(df, null);
                 PartiallyReceivedBlock prb = rs.getPRB();
                BlockTransmitter bt =
-                   new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle);
+                   new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, null);
                bt.send(); // either fails or succeeds; other side will see, we 
don't care
                    return;
             }
@@ -116,7 +116,7 @@
                    continue;
                case RequestSender.DATA_NOT_FOUND:
                     Message dnf = DMT.createFNPDataNotFound(uid);
-                       source.sendAsync(dnf, null, 0);
+                       source.sendAsync(dnf, null, 0, null);
                        return;
                case RequestSender.GENERATED_REJECTED_OVERLOAD:
                case RequestSender.TIMED_OUT:
@@ -124,20 +124,20 @@
                        // Locally generated.
                    // Propagate back to source who needs to reduce send rate
                    Message reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0);
+                       source.sendAsync(reject, null, 0, null);
                        return;
                case RequestSender.ROUTE_NOT_FOUND:
                    // Tell source
                    Message rnf = DMT.createFNPRouteNotFound(uid, rs.getHTL());
-                       source.sendAsync(rnf, null, 0);
+                       source.sendAsync(rnf, null, 0, null);
                        return;
                case RequestSender.SUCCESS:
                        if(key instanceof NodeSSK) {
                         Message df = DMT.createFNPSSKDataFound(uid, 
rs.getHeaders(), rs.getSSKData());
-                        source.send(df);
+                        source.send(df, null);
                         if(needsPubKey) {
                                Message pk = DMT.createFNPSSKPubKey(uid, 
((NodeSSK)rs.getSSKBlock().getKey()).getPubKey().asBytes());
-                               source.send(df);
+                               source.send(df, null);
                         }
                        } else if(!rs.transferStarted()) {
                                Logger.error(this, "Status is SUCCESS but we 
never started a transfer on "+uid);
@@ -151,7 +151,7 @@
                                continue; // should have started transfer
                        }
                    reject = DMT.createFNPRejectedOverload(uid, true);
-                       source.sendAsync(reject, null, 0);
+                       source.sendAsync(reject, null, 0, null);
                        return;
                case RequestSender.TRANSFER_FAILED:
                        if(key instanceof NodeCHK) {

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -34,7 +34,7 @@
  * transferring senders when starts transferring, and remove from it
  * when finishes transferring.
  */
-public final class RequestSender implements Runnable {
+public final class RequestSender implements Runnable, ByteCounter {

     // Constants
     static final int ACCEPTED_TIMEOUT = 5000;
@@ -141,7 +141,7 @@
             Message req = createDataRequest();


-            next.send(req);
+            next.send(req, this);
             sentRequest = true;

             Message msg = null;
@@ -166,7 +166,7 @@
                 MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));

                 try {
-                    msg = node.usm.waitFor(mf);
+                    msg = node.usm.waitFor(mf, this);
                     Logger.minor(this, "first part got "+msg);
                 } catch (DisconnectedException e) {
                     Logger.normal(this, "Disconnected from "+next+" while 
waiting for Accepted on "+uid);
@@ -235,7 +235,7 @@


                try {
-                       msg = node.usm.waitFor(mf);
+                       msg = node.usm.waitFor(mf, this);
                } catch (DisconnectedException e) {
                        Logger.normal(this, "Disconnected from "+next+" while 
waiting for data on "+uid);
                        break;
@@ -302,7 +302,7 @@
                                        notifyAll();
                                }

-                               BlockReceiver br = new BlockReceiver(node.usm, 
next, uid, prb);
+                               BlockReceiver br = new BlockReceiver(node.usm, 
next, uid, prb, this);

                                try {
                                        Logger.minor(this, "Receiving data");
@@ -402,6 +402,7 @@
                        return;
                } catch (KeyCollisionException e) {
                        Logger.normal(this, "Collision on "+this);
+                       finish(SUCCESS, next);
                }
        }

@@ -502,6 +503,22 @@
         if(status != NOT_FINISHED)
                throw new IllegalStateException("finish() called with "+code+" 
when was already "+status);
         status = code;
+        
+        if(status != TIMED_OUT && status != GENERATED_REJECTED_OVERLOAD && 
status != INTERNAL_ERROR) {
+               if(key instanceof NodeSSK) {
+               Logger.minor(this, "SSK fetch cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
+               (source == null ? node.localSskFetchBytesSentAverage : 
node.remoteSskFetchBytesSentAverage)
+                               .report(getTotalSentBytes());
+               (source == null ? node.localSskFetchBytesReceivedAverage : 
node.remoteSskFetchBytesReceivedAverage)
+                               .report(getTotalReceivedBytes());
+               } else {
+               Logger.minor(this, "CHK fetch cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+status+")");
+               (source == null ? node.localChkFetchBytesSentAverage : 
node.remoteChkFetchBytesSentAverage)
+                       .report(getTotalSentBytes());
+               (source == null ? node.localChkFetchBytesReceivedAverage : 
node.remoteChkFetchBytesReceivedAverage)
+                               .report(getTotalReceivedBytes());
+               }
+        }

         synchronized(this) {
             notifyAll();
@@ -527,4 +544,33 @@
     public SSKBlock getSSKBlock() {
        return block;
     }
+
+       private final Object totalBytesSync = new Object();
+       private int totalBytesSent;
+       
+       public void sentBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesSent += x;
+               }
+       }
+       
+       public int getTotalSentBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesSent;
+               }
+       }
+       
+       private int totalBytesReceived;
+       
+       public void receivedBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesReceived += x;
+               }
+       }
+       
+       public int getTotalReceivedBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesReceived;
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java  2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/RequestStarter.java  2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -3,6 +3,8 @@
 import freenet.client.async.RequestScheduler;
 import freenet.client.async.SendableRequest;
 import freenet.support.Logger;
+import freenet.support.TokenBucket;
+import freenet.support.math.RunningAverage;

 /**
  * Starts requests.
@@ -37,14 +39,23 @@
        }

        final BaseRequestThrottle throttle;
+       final TokenBucket inputBucket;
+       final TokenBucket outputBucket;
+       final RunningAverage averageInputBytesPerRequest;
+       final RunningAverage averageOutputBytesPerRequest;
        RequestScheduler sched;
        final Node node;
        private long sentRequestTime;

-       public RequestStarter(Node node, BaseRequestThrottle throttle, String 
name) {
+       public RequestStarter(Node node, BaseRequestThrottle throttle, String 
name, TokenBucket outputBucket, TokenBucket inputBucket,
+                       RunningAverage averageOutputBytesPerRequest, 
RunningAverage averageInputBytesPerRequest) {
                this.node = node;
                this.throttle = throttle;
                this.name = name;
+               this.outputBucket = outputBucket;
+               this.inputBucket = inputBucket;
+               this.averageOutputBytesPerRequest = 
averageOutputBytesPerRequest;
+               this.averageInputBytesPerRequest = averageInputBytesPerRequest;
        }

        void setScheduler(RequestScheduler sched) {
@@ -92,6 +103,8 @@
                                long delay = throttle.getDelay();
                                Logger.minor(this, "Delay="+delay+" from 
"+throttle);
                                long sleepUntil = sentRequestTime + delay;
+                               inputBucket.blockingGrab((int)(Math.max(0, 
averageInputBytesPerRequest.currentValue())));
+                               outputBucket.blockingGrab((int)(Math.max(0, 
averageOutputBytesPerRequest.currentValue())));
                                long now;
                                do {
                                        now = System.currentTimeMillis();

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2006-07-08 
01:42:43 UTC (rev 9500)
@@ -85,7 +85,7 @@
         Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);

         try {
-                       source.send(accepted);
+                       source.send(accepted, null);
                } catch (NotConnectedException e1) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -98,7 +98,7 @@
                        MessageFilter mfPK = 
MessageFilter.create().setType(DMT.FNPSSKPubKey).setField(DMT.UID, 
uid).setSource(source).setTimeout(PUBKEY_TIMEOUT);

                        try {
-                               Message pk = node.usm.waitFor(mfPK);
+                               Message pk = node.usm.waitFor(mfPK, null);
                                if(pk == null) {
                                        Logger.normal(this, "Failed to receive 
FNPSSKPubKey for "+uid);
                                        return;
@@ -109,7 +109,7 @@
                                        Logger.minor(this, "Got pubkey on 
"+uid+" : "+pubKey);
                                        Message confirm = 
DMT.createFNPSSKPubKeyAccepted(uid);
                                        try {
-                                               source.sendAsync(confirm, null, 
0);
+                                               source.sendAsync(confirm, null, 
0, null);
                                        } catch (NotConnectedException e) {
                                                Logger.minor(this, "Lost 
connection to source on "+uid);
                                                return;
@@ -118,7 +118,7 @@
                                        Logger.error(this, "Invalid pubkey from 
"+source+" on "+uid);
                                        Message msg = 
DMT.createFNPDataInsertRejected(uid, DMT.DATA_INSERT_REJECTED_SSK_ERROR);
                                        try {
-                                               source.send(msg);
+                                               source.send(msg, null);
                                        } catch (NotConnectedException ee) {
                                                // Ignore
                                        }
@@ -137,7 +137,7 @@
                        Logger.error(this, "Invalid SSK from "+source, e1);
                        Message msg = DMT.createFNPDataInsertRejected(uid, 
DMT.DATA_INSERT_REJECTED_SSK_ERROR);
                        try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -149,7 +149,7 @@
                if(storedBlock != null && !storedBlock.equals(block)) {
                        Message msg = DMT.createFNPSSKDataFound(uid, 
storedBlock.getRawHeaders(), storedBlock.getRawData());
                        try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                Logger.minor(this, "Lost connection to source 
on "+uid);
                        }
@@ -162,7 +162,7 @@
                Message msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }
@@ -191,7 +191,7 @@
                // Forward it
                Message m = DMT.createFNPRejectedOverload(uid, false);
                try {
-                                       source.send(m);
+                                       source.send(m, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -210,7 +210,7 @@
                                }
                Message msg = DMT.createFNPSSKDataFound(uid, headers, data);
                try {
-                       source.send(msg);
+                       source.send(msg, null);
                } catch (NotConnectedException e) {
                        Logger.minor(this, "Lost connection to source");
                        return;
@@ -231,7 +231,7 @@
                        status == SSKInsertSender.INTERNAL_ERROR) {
                 Message msg = DMT.createFNPRejectedOverload(uid, true);
                 try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -247,7 +247,7 @@
             if(status == SSKInsertSender.ROUTE_NOT_FOUND || status == 
SSKInsertSender.ROUTE_REALLY_NOT_FOUND) {
                 Message msg = DMT.createFNPRouteNotFound(uid, sender.getHTL());
                 try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -261,7 +261,7 @@
                Message msg = DMT.createFNPInsertReply(uid);
                sentSuccess = true;
                try {
-                                       source.send(msg);
+                                       source.send(msg, null);
                                } catch (NotConnectedException e) {
                                        Logger.minor(this, "Lost connection to 
source");
                                        return;
@@ -275,7 +275,7 @@
             Logger.error(this, "Unknown status code: 
"+sender.getStatusString());
             Message msg = DMT.createFNPRejectedOverload(uid, true);
             try {
-                               source.send(msg);
+                               source.send(msg, null);
                        } catch (NotConnectedException e) {
                                // Ignore
                        }

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -24,7 +24,7 @@
  *   wait for a long data-transfer timeout.
  * - SSKs have pubkeys, which don't always need to be sent.
  */
-public class SSKInsertSender implements Runnable, AnyInsertSender {
+public class SSKInsertSender implements Runnable, AnyInsertSender, ByteCounter 
{

     // Constants
     static final int ACCEPTED_TIMEOUT = 10000;
@@ -172,7 +172,7 @@
             // Send to next node

             try {
-                               next.sendAsync(req, null, 0);
+                               next.sendAsync(req, null, 0, this);
                        } catch (NotConnectedException e1) {
                                Logger.minor(this, "Not connected to "+next);
                                continue;
@@ -190,7 +190,7 @@
             while (true) {

                                try {
-                                       msg = node.usm.waitFor(mf);
+                                       msg = node.usm.waitFor(mf, null);
                                } catch (DisconnectedException e) {
                                        Logger.normal(this, "Disconnected from 
" + next
                                                        + " while waiting for 
Accepted");
@@ -245,7 +245,7 @@
             if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
                Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey.asBytes());
                try {
-                       next.sendAsync(pkMsg, null, 0);
+                       next.sendAsync(pkMsg, null, 0, this);
                } catch (NotConnectedException e) {
                        Logger.minor(this, "Node disconnected while sending 
pubkey: "+next);
                        continue;
@@ -257,7 +257,7 @@

                Message newAck;
                                try {
-                                       newAck = node.usm.waitFor(mf1);
+                                       newAck = node.usm.waitFor(mf1, null);
                                } catch (DisconnectedException e) {
                                        Logger.minor(this, "Disconnected from 
"+next);
                                        htl--;
@@ -299,7 +299,7 @@

             while (true) {
                                try {
-                                       msg = node.usm.waitFor(mf);
+                                       msg = node.usm.waitFor(mf, null);
                                } catch (DisconnectedException e) {
                                        Logger.normal(this, "Disconnected from 
" + next
                                                        + " while waiting for 
InsertReply on " + this);
@@ -456,6 +456,15 @@
             notifyAll();
         }

+        if(code != TIMED_OUT && code != GENERATED_REJECTED_OVERLOAD && code != 
INTERNAL_ERROR
+                       && code != ROUTE_REALLY_NOT_FOUND) {
+               Logger.minor(this, "SSK insert cost 
"+getTotalSentBytes()+"/"+getTotalReceivedBytes()+" bytes ("+code+")");
+               (source == null ? node.localChkInsertBytesSentAverage : 
node.remoteChkInsertBytesSentAverage)
+                               .report(getTotalSentBytes());
+               (source == null ? node.localChkInsertBytesReceivedAverage : 
node.remoteChkInsertBytesReceivedAverage)
+                               .report(getTotalReceivedBytes());
+        }
+        
         Logger.minor(this, "Set status code: "+getStatusString());
         // Nothing to wait for, no downstream transfers, just exit.
     }
@@ -523,4 +532,33 @@
                return uid;
        }

+       private final Object totalBytesSync = new Object();
+       private int totalBytesSent;
+       
+       public void sentBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesSent += x;
+               }
+       }
+       
+       public int getTotalSentBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesSent;
+               }
+       }
+       
+       private int totalBytesReceived;
+       
+       public void receivedBytes(int x) {
+               synchronized(totalBytesSync) {
+                       totalBytesReceived += x;
+               }
+       }
+       
+       public int getTotalReceivedBytes() {
+               synchronized(totalBytesSync) {
+                       return totalBytesReceived;
+               }
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2006-07-08 01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2006-07-08 01:42:43 UTC (rev 9500)
@@ -34,7 +34,7 @@
     public void disconnected() {
         Logger.minor(this, "Disconnect trigger: "+this);
         try {
-            dest.sendAsync(msg, null, 0);
+            dest.sendAsync(msg, null, 0, null);
         } catch (NotConnectedException e) {
             Logger.minor(this, "Both source and destination disconnected: 
"+msg+" for "+this);
         }

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-07-08 01:05:24 UTC (rev 
9499)
+++ trunk/freenet/src/freenet/node/Version.java 2006-07-08 01:42:43 UTC (rev 
9500)
@@ -18,7 +18,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 862;
+       private static final int buildNumber = 863;

        /** Oldest build of Fred we will talk to */
        private static final int oldLastGoodBuild = 839;

Modified: trunk/freenet/src/freenet/support/DoubleTokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/DoubleTokenBucket.java    2006-07-08 
01:05:24 UTC (rev 9499)
+++ trunk/freenet/src/freenet/support/DoubleTokenBucket.java    2006-07-08 
01:42:43 UTC (rev 9500)
@@ -81,10 +81,6 @@
                if(current > max) current = max;
        }

-       public synchronized long getNanosPerTick() {
-               return nanosPerTick;
-       }
-
        public synchronized long getSize() {
                return max;
        }

Modified: trunk/freenet/src/freenet/support/TokenBucket.java
===================================================================
--- trunk/freenet/src/freenet/support/TokenBucket.java  2006-07-08 01:05:24 UTC 
(rev 9499)
+++ trunk/freenet/src/freenet/support/TokenBucket.java  2006-07-08 01:42:43 UTC 
(rev 9500)
@@ -184,4 +184,8 @@
                }
                return (nowNS - nextTick) / nanosPerTick;
        }
+       
+       public synchronized long getNanosPerTick() {
+               return nanosPerTick;
+       }
 }


Reply via email to