Author: toad
Date: 2008-09-19 17:39:53 +0000 (Fri, 19 Sep 2008)
New Revision: 22708

Modified:
   trunk/freenet/src/freenet/io/comm/MessageCore.java
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
   trunk/freenet/src/freenet/node/AnnounceSender.java
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/CHKInsertSender.java
   trunk/freenet/src/freenet/node/DarknetPeerNode.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/LocationManager.java
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/NetworkIDManager.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/OpennetManager.java
   trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/PeerManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
   trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
   trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
   trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
Log:
Implement hard bandwidth limiting (of ALL packets, well except for 
disconnection called from PacketSender), in PacketSender.
Get rid of alreadyReportedBytes.


Modified: trunk/freenet/src/freenet/io/comm/MessageCore.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/MessageCore.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/io/comm/MessageCore.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -502,7 +502,7 @@
                Logger.error(this, "Trying to send internal-only message "+m+" 
of spec "+m.getSpec(), new Exception("debug"));
                return;
            }
-               destination.sendAsync(m, null, 0, ctr);
+               destination.sendAsync(m, null, ctr);
        }

        public void setDispatcher(Dispatcher d) {

Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -34,7 +34,7 @@
        int getVersionNumber();

        /** Send a message to the node */
-       public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
+       public void sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter 
ctr) throws NotConnectedException;

        /** Send a throttled message to the node (may block for a long time). 
         * @throws SyncSendWaitedTooLongException */

Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -48,7 +48,7 @@
                        sentCancel = true;
                }
                try {
-                       peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid), 
null, 0, ctr);
+                       peer.sendAsync(DMT.createFNPBulkReceiveAborted(uid), 
null, ctr);
                } catch (NotConnectedException e) {
                        // Cool
                }
@@ -64,7 +64,7 @@
                        MessageFilter mfPacket = 
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend) 
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
                        if(prb.hasWholeFile()) {
                                try {
-                                       
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, 0, ctr);
+                                       
peer.sendAsync(DMT.createFNPBulkReceivedAll(uid), null, ctr);
                                } catch (NotConnectedException e) {
                                        // Ignore, we have the data.
                                }

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-09-19 
17:39:53 UTC (rev 22708)
@@ -154,7 +154,7 @@
                        sentCancel = true;
                }
                try {
-                       peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 
0, ctr);
+                       peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 
ctr);
                } catch (NotConnectedException e) {
                        // Cool
                }

Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-09-19 
17:39:53 UTC (rev 22708)
@@ -148,7 +148,7 @@
                return ((PACKET_SIZE * 1000.0 / getDelay()));
        }

-       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr, long 
deadline, boolean blockForSend) throws NotConnectedException, 
ThrottleDeprecatedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
+       public void sendThrottledMessage(Message msg, PeerContext peer, int 
packetSize, ByteCounter ctr, long deadline, boolean blockForSend) throws 
NotConnectedException, ThrottleDeprecatedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
                long start = System.currentTimeMillis();
                long bootID = peer.getBootID();
                synchronized(this) {
@@ -230,15 +230,7 @@
                        Logger.minor(this, "Congestion control wait time: 
"+waitTime+" for "+this);
                MyCallback callback = new MyCallback();
                try {
-                       if(((PeerNode)peer).shouldThrottle()) {
-                               if(logMINOR) Logger.minor(this, "Throttling 
"+peer.shortToString()+" : "+packetSize+" for "+this);
-                               long startTime = System.currentTimeMillis();
-                               overallThrottle.blockingGrab(packetSize);
-                               long delayTime = System.currentTimeMillis() - 
startTime;
-                               
((PeerNode)peer).reportThrottledPacketSendTime(delayTime);
-                       } else if(logMINOR)
-                               Logger.minor(this, "Not throttling 
"+peer.shortToString()+" for "+this);
-                       peer.sendAsync(msg, callback, packetSize, ctr);
+                       peer.sendAsync(msg, callback, ctr);
                        ctr.sentPayload(packetSize);
                        if(blockForSend) {
                                synchronized(callback) {

Modified: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -83,7 +83,7 @@
                boolean hasForwarded = false;
                if(source != null) {
                        try {
-                               source.sendAsync(DMT.createFNPAccepted(uid), 
null, 0, this);
+                               source.sendAsync(DMT.createFNPAccepted(uid), 
null, this);
                        } catch (NotConnectedException e) {
                                return;
                        }
@@ -414,7 +414,7 @@
                Message msg = DMT.createFNPRejectedOverload(uid, true);
                if(source != null) {
                        try {
-                               source.sendAsync(msg, null, 0, this);
+                               source.sendAsync(msg, null, this);
                        } catch (NotConnectedException e) {
                                // Ok
                        }
@@ -426,7 +426,7 @@
                Message msg = DMT.createFNPRouteNotFound(uid, htl);
                if(source != null) {
                        try {
-                               source.sendAsync(msg, null, 0, this);
+                               source.sendAsync(msg, null, this);
                        } catch (NotConnectedException e) {
                                // Ok
                        }
@@ -441,7 +441,7 @@
                Message msg = DMT.createFNPOpennetAnnounceCompleted(uid);
                if(source != null) {
                        try {
-                               source.sendAsync(msg, null, 0, this);
+                               source.sendAsync(msg, null, this);
                        } catch (NotConnectedException e) {
                                // Oh well.
                        }
@@ -495,7 +495,7 @@

        private void sendNotWanted() throws NotConnectedException {
                Message msg = DMT.createFNPOpennetAnnounceNodeNotWanted(uid);
-               source.sendAsync(msg, null, 0, this);
+               source.sendAsync(msg, null, this);
        }

        private void sendOurRef(PeerNode next, byte[] ref) throws 
NotConnectedException {

Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-09-19 
17:39:53 UTC (rev 22708)
@@ -113,9 +113,9 @@
                        if(source.isConnected() && (startTime > 
(source.timeLastConnectionCompleted()+Node.HANDSHAKE_TIMEOUT*4)))
                                Logger.error(this, "Did not receive DataInsert 
on "+uid+" from "+source+" !");
                        Message tooSlow = DMT.createFNPRejectedTimeout(uid);
-                       source.sendAsync(tooSlow, null, 0, this);
+                       source.sendAsync(tooSlow, null, this);
                        Message m = DMT.createFNPInsertTransfersCompleted(uid, 
true);
-                       source.sendAsync(m, null, 0, this);
+                       source.sendAsync(m, null, this);
                        prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE);
                        br = new BlockReceiver(node.usm, source, uid, prb, 
this);
                        prb.abort(RetrievalException.NO_DATAINSERT, "No 
DataInsert");
@@ -384,7 +384,7 @@
         }
         if(toSend != null) {
             try {
-                source.sendAsync(toSend, null, 0, this);
+                source.sendAsync(toSend, null, this);
             } catch (NotConnectedException e) {
                 // :(
                if(logMINOR) Logger.minor(this, "Lost connection in "+this+" 
when sending FNPDataInsertRejected");

Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -339,7 +339,7 @@
                                   ACCEPTED_TIMEOUT, using sendAsync() will 
skip them before they get the request. This would be a need for retuning
                                   ACCEPTED_TIMEOUT.
                                 */
-                               next.sendAsync(req, null, 0, this);
+                               next.sendAsync(req, null, this);
                        } catch (NotConnectedException e1) {
                                if(logMINOR) Logger.minor(this, "Not connected 
to "+next);
                                continue;

Modified: trunk/freenet/src/freenet/node/DarknetPeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/DarknetPeerNode.java 2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -303,7 +303,7 @@
                if(localRequest) {
                        Message msg = DMT.createRoutingStatus(shouldRoute);
                        try {
-                               sendAsync(msg, null, 0, 
node.nodeStats.setRoutingStatusCtr);
+                               sendAsync(msg, null, 
node.nodeStats.setRoutingStatusCtr);
                        } catch(NotConnectedException e) {
                        // ok
                        }
@@ -1333,7 +1333,7 @@
                                        Node.N2N_MESSAGE_TYPE_FPROXY, fs
                                                        
.toString().getBytes("UTF-8"));
                        try {
-                               sendAsync(n2ntm, null, 0, 
node.nodeStats.nodeToNodeCounter);
+                               sendAsync(n2ntm, null, 
node.nodeStats.nodeToNodeCounter);
                        } catch (NotConnectedException e) {
                                fs.removeValue("sentTime");
                                queueN2NM(fs);
@@ -1367,7 +1367,7 @@
                                        Node.N2N_MESSAGE_TYPE_FPROXY, fs
                                                        
.toString().getBytes("UTF-8"));
                        try {
-                               sendAsync(n2ntm, null, 0, 
node.nodeStats.nodeToNodeCounter);
+                               sendAsync(n2ntm, null, 
node.nodeStats.nodeToNodeCounter);
                        } catch (NotConnectedException e) {
                                fs.removeValue("sentTime");
                                queueN2NM(fs);
@@ -1410,7 +1410,7 @@
                                        Node.N2N_MESSAGE_TYPE_FPROXY, fs
                                                        
.toString().getBytes("UTF-8"));
                        try {
-                               sendAsync(n2ntm, null, 0, 
node.nodeStats.nodeToNodeCounter);
+                               sendAsync(n2ntm, null, 
node.nodeStats.nodeToNodeCounter);
                        } catch (NotConnectedException e) {
                                fs.removeValue("sentTime");
                                queueN2NM(fs);
@@ -1506,7 +1506,7 @@
                if(fo == null) {
                        Logger.error(this, "No such offer: "+uid);
                        try {
-                               sendAsync(DMT.createFNPBulkSendAborted(uid), 
null, 0, node.nodeStats.nodeToNodeCounter);
+                               sendAsync(DMT.createFNPBulkSendAborted(uid), 
null, node.nodeStats.nodeToNodeCounter);
                        } catch (NotConnectedException e) {
                                // Fine by me!
                        }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-09-19 
17:39:53 UTC (rev 22708)
@@ -1667,13 +1667,13 @@
                System.arraycopy(random, 0, data, 
hash.length+iv.length+2+output.length, random.length);
                node.nodeStats.reportAuthBytes(data.length + 
sock.getHeadersLength());
                try {
-                       sendPacket(data, replyTo, pn, 0);
+                       sendPacket(data, replyTo, pn);
                } catch (LocalAddressException e) {
                        Logger.error(this, "Tried to send auth packet to local 
address: "+replyTo+" for "+pn+" - maybe you should set allowLocalAddresses for 
this peer??");
                }
        }

-       private void sendPacket(byte[] data, Peer replyTo, PeerNode pn, int 
alreadyReportedBytes) throws LocalAddressException {
+       private void sendPacket(byte[] data, Peer replyTo, PeerNode pn) throws 
LocalAddressException {
                if(pn != null) {
                        if(pn.isIgnoreSource()) {
                                Peer p = pn.getPeer();
@@ -1684,13 +1684,7 @@
                if(pn != null)
                        pn.reportOutgoingPacket(data, 0, data.length, 
System.currentTimeMillis());
                if(PeerNode.shouldThrottle(replyTo, node)) {
-                       int reportableBytes = data.length - 
alreadyReportedBytes;
-                       if(reportableBytes <= 0) {
-                               Logger.error(this, "alreadyReportedBytes 
("+alreadyReportedBytes+")> data.length ("+data.length+")");
-                               reportableBytes = 0;
-                       }
-                       if(reportableBytes > 0)
-                               node.outputThrottle.forceGrab(reportableBytes);
+                       node.outputThrottle.forceGrab(data.length);
                }
        }

@@ -2007,7 +2001,6 @@
                }
                if(logMINOR) Logger.minor(this, "processOutgoingOrRequeue 
"+messages.length+" messages for "+pn+" ("+neverWaitForPacketNumber+ ')');
                byte[][] messageData = new byte[messages.length][];
-               int[] alreadyReported = new int[messages.length];
                MessageItem[] newMsgs = new MessageItem[messages.length];
                KeyTracker kt = pn.getCurrentKeyTracker();
                if(kt == null) {
@@ -2028,7 +2021,7 @@
                                try {
                                        byte[] buf = mi.getData();
                                        int packetNumber = 
kt.allocateOutgoingPacketNumberNeverBlock();
-                                       int size = 
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb, 
mi.alreadyReportedBytes, mi.getPriority());
+                                       int size = 
processOutgoingPreformatted(buf, 0, buf.length, kt, packetNumber, mi.cb, 
mi.getPriority());
                                        //MARK: onSent()
                                        mi.onSent(size);
                                } catch (NotConnectedException e) {
@@ -2072,10 +2065,9 @@
                                        // Will be handled later
                                }
                                newMsgs[x] = mi;
-                               alreadyReported[x] = mi.alreadyReportedBytes;
                                x++;
                                if(mi.cb != null) callbacksCount += 
mi.cb.length;
-                               if(logMINOR) Logger.minor(this, "Sending: 
"+mi+" length "+data.length+" cb "+ Arrays.toString(mi.cb)+" reported 
"+mi.alreadyReportedBytes);
+                               if(logMINOR) Logger.minor(this, "Sending: 
"+mi+" length "+data.length+" cb "+ Arrays.toString(mi.cb));
                                length += (data.length + 2);
                        }
                }
@@ -2090,12 +2082,10 @@
                }
                AsyncMessageCallback callbacks[] = new 
AsyncMessageCallback[callbacksCount];
                x=0;
-               int alreadyReportedBytes = 0;
                short priority = DMT.PRIORITY_BULK_DATA;
                for(int i=0;i<messages.length;i++) {
                        if(messages[i].formatted) continue;
                        if(messages[i].cb != null) {
-                               alreadyReportedBytes += 
messages[i].alreadyReportedBytes;
                                System.arraycopy(messages[i].cb, 0, callbacks, 
x, messages[i].cb.length);
                                x += messages[i].cb.length;
                        }
@@ -2108,7 +2098,7 @@
                                (messageData.length < 256)) {
                        mi_name = null;
                        try {
-                               int size = innerProcessOutgoing(messageData, 0, 
messageData.length, length, pn, neverWaitForPacketNumber, callbacks, 
alreadyReportedBytes, priority);
+                               int size = innerProcessOutgoing(messageData, 0, 
messageData.length, length, pn, neverWaitForPacketNumber, callbacks, priority);
                                int totalMessageSize = 0;
                                for(int i=0;i<messageData.length;i++) 
totalMessageSize += messageData[i].length;
                                int overhead = size - totalMessageSize;
@@ -2146,7 +2136,6 @@
                        length += kt.countAcks() + kt.countAckRequests() + 
kt.countResendRequests();
                        int count = 0;
                        int lastIndex = 0;
-                       alreadyReportedBytes = 0;
                        if(logMINOR)
                                Logger.minor(this, "Sending 
"+messageData.length+" messages");
                        for(int i=0;i<=messageData.length;i++) {
@@ -2164,7 +2153,7 @@
                                                mi_name = null;
                                                try {
                                                        // FIXME regenerate 
callbacks and priority!
-                                                       int size = 
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
+                                                       int size = 
innerProcessOutgoing(messageData, lastIndex, i-lastIndex, length, pn, 
neverWaitForPacketNumber, callbacks, priority);
                                                        int totalMessageSize = 
0;
                                                        for(int 
j=lastIndex;j<i;j++) totalMessageSize += messageData[j].length;
                                                        int overhead = size - 
totalMessageSize;
@@ -2200,12 +2189,10 @@
                                        lastIndex = i;
                                        if(i != messageData.length) {
                                                length = 1 + 
(messageData[i].length + 2);
-                                               alreadyReportedBytes = 
alreadyReported[i];
                                        }
                                        count = 0;
                                } else {
                                        length = newLength;
-                                       alreadyReportedBytes += 
alreadyReported[i];
                                }
                        }
                }
@@ -2221,8 +2208,8 @@
         * @throws PacketSequenceException 
         */
        private int innerProcessOutgoing(byte[][] messageData, int start, int 
length, int bufferLength, 
-                       PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks, int alreadyReportedBytes, short priority) 
throws NotConnectedException, WouldBlockException, PacketSequenceException {
-               if(logMINOR) Logger.minor(this, 
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+ 
','+callbacks.length+','+alreadyReportedBytes+')');
+                       PeerNode pn, boolean neverWaitForPacketNumber, 
AsyncMessageCallback[] callbacks, short priority) throws NotConnectedException, 
WouldBlockException, PacketSequenceException {
+               if(logMINOR) Logger.minor(this, 
"innerProcessOutgoing(...,"+start+ ',' +length+ ',' +bufferLength+ 
','+callbacks.length+')');
                byte[] buf = new byte[bufferLength];
                buf[0] = (byte)length;
                int loc = 1;
@@ -2234,15 +2221,15 @@
                        System.arraycopy(data, 0, buf, loc, len);
                        loc += len;
                }
-               return processOutgoingPreformatted(buf, 0, loc, pn, 
neverWaitForPacketNumber, callbacks, alreadyReportedBytes, priority);
+               return processOutgoingPreformatted(buf, 0, loc, pn, 
neverWaitForPacketNumber, callbacks, priority);
        }

        /* (non-Javadoc)
         * @see freenet.node.OutgoingPacketMangler#processOutgoing(byte[], int, 
int, freenet.node.KeyTracker, int)
         */
-       public int processOutgoing(byte[] buf, int offset, int length, 
KeyTracker tracker, int alreadyReportedBytes, short priority) throws 
KeyChangedException, NotConnectedException, PacketSequenceException, 
WouldBlockException {
+       public int processOutgoing(byte[] buf, int offset, int length, 
KeyTracker tracker, short priority) throws KeyChangedException, 
NotConnectedException, PacketSequenceException, WouldBlockException {
                byte[] newBuf = preformat(buf, offset, length);
-               return processOutgoingPreformatted(newBuf, 0, newBuf.length, 
tracker, -1, null, alreadyReportedBytes, priority);
+               return processOutgoingPreformatted(newBuf, 0, newBuf.length, 
tracker, -1, null, priority);
        }

        /**
@@ -2250,7 +2237,7 @@
         * the key changes.
         * @throws PacketSequenceException 
         */
-       int processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks, int alreadyReportedBytes, short priority) throws 
NotConnectedException, WouldBlockException, PacketSequenceException {
+       int processOutgoingPreformatted(byte[] buf, int offset, int length, 
PeerNode peer, boolean neverWaitForPacketNumber, AsyncMessageCallback[] 
callbacks, short priority) throws NotConnectedException, WouldBlockException, 
PacketSequenceException {
                KeyTracker last = null;
                while(true) {
                        try {
@@ -2264,7 +2251,7 @@
                                }
                                int seqNo = neverWaitForPacketNumber ? 
tracker.allocateOutgoingPacketNumberNeverBlock() :
                                        tracker.allocateOutgoingPacketNumber();
-                               return processOutgoingPreformatted(buf, offset, 
length, tracker, seqNo, callbacks, alreadyReportedBytes, priority);
+                               return processOutgoingPreformatted(buf, offset, 
length, tracker, seqNo, callbacks, priority);
                        } catch (KeyChangedException e) {
                                Logger.normal(this, "Key changed(2) for 
"+peer.getPeer());
                                if(last == peer.getCurrentKeyTracker()) {
@@ -2296,7 +2283,7 @@
        /* (non-Javadoc)
         * @see 
freenet.node.OutgoingPacketMangler#processOutgoingPreformatted(byte[], int, 
int, freenet.node.KeyTracker, int, freenet.node.AsyncMessageCallback[], int)
         */
-       public int processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks, 
int alreadyReportedBytes, short priority) throws KeyChangedException, 
NotConnectedException, PacketSequenceException, WouldBlockException {
+       public int processOutgoingPreformatted(byte[] buf, int offset, int 
length, KeyTracker tracker, int packetNumber, AsyncMessageCallback[] callbacks, 
short priority) throws KeyChangedException, NotConnectedException, 
PacketSequenceException, WouldBlockException {
                if(logMINOR) {
                        String log = 
"processOutgoingPreformatted("+Fields.hashCode(buf)+", "+offset+ ',' +length+ 
',' +tracker+ ',' +packetNumber+ ',';
                        if(callbacks == null) log += "null";
@@ -2527,7 +2514,7 @@

                if(logMINOR) Logger.minor(this, "Sending... "+seqNumber);

-               int ret = processOutgoingFullyFormatted(plaintext, tracker, 
alreadyReportedBytes);
+               int ret = processOutgoingFullyFormatted(plaintext, tracker);
                if(logMINOR) Logger.minor(this, "Sent packet "+seqNumber);
                return ret;
        }
@@ -2537,7 +2524,7 @@
         * @param plaintext The packet's plaintext, including all formatting,
         * including acks and resend requests. Is clobbered.
         */
-       private int processOutgoingFullyFormatted(byte[] plaintext, KeyTracker 
kt, int alreadyReportedBytes) {
+       private int processOutgoingFullyFormatted(byte[] plaintext, KeyTracker 
kt) {
                BlockCipher sessionCipher = kt.sessionCipher;
                if(logMINOR) Logger.minor(this, "Encrypting with 
"+HexUtil.bytesToHex(kt.sessionKey));
                if(sessionCipher == null) {
@@ -2592,7 +2579,7 @@

                // pn.getPeer() cannot be null
                try {
-                       sendPacket(output, kt.pn.getPeer(), kt.pn, 
alreadyReportedBytes);
+                       sendPacket(output, kt.pn.getPeer(), kt.pn);
 //                     System.err.println(kt.pn.getIdentityString()+" : sent 
packet length "+output.length);
                } catch (LocalAddressException e) {
                        Logger.error(this, "Tried to send data packet to local 
address: "+kt.pn.getPeer()+" for "+kt.pn.allowLocalAddresses());
@@ -2634,7 +2621,7 @@
        }

        public void resend(ResendPacketItem item) throws 
PacketSequenceException, WouldBlockException, KeyChangedException, 
NotConnectedException {
-               int size = processOutgoingPreformatted(item.buf, 0, 
item.buf.length, item.kt, item.packetNumber, item.callbacks, 0, item.priority);
+               int size = processOutgoingPreformatted(item.buf, 0, 
item.buf.length, item.kt, item.packetNumber, item.callbacks, item.priority);
                item.pn.resendByteCounter.sentBytes(size);
        }


Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -402,7 +402,7 @@
                        SSKBlock block = node.fetch((NodeSSK)key, false);
                        if(block == null) {
                                // Don't have the key
-                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, senderCounter);
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
                                return;
                        }

@@ -410,7 +410,7 @@
                        Message headers = DMT.createFNPSSKDataFoundHeaders(uid, 
block.getRawHeaders());
                        final int dataLength = block.getRawData().length;

-                       source.sendAsync(headers, null, 0, senderCounter);
+                       source.sendAsync(headers, null, senderCounter);

                        node.executor.execute(new PrioRunnable() {

@@ -437,21 +437,21 @@

                        if(RequestHandler.SEND_OLD_FORMAT_SSK) {
                                Message df = DMT.createFNPSSKDataFound(uid, 
block.getRawHeaders(), block.getRawData());
-                               source.sendAsync(df, null, 0, senderCounter);
+                               source.sendAsync(df, null, senderCounter);
                        }
                        if(needPubKey) {
                                Message pk = DMT.createFNPSSKPubKey(uid, 
block.getPubKey());
-                               source.sendAsync(pk, null, 0, senderCounter);
+                               source.sendAsync(pk, null, senderCounter);
                        }
                } else {
                        CHKBlock block = node.fetch((NodeCHK)key, false);
                        if(block == null) {
                                // Don't have the key
-                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, senderCounter);
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, senderCounter);
                                return;
                        }
                        Message df = DMT.createFNPCHKDataFound(uid, 
block.getRawHeaders());
-                       source.sendAsync(df, null, 0, senderCounter);
+                       source.sendAsync(df, null, senderCounter);
                PartiallyReceivedBlock prb =
                        new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, 
Node.PACKET_SIZE, block.getRawData());
                final BlockTransmitter bt =

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -985,7 +985,7 @@
                        // Ignore packet#
                        if(logMINOR)
                                Logger.minor(this, "Queueing resend of what was 
once " + element.packetNumber);
-                       messages[i] = new MessageItem(buf, callbacks, true, 0, 
pn.resendByteCounter, element.priority);
+                       messages[i] = new MessageItem(buf, callbacks, true, 
pn.resendByteCounter, element.priority);
                }
                pn.requeueMessageItems(messages, 0, messages.length, true);


Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -861,7 +861,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(oldID);
             try {
-                pn.sendAsync(reject, null, 0, this);
+                pn.sendAsync(reject, null, this);
             } catch (NotConnectedException e1) {
                if(logMINOR) Logger.minor(this, "Lost connection rejecting 
SwapRequest (locked) from "+pn);
             }
@@ -889,7 +889,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(oldID);
             try {
-                pn.sendAsync(reject, null, 0, this);
+                pn.sendAsync(reject, null, this);
             } catch (NotConnectedException e) {
                if(logMINOR) Logger.minor(this, "Lost connection to "+pn+" 
rejecting SwapRequest");
             }
@@ -901,7 +901,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(oldID);
             try {
-                pn.sendAsync(reject, null, 0, this);
+                pn.sendAsync(reject, null, this);
             } catch (NotConnectedException e) {
                if(logMINOR) Logger.minor(this, "Lost connection rejecting 
SwapRequest from "+pn);
             }
@@ -919,7 +919,7 @@
             // Reject
             Message reject = DMT.createFNPSwapRejected(oldID);
             try {
-                pn.sendAsync(reject, null, 0, this);
+                pn.sendAsync(reject, null, this);
             } catch (NotConnectedException e1) {
                if(logMINOR) Logger.minor(this, "Lost connection rejecting 
SwapRequest (locked) from "+pn);
             }
@@ -942,7 +942,7 @@
                        if(logMINOR) Logger.minor(this, "Late reject "+oldID);
                     Message reject = DMT.createFNPSwapRejected(oldID);
                     try {
-                        pn.sendAsync(reject, null, 0, this);
+                        pn.sendAsync(reject, null, this);
                     } catch (NotConnectedException e1) {
                         Logger.normal(this, "Late reject but disconnected from 
sender: "+pn);
                     }
@@ -956,7 +956,7 @@
                     // Forward the request.
                     // Note that we MUST NOT send this blocking as we are on 
the
                     // receiver thread.
-                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(oldID), pn, item), 0, 
LocationManager.this);
+                    randomPeer.sendAsync(m, new 
MyCallback(DMT.createFNPSwapRejected(oldID), pn, item), LocationManager.this);
                 } catch (NotConnectedException e) {
                        if(logMINOR) Logger.minor(this, "Not connected");
                     // Try a different node
@@ -1001,7 +1001,7 @@
                if(logMINOR) Logger.minor(this, "Rejecting "+msg);
             Message rejected = DMT.createFNPSwapRejected(oldID);
             try {
-                pn.sendAsync(rejected, null, 0, this);
+                pn.sendAsync(rejected, null, this);
             } catch (NotConnectedException e1) {
                if(logMINOR) Logger.minor(this, "Lost connection rejecting 
SwapRequest (locked) from "+pn);
             }
@@ -1065,7 +1065,7 @@
         m.set(DMT.UID, item.incomingID);
         if(logMINOR) Logger.minor(this, "Forwarding SwapReply "+uid+" from 
"+source+" to "+item.requestSender);
         try {
-            item.requestSender.sendAsync(m, null, 0, this);
+            item.requestSender.sendAsync(m, null, this);
         } catch (NotConnectedException e) {
                if(logMINOR) Logger.minor(this, "Lost connection forwarding 
SwapReply "+uid+" to "+item.requestSender);
         }
@@ -1099,7 +1099,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null, 0, this);
+            item.requestSender.sendAsync(m, null, this);
         } catch (NotConnectedException e) {
                if(logMINOR) Logger.minor(this, "Lost connection forwarding 
SwapRejected "+uid+" to "+item.requestSender);
         }
@@ -1125,7 +1125,7 @@
         // Sending onwards - use outgoing ID
         m.set(DMT.UID, item.outgoingID);
         try {
-            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender, this), 0, this);
+            item.routedTo.sendAsync(m, new 
SendMessageOnErrorCallback(DMT.createFNPSwapRejected(item.incomingID), 
item.requestSender, this), this);
         } catch (NotConnectedException e) {
                if(logMINOR) Logger.minor(this, "Lost connection forwarding 
SwapCommit "+uid+" to "+item.routedTo);
         }
@@ -1162,7 +1162,7 @@
         // Returning to source - use incomingID
         m.set(DMT.UID, item.incomingID);
         try {
-            item.requestSender.sendAsync(m, null, 0, this);
+            item.requestSender.sendAsync(m, null, this);
         } catch (NotConnectedException e) {
             Logger.normal(this, "Lost connection forwarding SwapComplete 
"+uid+" to "+item.requestSender);
         }
@@ -1278,7 +1278,7 @@
             Message msg = DMT.createFNPSwapRejected(item.incomingID);
             if(logMINOR) Logger.minor(this, "Rejecting in lostOrRestartedNode: 
"+item.incomingID+ " from "+item.requestSender);
             try {
-                item.requestSender.sendAsync(msg, null, 0, this);
+                item.requestSender.sendAsync(msg, null, this);
             } catch (NotConnectedException e1) {
                 Logger.normal(this, "Both sender and receiver disconnected for 
"+item);
             }

Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java     2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/MessageItem.java     2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -17,7 +17,6 @@
     final byte[] buf;
     final AsyncMessageCallback[] cb;
     final long submitted;
-    final int alreadyReportedBytes;
     /** If true, the buffer may contain several messages, and is formatted
      * for sending as a single packet.
      */
@@ -25,8 +24,7 @@
     final ByteCounter ctrCallback;
     private final short priority;

-    public MessageItem(Message msg2, AsyncMessageCallback[] cb2, int 
alreadyReportedBytes, ByteCounter ctr, PeerNode pn) {
-       this.alreadyReportedBytes = alreadyReportedBytes;
+    public MessageItem(Message msg2, AsyncMessageCallback[] cb2, ByteCounter 
ctr, PeerNode pn) {
         this.msg = msg2;
         this.cb = cb2;
         formatted = false;
@@ -34,13 +32,9 @@
         this.submitted = System.currentTimeMillis();
         priority = msg2.getSpec().getPriority();
         buf = msg.encodeToPacket(pn);
-        if(buf.length <= alreadyReportedBytes) {
-               Logger.error(this, "buf.length = "+buf.length+" but 
alreadyReportedBytes = "+alreadyReportedBytes+" on "+this);
-        }
     }

-    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, int alreadyReportedBytes, ByteCounter ctr, short priority) {
-       this.alreadyReportedBytes = alreadyReportedBytes;
+    public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean 
formatted, ByteCounter ctr, short priority) {
         this.cb = cb2;
         this.msg = null;
         this.buf = data;
@@ -93,7 +87,7 @@

        @Override
        public String toString() {
-               return 
super.toString()+":formatted="+formatted+",msg="+msg+",alreadyReported="+alreadyReportedBytes;
+               return super.toString()+":formatted="+formatted+",msg="+msg;
        }

        public void onDisconnect() {

Modified: trunk/freenet/src/freenet/node/NetworkIDManager.java
===================================================================
--- trunk/freenet/src/freenet/node/NetworkIDManager.java        2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/NetworkIDManager.java        2008-09-19 
17:39:53 UTC (rev 22708)
@@ -101,7 +101,7 @@
                if (logMINOR) Logger.minor(this, "Storing secret: "+s);
                addOrReplaceSecret(s); // FIXME - what if the message contain a 
bogus UID?
                try {
-                       pn.sendAsync(DMT.createFNPAccepted(uid), null, 0, ctr);
+                       pn.sendAsync(DMT.createFNPAccepted(uid), null, ctr);
                } catch (NotConnectedException e) {
                        Logger.error(this, "peer disconnected before 
storeSecret ack?", e);
                }
@@ -133,7 +133,7 @@

                if (disableSecretPings || node.recentlyCompleted(uid)) {
                        if (logMINOR) Logger.minor(this, "recently 
complete/loop: "+uid);
-                       source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 
0, ctr);
+                       source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 
ctr);
                } else {
                        byte[] nodeIdentity = ((ShortBuffer) 
m.getObject(DMT.NODE_IDENTITY)).getData();
                        StoredSecret match;
@@ -145,10 +145,10 @@
                                //This is the node that the ping intends to 
reach, we will *not* forward it; but we might not respond positively either.
                                //don't set the completed flag, we might reject 
it from one peer (too short a path) and accept it from another.
                                if (htl > dawnHtl) {
-                                       
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+                                       
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
                                } else {
                                        if (logMINOR) Logger.minor(this, 
"Responding to "+source+" with "+match+" from "+match.peer);
-                                       
source.sendAsync(match.getSecretPong(counter+1), null, 0, ctr);
+                                       
source.sendAsync(match.getSecretPong(counter+1), null, ctr);
                                }
                        } else {
                                //Set the completed flag immediately for 
determining reject loops rather than locking the uid.
@@ -169,7 +169,7 @@

                                        if (next==null) {
                                                //would be rnf... but this is a 
more exhaustive and lightweight search I suppose.
-                                               
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+                                               
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
                                                break;
                                        }

@@ -177,7 +177,7 @@

                                        if (htl<=0) {
                                                //would be dnf if we were 
looking for data.
-                                               
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, 0, ctr);
+                                               
source.sendAsync(DMT.createFNPRejectedLoop(uid), null, ctr);
                                                break;
                                        }

@@ -188,7 +188,7 @@
                                        counter++;
                                        routedTo.add(next);
                                        try {
-                                               
next.sendAsync(DMT.createFNPSecretPing(uid, target, htl, dawnHtl, counter, 
nodeIdentity), null, 0, ctr);
+                                               
next.sendAsync(DMT.createFNPSecretPing(uid, target, htl, dawnHtl, counter, 
nodeIdentity), null, ctr);
                                        } catch (NotConnectedException e) {
                                                Logger.normal(this, next+" 
disconnected before secret-ping-forward");
                                                continue;
@@ -218,7 +218,7 @@
                                                        counter=suppliedCounter;
                                                long 
secret=msg.getLong(DMT.SECRET);
                                                if (logMINOR) 
Logger.minor(this, node+" forwarding apparently-successful secretpong response: 
"+counter+"/"+secret+" from "+next+" to "+source);
-                                               
source.sendAsync(DMT.createFNPSecretPong(uid, counter, secret), null, 0, ctr);
+                                               
source.sendAsync(DMT.createFNPSecretPong(uid, counter, secret), null, ctr);
                                                break;
                                        }


Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2008-09-19 16:53:54 UTC (rev 
22707)
+++ trunk/freenet/src/freenet/node/Node.java    2008-09-19 17:39:53 UTC (rev 
22708)
@@ -99,7 +99,6 @@
 import freenet.store.SSKStore;
 import freenet.store.saltedhash.SaltedHashFreenetStore;
 import freenet.support.ByteArrayWrapper;
-import freenet.support.DoubleTokenBucket;
 import freenet.support.Executor;
 import freenet.support.Fields;
 import freenet.support.FileLoggerHook;
@@ -113,6 +112,7 @@
 import freenet.support.PooledExecutor;
 import freenet.support.ShortBuffer;
 import freenet.support.SimpleFieldSet;
+import freenet.support.TokenBucket;
 import freenet.support.api.BooleanCallback;
 import freenet.support.api.IntCallback;
 import freenet.support.api.LongCallback;
@@ -421,7 +421,7 @@
        final LRUHashtable<ByteArrayWrapper, DSAPublicKey> cachedPubKeys;
        final boolean testnetEnabled;
        final TestnetHandler testnetHandler;
-       public final DoubleTokenBucket outputThrottle;
+       public final TokenBucket outputThrottle;
        public boolean throttleLocalData;
        private int outputBandwidthLimit;
        private int inputBandwidthLimit;
@@ -1113,7 +1113,7 @@
                // Add them at a rate determined by the obwLimit.
                // Maximum forced bytes 80%, in other words, 20% of the 
bandwidth is reserved for 
                // block transfers, so we will use that 20% for block transfers 
even if more than 80% of the limit is used for non-limited data (resends etc).
-               outputThrottle = new DoubleTokenBucket(obwLimit/2, 
(1000L*1000L*1000L) / obwLimit, obwLimit/2, 0.8);
+               outputThrottle = new TokenBucket(obwLimit/2, 
(1000L*1000L*1000L) / obwLimit, obwLimit/2);

                nodeConfig.register("inputBandwidthLimit", "-1", sortOrder++, 
false, true, "Node.inBWLimit", "Node.inBWLimitLong",      new IntCallback() {
                                        @Override

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -93,7 +93,7 @@
                        // Send an FNPPong
                        Message reply = 
DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
                        try {
-                               source.sendAsync(reply, null, 0, pingCounter); 
// nothing we can do if can't contact source
+                               source.sendAsync(reply, null, pingCounter); // 
nothing we can do if can't contact source
                        } catch (NotConnectedException e) {
                                if(logMINOR) Logger.minor(this, "Lost 
connection replying to "+m);
                        }
@@ -247,7 +247,7 @@
                
if(!HMAC.verifyWithSHA256(node.failureTable.offerAuthenticatorKey, 
key.getFullKey(), authenticator)) {
                        Logger.error(this, "Invalid offer request from 
"+source+" : authenticator did not verify");
                        try {
-                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 0, 
node.failureTable.senderCounter);
+                               
source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, 
DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 
node.failureTable.senderCounter);
                        } catch (NotConnectedException e) {
                                // Too bad.
                        }
@@ -267,7 +267,7 @@
                        Logger.normal(this, "Rejecting FNPGetOfferedKey from 
"+source+" for "+key+" : "+reject);
                        Message rejected = DMT.createFNPRejectedOverload(uid, 
true);
                        try {
-                               source.sendAsync(rejected, null, 0, 
node.failureTable.senderCounter);
+                               source.sendAsync(rejected, null, 
node.failureTable.senderCounter);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
                        }
@@ -346,7 +346,7 @@
                if(node.recentlyCompleted(id)) {
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting data request 
(loop, finished): "+e);
                        }
@@ -358,7 +358,7 @@
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting request from 
"+source.getPeer()+": "+e);
                        }
@@ -373,7 +373,7 @@
                        Logger.normal(this, "Rejecting "+(isSSK ? "SSK" : 
"CHK")+" request from "+source.getPeer()+" preemptively because "+rejectReason);
                        Message rejected = DMT.createFNPRejectedOverload(id, 
true);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) data 
request from "+source.getPeer()+": "+e);
                        }
@@ -396,7 +396,7 @@
                if(node.recentlyCompleted(id)) {
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting insert request 
from "+source.getPeer()+": "+e);
                        }
@@ -406,7 +406,7 @@
                        if(logMINOR) Logger.minor(this, "Could not lock ID 
"+id+" -> rejecting (already running)");
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting insert request 
from "+source.getPeer()+": "+e);
                        }
@@ -418,7 +418,7 @@
                        Logger.normal(this, "Rejecting insert from 
"+source.getPeer()+" preemptively because "+rejectReason);
                        Message rejected = DMT.createFNPRejectedOverload(id, 
true);
                        try {
-                               source.sendAsync(rejected, null, 0, ctr);
+                               source.sendAsync(rejected, null, ctr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) 
insert request from "+source.getPeer()+": "+e);
                        }
@@ -453,7 +453,7 @@
                if(node.recentlyCompleted(id)) {
                        Message rejected = DMT.createFNPRejectedLoop(id);
                        try {
-                               source.sendAsync(rejected, null, 0, 
node.nodeStats.probeRequestCtr);
+                               source.sendAsync(rejected, null, 
node.nodeStats.probeRequestCtr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting probe request 
from "+source.getPeer()+": "+e);
                        }
@@ -466,7 +466,7 @@
                        Logger.normal(this, "Rejecting probe request from 
"+source.getPeer());
                        Message rejected = DMT.createFNPRejectedOverload(id, 
true);
                        try {
-                               source.sendAsync(rejected, null, 0, 
node.nodeStats.probeRequestCtr);
+                               source.sendAsync(rejected, null, 
node.nodeStats.probeRequestCtr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (overload) 
insert request from "+source.getPeer()+": "+e);
                        }
@@ -477,7 +477,7 @@
                        Logger.normal(this, "Rejecting invalid 
(target="+target+") probe request from "+source.getPeer());
                        Message rejected = DMT.createFNPRejectedOverload(id, 
true);
                        try {
-                               source.sendAsync(rejected, null, 0, 
node.nodeStats.probeRequestCtr);
+                               source.sendAsync(rejected, null, 
node.nodeStats.probeRequestCtr);
                        } catch (NotConnectedException e) {
                                Logger.normal(this, "Rejecting (invalid) insert 
request from "+source.getPeer()+": "+e);
                        }
@@ -493,7 +493,7 @@
                if(om == null || !source.canAcceptAnnouncements()) {
                        Message msg = DMT.createFNPOpennetDisabled(uid);
                        try {
-                               source.sendAsync(msg, null, 0, 
node.nodeStats.announceByteCounter);
+                               source.sendAsync(msg, null, 
node.nodeStats.announceByteCounter);
                        } catch (NotConnectedException e) {
                                // Ok
                        }
@@ -502,7 +502,7 @@
                if(node.recentlyCompleted(uid)) {
                        Message msg = DMT.createFNPRejectedLoop(uid);
                        try {
-                               source.sendAsync(msg, null, 0, 
node.nodeStats.announceByteCounter);
+                               source.sendAsync(msg, null, 
node.nodeStats.announceByteCounter);
                        } catch (NotConnectedException e) {
                                // Ok
                        }
@@ -516,7 +516,7 @@
                        if(!source.shouldAcceptAnnounce(uid)) {
                                Message msg = 
DMT.createFNPRejectedOverload(uid, true);
                                try {
-                                       source.sendAsync(msg, null, 0, 
node.nodeStats.announceByteCounter);
+                                       source.sendAsync(msg, null, 
node.nodeStats.announceByteCounter);
                                } catch (NotConnectedException e) {
                                        // Ok
                                }
@@ -596,7 +596,7 @@
                        // Relay.
                        if(rc.source != null) {
                                try {
-                                       
rc.source.sendAsync(DMT.createFNPRoutedRejected(id, (short)0), null, 0, 
nodeStats.routedMessageCtr);
+                                       
rc.source.sendAsync(DMT.createFNPRoutedRejected(id, (short)0), null, 
nodeStats.routedMessageCtr);
                                } catch (NotConnectedException e) {
                                        // Ouch.
                                        Logger.error(this, "Unable to relay 
probe DNF: peer disconnected: "+rc.source);
@@ -626,7 +626,7 @@
                ctx = routedContexts.get(lid);
                if(ctx != null) {
                        try {
-                               
source.sendAsync(DMT.createFNPRoutedRejected(id, htl), null, 0, 
nodeStats.routedMessageCtr);
+                               
source.sendAsync(DMT.createFNPRoutedRejected(id, htl), null, 
nodeStats.routedMessageCtr);
                        } catch (NotConnectedException e) {
                                if(logMINOR) Logger.minor(this, "Lost 
connection rejecting "+m);
                        }
@@ -648,7 +648,7 @@
                } else if(htl == 0) {
                        Message reject = DMT.createFNPRoutedRejected(id, 
(short)0);
                        if(source != null) try {
-                               source.sendAsync(reject, null, 0, 
nodeStats.routedMessageCtr);
+                               source.sendAsync(reject, null, 
nodeStats.routedMessageCtr);
                        } catch (NotConnectedException e) {
                                if(logMINOR) Logger.minor(this, "Lost 
connection rejecting "+m);
                        }
@@ -670,7 +670,7 @@
                PeerNode pn = ctx.source;
                if(pn == null) return false;
                try {
-                       pn.sendAsync(m, null, 0, nodeStats.routedMessageCtr);
+                       pn.sendAsync(m, null, nodeStats.routedMessageCtr);
                } catch (NotConnectedException e) {
                        if(logMINOR) Logger.minor(this, "Lost connection 
forwarding "+m+" to "+pn);
                }
@@ -696,7 +696,7 @@
                                if(logMINOR) Logger.minor(this, "Forwarding 
"+m.getSpec()+" to "+next.getPeer().getPort());
                                ctx.addSent(next);
                                try {
-                                       next.sendAsync(m, null, 0, 
nodeStats.routedMessageCtr);
+                                       next.sendAsync(m, null, 
nodeStats.routedMessageCtr);
                                } catch (NotConnectedException e) {
                                        continue;
                                }
@@ -705,7 +705,7 @@
                                // Reached a dead end...
                                Message reject = 
DMT.createFNPRoutedRejected(id, htl);
                                if(pn != null) try {
-                                       pn.sendAsync(reject, null, 0, 
nodeStats.routedMessageCtr);
+                                       pn.sendAsync(reject, null, 
nodeStats.routedMessageCtr);
                                } catch (NotConnectedException e) {
                                        Logger.error(this, "Cannot send reject 
message back to source "+pn);
                                        return true;
@@ -741,7 +741,7 @@
                        Message reply = DMT.createFNPRoutedPong(id, x);
                        if(logMINOR) Logger.minor(this, "Replying - counter = 
"+x+" for "+id);
                        try {
-                               src.sendAsync(reply, null, 0, 
nodeStats.routedMessageCtr);
+                               src.sendAsync(reply, null, 
nodeStats.routedMessageCtr);
                        } catch (NotConnectedException e) {
                                if(logMINOR) Logger.minor(this, "Lost 
connection replying to "+m+" in dispatchRoutedMessage");
                        }

Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/OpennetManager.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -520,7 +520,7 @@
                long xferUID = node.random.nextLong();
                Message msg2 = isReply ? 
DMT.createFNPOpennetConnectReplyNew(uid, xferUID, noderef.length, 
padded.length) :
                        DMT.createFNPOpennetConnectDestinationNew(uid, xferUID, 
noderef.length, padded.length);
-               peer.sendAsync(msg2, null, 0, ctr);
+               peer.sendAsync(msg2, null, ctr);
                innerSendOpennetRef(xferUID, padded, peer, ctr);
        }

@@ -551,7 +551,7 @@
                long xferUID = node.random.nextLong();
                Message msg = DMT.createFNPOpennetAnnounceRequest(uid, xferUID, 
noderef.length, 
                                paddedSize(noderef.length), target, htl);
-               peer.sendAsync(msg, null, 0, ctr);
+               peer.sendAsync(msg, null, ctr);
                return xferUID;
        }

@@ -582,7 +582,7 @@
                long xferUID = node.random.nextLong();
                Message msg = DMT.createFNPOpennetAnnounceReply(uid, xferUID, 
noderef.length, 
                                padded.length);
-               peer.sendAsync(msg, null, 0, ctr);
+               peer.sendAsync(msg, null, ctr);
                innerSendOpennetRef(xferUID, padded, peer, ctr);
        }

@@ -664,7 +664,7 @@
        public void rejectRef(long uid, PeerNode source, int reason, 
ByteCounter ctr) {
                Message msg = DMT.createFNPOpennetNoderefRejected(uid, reason);
                try {
-                       source.sendAsync(msg, null, 0, ctr);
+                       source.sendAsync(msg, null, ctr);
                } catch (NotConnectedException e) {
                        // Ignore
                }

Modified: trunk/freenet/src/freenet/node/OutgoingPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/OutgoingPacketMangler.java   2008-09-19 
17:39:53 UTC (rev 22708)
@@ -43,7 +43,7 @@
         * @return Total size including UDP headers of the sent packet.
         */
        public int processOutgoing(byte[] buf, int offset, int length,
-                       KeyTracker tracker, int alreadyReportedBytes, short 
priority)
+                       KeyTracker tracker, short priority)
                        throws KeyChangedException, NotConnectedException,
                        PacketSequenceException, WouldBlockException;

@@ -66,7 +66,7 @@
         */
        public int processOutgoingPreformatted(byte[] buf, int offset, int 
length,
                        KeyTracker tracker, int packetNumber,
-                       AsyncMessageCallback[] callbacks, int 
alreadyReportedBytes, short priority)
+                       AsyncMessageCallback[] callbacks, short priority)
                        throws KeyChangedException, NotConnectedException,
                        PacketSequenceException, WouldBlockException;


Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -12,6 +12,7 @@
 import freenet.io.comm.Message;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PacketSocketHandler;
+import freenet.io.comm.UdpSocketHandler;
 import freenet.support.FileLoggerHook;
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
@@ -162,11 +163,16 @@
        public void run() {
                if(logMINOR) Logger.minor(this, "In PacketSender.run()");
                freenet.support.Logger.OSThread.logPID(this);
+               /*
+                * Index of the point in the nodes list at which we sent a 
packet and then
+                * ran out of bandwidth. We start the loop from here next time.
+                */
+               int brokeAt = 0;
                while(true) {
                        lastReceivedPacketFromAnyNode = lastReportedNoPackets;
                        try {
                                logMINOR = Logger.shouldLog(Logger.MINOR, this);
-                               realRun();
+                               brokeAt = realRun(brokeAt);
                        } catch(OutOfMemoryError e) {
                                OOMHandler.handleOOM(e);
                                System.err.println("Will retry above failed 
operation...");
@@ -178,7 +184,7 @@
                }
        }

-       private void realRun() {
+       private int realRun(int brokeAt) {
                long now = System.currentTimeMillis();
                lastTimeInSeconds = (int) (now / 1000);
                PeerManager pm = node.peers;
@@ -200,9 +206,27 @@
                long oldTempNow = now;
                // Needs to be run very frequently. Maybe change to a regular 
once per second schedule job?
                // Maybe not worth it as it is fairly lightweight.
+               // FIXME given the lock contention, maybe it's worth it? What 
about 
+               // running it on the UdpSocketHandler thread? That would surely 
be better...?
                node.lm.removeTooOldQueuedItems();
+               
+               boolean canSendThrottled = false;
+               
+               int MAX_PACKET_SIZE = 
node.darknetCrypto.socket.getMaxPacketSize();
+               long count = node.outputThrottle.count();
+               if(count > MAX_PACKET_SIZE)
+                       canSendThrottled = true;
+               else {
+                       long canSendAt = node.outputThrottle.getNanosPerTick() 
* (MAX_PACKET_SIZE - count);
+                       canSendAt = (canSendAt / (1000*1000)) + (canSendAt % 
(1000*1000) == 0 ? 0 : 1);
+                       if(logMINOR)
+                               Logger.minor(this, "Can send throttled packets 
in "+canSendAt+"ms");
+                       nextActionTime = Math.min(nextActionTime, now + 
canSendAt);
+               }
+               
+               int newBrokeAt = 0;
                for(int i = 0; i < nodes.length; i++) {
-                       PeerNode pn = nodes[i];
+                       PeerNode pn = nodes[i + brokeAt % nodes.length];
                        lastReceivedPacketFromAnyNode =
                                Math.max(pn.lastReceivedPacketTime(), 
lastReceivedPacketFromAnyNode);
                        pn.maybeOnConnect();
@@ -210,6 +234,8 @@
                                // Might as well do it properly.
                                node.peers.disconnect(pn, true, true);
                        }
+                       if(pn.shouldThrottle() && !canSendThrottled)
+                               continue;

                        if(pn.isConnected()) {
                                // Is the node dead?
@@ -230,7 +256,20 @@
                                        continue;
                                }

-                               pn.maybeSendPacket(now, rpiTemp, rpiIntTemp);
+                               if(pn.maybeSendPacket(now, rpiTemp, rpiIntTemp) 
&& canSendThrottled) {
+                                       canSendThrottled = false;
+                                       count = node.outputThrottle.count();
+                                       if(count > MAX_PACKET_SIZE)
+                                               canSendThrottled = true;
+                                       else {
+                                               long canSendAt = 
node.outputThrottle.getNanosPerTick() * (MAX_PACKET_SIZE - count);
+                                               canSendAt = (canSendAt / 
(1000*1000)) + (canSendAt % (1000*1000) == 0 ? 0 : 1);
+                                               if(logMINOR)
+                                                       Logger.minor(this, "Can 
send throttled packets in "+canSendAt+"ms");
+                                               nextActionTime = 
Math.min(nextActionTime, now + canSendAt);
+                                               newBrokeAt = i;
+                                       }
+                               }

                                long urgentTime = pn.getNextUrgentTime(now);
                                // Should spam the logs, unless there is a 
deadlock
@@ -256,6 +295,7 @@
                                Logger.error(this, "tempNow is more than 5 
seconds past oldTempNow (" + (tempNow - oldTempNow) + ") in PacketSender 
working with " + pn.userToString());
                        oldTempNow = tempNow;
                }
+               brokeAt = newBrokeAt;

                // Consider sending connect requests to our opennet old-peers.
                // No point if they are NATed, of course... but we don't know 
whether they are.
@@ -372,6 +412,7 @@
                        // because a new packet came in.
                        }
                }
+               return brokeAt;
        }

        /** Wake up, and send any queued packets. */

Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java     2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/PeerManager.java     2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -517,7 +517,7 @@
                                                if(removePeer(pn))
                                                        writePeers();
                                        }
-                               }, 0, ctrDisconn);
+                               }, ctrDisconn);
                        } catch(NotConnectedException e) {
                                if(pn.isDisconnecting() && removePeer(pn))
                                        writePeers();
@@ -723,7 +723,7 @@
                        if(onlyRealConnections && !peers[i].isRealConnection())
                                continue;
                        try {
-                               peers[i].sendAsync(msg, null, 0, ctr);
+                               peers[i].sendAsync(msg, null, ctr);
                        } catch(NotConnectedException e) {
                                // Ignore
                        }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -1024,7 +1024,7 @@
        * relating to this packet (normally set when we have delayed a packet 
in order to
        * throttle it).
        */
-       public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException {
+       public void sendAsync(Message msg, AsyncMessageCallback cb, ByteCounter 
ctr) throws NotConnectedException {
                if(ctr == null)
                        Logger.error(this, "Bytes not logged", new 
Exception("debug"));
                if(logMINOR)
@@ -1032,7 +1032,7 @@
                if(!isConnected())
                        throw new NotConnectedException();
                addToLocalNodeSentMessagesToStatistic(msg);
-               MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[]{cb}, alreadyReportedBytes, ctr, this);
+               MessageItem item = new MessageItem(msg, cb == null ? null : new 
AsyncMessageCallback[]{cb}, ctr, this);
                long now = System.currentTimeMillis();
                reportBackoffStatus(now);
                int x = 0;
@@ -1562,7 +1562,7 @@
        */
        public void sendSync(Message req, ByteCounter ctr) throws 
NotConnectedException {
                SyncMessageCallback cb = new SyncMessageCallback();
-               sendAsync(req, cb, 0, ctr);
+               sendAsync(req, cb, ctr);
                cb.waitForSend(60 * 1000);
                if (!cb.done) {
                        Logger.error(this, "Waited too long for a blocking send 
for " + req + " to " + PeerNode.this, new Exception("error"));
@@ -2119,12 +2119,12 @@

                try {
                        if(isRealConnection())
-                               sendAsync(locMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
-                       sendAsync(ipMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
-                       sendAsync(timeMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
-                       sendAsync(packetsMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
-                       sendAsync(dRoutingMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
-                       sendAsync(uptimeMsg, null, 0, 
node.nodeStats.initialMessagesCtr);
+                               sendAsync(locMsg, null, 
node.nodeStats.initialMessagesCtr);
+                       sendAsync(ipMsg, null, 
node.nodeStats.initialMessagesCtr);
+                       sendAsync(timeMsg, null, 
node.nodeStats.initialMessagesCtr);
+                       sendAsync(packetsMsg, null, 
node.nodeStats.initialMessagesCtr);
+                       sendAsync(dRoutingMsg, null, 
node.nodeStats.initialMessagesCtr);
+                       sendAsync(uptimeMsg, null, 
node.nodeStats.initialMessagesCtr);
                } catch(NotConnectedException e) {
                        Logger.error(this, "Completed handshake with " + 
getPeer() + " but disconnected (" + isConnected + ':' + currentTracker + "!!!: 
" + e, e);
                }
@@ -2163,7 +2163,7 @@
        private void sendIPAddressMessage() {
                Message ipMsg = DMT.createFNPDetectedIPAddress(detectedPeer);
                try {
-                       sendAsync(ipMsg, null, 0, node.nodeStats.changedIPCtr);
+                       sendAsync(ipMsg, null, node.nodeStats.changedIPCtr);
                } catch(NotConnectedException e) {
                        Logger.normal(this, "Sending IP change message to " + 
this + " but disconnected: " + e, e);
                }
@@ -2478,7 +2478,7 @@
                        if(t < now || forceSendPrimary) {
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Sending urgent notifications for current tracker on "+shortToString());
-                                       int size = 
outgoingMangler.processOutgoing(null, 0, 0, tracker, 0, DMT.PRIORITY_NOW);
+                                       int size = 
outgoingMangler.processOutgoing(null, 0, 0, tracker, DMT.PRIORITY_NOW);
                                        
node.nodeStats.reportNotificationOnlyPacketSent(size);
                                } catch(NotConnectedException e) {
                                // Ignore
@@ -2495,7 +2495,7 @@
                        if(t < now)
                                try {
                                        if(logMINOR) Logger.minor(this, 
"Sending urgent notifications for previous tracker on "+shortToString());
-                                       int size = 
outgoingMangler.processOutgoing(null, 0, 0, tracker, 0, DMT.PRIORITY_NOW);
+                                       int size = 
outgoingMangler.processOutgoing(null, 0, 0, tracker, DMT.PRIORITY_NOW);
                                        
node.nodeStats.reportNotificationOnlyPacketSent(size);
                                } catch(NotConnectedException e) {
                                // Ignore
@@ -2712,7 +2712,7 @@
                                Logger.error(this, "No tracker to resend packet 
" + item.packetNumber + " on");
                                continue;
                        }
-                       MessageItem mi = new MessageItem(item.buf, 
item.callbacks, true, 0, resendByteCounter, item.priority);
+                       MessageItem mi = new MessageItem(item.buf, 
item.callbacks, true, resendByteCounter, item.priority);
                        requeueMessageItems(new MessageItem[]{mi}, 0, 1, true);
                }
        }
@@ -3431,7 +3431,7 @@
                byte[] authenticator = 
HMAC.macWithSHA256(node.failureTable.offerAuthenticatorKey, keyBytes, 32);
                Message msg = DMT.createFNPOfferKey(key, authenticator);
                try {
-                       sendAsync(msg, null, 0, node.nodeStats.sendOffersCtr);
+                       sendAsync(msg, null, node.nodeStats.sendOffersCtr);
                } catch(NotConnectedException e) {
                // Ignore
                }
@@ -3873,7 +3873,7 @@
                        n2nm = DMT.createNodeToNodeMessage(
                                        n2nType, 
fs.toString().getBytes("UTF-8"));
                        try {
-                               sendAsync(n2nm, null, 0, 
node.nodeStats.nodeToNodeCounter);
+                               sendAsync(n2nm, null, 
node.nodeStats.nodeToNodeCounter);
                        } catch (NotConnectedException e) {
                                if(includeSentTime) {
                                        fs.removeValue("sentTime");
@@ -3950,7 +3950,7 @@

        void sendFNPNetworkID(ByteCounter ctr) throws NotConnectedException {
                if (assignedNetworkID!=0)
-                       sendAsync(DMT.createFNPNetworkID(assignedNetworkID), 
null, 0, ctr);
+                       sendAsync(DMT.createFNPNetworkID(assignedNetworkID), 
null, ctr);
        }

        public boolean shouldThrottle() {
@@ -3997,7 +3997,7 @@
                if(logMINOR) Logger.minor(this, "Sending throttled message with 
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
                for(int i=0;i<100;i++) {
                        try {
-                               getThrottle().sendThrottledMessage(msg, this, 
node.outputThrottle, packetSize, ctr, deadline, blockForSend);
+                               getThrottle().sendThrottledMessage(msg, this, 
packetSize, ctr, deadline, blockForSend);
                                return;
                        } catch (ThrottleDeprecatedException e) {
                                // Try with the new throttle. We don't need it, 
we'll get it from getThrottle().
@@ -4062,7 +4062,7 @@
         * @param rpiTemp
         * @param rpiTemp
         */
-       public void maybeSendPacket(long now, Vector rpiTemp, int[] rpiIntTemp) 
{
+       public boolean maybeSendPacket(long now, Vector rpiTemp, int[] 
rpiIntTemp) {
                // If there are any urgent notifications, we must send a packet.
                boolean mustSend = false;
                if(mustSendNotificationsNow(now)) {
@@ -4091,23 +4091,23 @@
                                        if(logMINOR)
                                                Logger.minor(this, "Resending " 
+ item.packetNumber + " to " + item.kt);
                                        getOutgoingMangler().resend(item);
-                                       return;
+                                       return true;
                                } catch(KeyChangedException e) {
                                        Logger.error(this, "Caught " + e + " 
resending packets to " + kt);
                                        requeueResendItems(rpiTemp);
-                                       return;
+                                       return false;
                                } catch(NotConnectedException e) {
                                        Logger.normal(this, "Caught " + e + " 
resending packets to " + kt);
                                        requeueResendItems(rpiTemp);
-                                       return;
+                                       return false;
                                } catch(PacketSequenceException e) {
                                        Logger.error(this, "Caught " + e + " - 
disconnecting", e);
                                        // PSE is fairly drastic, something is 
broken between us, but maybe we can resync
                                        forceDisconnect(false); 
-                                       return;
+                                       return false;
                                } catch(WouldBlockException e) {
                                        Logger.error(this, "Impossible: " + e, 
e);
-                                       return;
+                                       return false;
                                }
                        }

@@ -4233,15 +4233,16 @@
                        // Force packet to have a sequence number.
                        Message m = DMT.createFNPVoid();
                        addToLocalNodeSentMessagesToStatistic(m);
-                       messages.add(new MessageItem(m, null, 0, null, this));
+                       messages.add(new MessageItem(m, null, null, this));
                }

-               if(messages.isEmpty()) return;
+               if(messages.isEmpty()) return false;

                // Send packets, right now, blocking, including any active 
notifications
                // Note that processOutgoingOrRequeue will drop messages from 
the end
                // if necessary to fit the messages into a single packet.
                
getOutgoingMangler().processOutgoingOrRequeue(messages.toArray(new 
MessageItem[messages.size()]), this, true, false, true);
-
+               
+               return true;
        }
 }

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -136,7 +136,7 @@
                        Logger.minor(this, "Handling a request: " + uid);

                Message accepted = DMT.createFNPAccepted(uid);
-               source.sendAsync(accepted, null, 0, this);
+               source.sendAsync(accepted, null, this);

                Object o = node.makeRequestSender(key, htl, uid, source, false, 
true, false, false);
                if(o instanceof KeyBlock) {
@@ -168,7 +168,7 @@
                                // Forward RejectedOverload
                                //Note: This message is only decernable from 
the terminal messages by the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
                                Message msg = 
DMT.createFNPRejectedOverload(uid, false);
-                               source.sendAsync(msg, null, 0, this);
+                               source.sendAsync(msg, null, this);
                                //If the status changes (e.g. to SUCCESS), 
there is little need to send yet another reject overload.
                                sentRejectedOverload = true;
                        }
@@ -182,7 +182,7 @@
                try {
                        // Is a CHK.
                        Message df = DMT.createFNPCHKDataFound(uid, 
rs.getHeaders());
-                       source.sendAsync(df, null, 0, this);
+                       source.sendAsync(df, null, this);

                        PartiallyReceivedBlock prb = rs.getPRB();
                        bt =
@@ -339,7 +339,7 @@
                // SUCCESS requires that BOTH the pubkey AND the data/headers 
have been received.
                // The pubKey will have been set on the SSK key, and the 
SSKBlock will have been constructed.
                Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid, 
headers);
-               source.sendAsync(headersMsg, null, 0, this);
+               source.sendAsync(headersMsg, null, this);
                final Message dataMsg = DMT.createFNPSSKDataFoundData(uid, 
data);
                node.executor.execute(new PrioRunnable() {

@@ -366,13 +366,13 @@

                if(SEND_OLD_FORMAT_SSK) {
                        Message df = DMT.createFNPSSKDataFound(uid, headers, 
data);
-                       source.sendAsync(df, null, 0, this);
+                       source.sendAsync(df, null, this);
                        // Not throttled, so report payload here.
                        sentPayload(data.length);
                }
                if(needsPubKey) {
                        Message pk = DMT.createFNPSSKPubKey(uid, pubKey);
-                       source.sendAsync(pk, null, 0, this);
+                       source.sendAsync(pk, null, this);
                }
        }

@@ -380,7 +380,7 @@
                // SUCCESS requires that BOTH the pubkey AND the data/headers 
have been received.
                // The pubKey will have been set on the SSK key, and the 
SSKBlock will have been constructed.
                Message headersMsg = DMT.createFNPSSKDataFoundHeaders(uid, 
headers);
-               source.sendAsync(headersMsg, null, 0, ctr);
+               source.sendAsync(headersMsg, null, ctr);
                final Message dataMsg = DMT.createFNPSSKDataFoundData(uid, 
data);
                try {
                        source.sendThrottledMessage(dataMsg, data.length, ctr, 
60 * 1000, false);
@@ -391,13 +391,13 @@

                if(SEND_OLD_FORMAT_SSK) {
                        Message df = DMT.createFNPSSKDataFound(uid, headers, 
data);
-                       source.sendAsync(df, null, 0, ctr);
+                       source.sendAsync(df, null, ctr);
                        // Not throttled, so report payload here.
                        ctr.sentPayload(data.length);
                }
                if(needsPubKey) {
                        Message pk = DMT.createFNPSSKPubKey(uid, pubKey);
-                       source.sendAsync(pk, null, 0, ctr);
+                       source.sendAsync(pk, null, ctr);
                }
        }

@@ -417,7 +417,7 @@
                        BlockTransmitter bt =
                                new BlockTransmitter(node.usm, source, uid, 
prb, this);
                        node.addTransferringRequestHandler(uid);
-                       source.sendAsync(df, null, 0, this);
+                       source.sendAsync(df, null, this);
                        if(bt.send(node.executor)) {
                                // for byte logging
                                status = RequestSender.SUCCESS;
@@ -451,7 +451,7 @@
                else
                        sendTerminalCalled = true;

-               source.sendAsync(msg, new TerminalMessageByteCountCollector(), 
0, this);
+               source.sendAsync(msg, new TerminalMessageByteCountCollector(), 
this);
        }
        boolean sendTerminalCalled = false;


Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java   2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/RequestSender.java   2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -209,7 +209,7 @@
                }
                Message msg = DMT.createFNPGetOfferedKey(key, 
offer.authenticator, pubKey == null, uid);
                try {
-                               pn.sendAsync(msg, null, 0, this);
+                               pn.sendAsync(msg, null, this);
                        } catch (NotConnectedException e2) {
                                if(logMINOR)
                                        Logger.minor(this, "Disconnected: 
"+pn+" getting offer for "+key);

Modified: trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java 
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/ResettingHTLProbeRequestHandler.java 
2008-09-19 17:39:53 UTC (rev 22708)
@@ -43,8 +43,8 @@
                Message accepted = DMT.createFNPAccepted(uid);
                Message trace = DMT.createFNPRHProbeTrace(uid, 
sender.getNearestLoc(), sender.getBest(), htl, (short)1, (short)1, 
n.getLocation(), n.swapIdentifier, LocationManager.extractLocs(peers, true), 
LocationManager.extractUIDs(peers), (short)0, (short)1, "", 
source.swapIdentifier);
                try {
-                       source.sendAsync(accepted, null, 0, sender);
-                       source.sendAsync(trace, null, 0, sender);
+                       source.sendAsync(accepted, null, sender);
+                       source.sendAsync(trace, null, sender);
                } catch (NotConnectedException e) {
                        // We completed(id), rather than locking it, so we 
don't need to unlock.
                        return; // So all we need to do is not start the sender.
@@ -53,33 +53,33 @@
        }

        public void onCompletion(double nearest, double best, short counter, 
short uniqueCounter, short linearCounter) throws NotConnectedException {
-               source.sendAsync(DMT.createFNPRHProbeReply(uid, nearest, best, 
counter, uniqueCounter, linearCounter), null, 0, sender);
+               source.sendAsync(DMT.createFNPRHProbeReply(uid, nearest, best, 
counter, uniqueCounter, linearCounter), null, sender);
        }

        public void onRNF(short htl, double nearest, double best, short 
counter, short uniqueCounter, short linearCounter) throws NotConnectedException 
{
                Message rnf = DMT.createFNPRouteNotFound(uid, htl);
                Message sub = DMT.createFNPRHReturnSubMessage(nearest, best, 
counter, uniqueCounter, linearCounter, "rnf");
                rnf.addSubMessage(sub);
-               source.sendAsync(rnf, null, 0, sender);
+               source.sendAsync(rnf, null, sender);
        }

        public void onReceivedRejectOverload(double nearest, double best, short 
counter, short uniqueCounter, short linearCounter, String reason) throws 
NotConnectedException {
                Message ro = DMT.createFNPRejectedOverload(uid, false);
                Message sub = DMT.createFNPRHReturnSubMessage(nearest, best, 
counter, uniqueCounter, linearCounter, reason);
                ro.addSubMessage(sub);
-               source.sendAsync(ro, null, 0, sender);
+               source.sendAsync(ro, null, sender);
        }

        public void onTimeout(double nearest, double best, short counter, short 
uniqueCounter, short linearCounter, String reason) throws NotConnectedException 
{
                Message ro = DMT.createFNPRejectedOverload(uid, true);
                Message sub = DMT.createFNPRHReturnSubMessage(nearest, best, 
counter, uniqueCounter, linearCounter, reason);
                ro.addSubMessage(sub);
-               source.sendAsync(ro, null, 0, sender);
+               source.sendAsync(ro, null, sender);
        }

        public void onTrace(long uid, double nearest, double best, short htl, 
short counter, short uniqueCounter, double location, long myUID, ShortBuffer 
peerLocs, ShortBuffer peerUIDs, short forkCount, short linearCounter, String 
reason, long prevUID) throws NotConnectedException {
                Message trace = DMT.createFNPRHProbeTrace(uid, nearest, best, 
htl, counter, uniqueCounter, location, myUID, peerLocs, peerUIDs, forkCount, 
linearCounter, reason, prevUID);
-               source.sendAsync(trace, null, 0, sender);
+               source.sendAsync(trace, null, sender);
        }

 }

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-09-19 
16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-09-19 
17:39:53 UTC (rev 22708)
@@ -86,7 +86,7 @@
         Message accepted = DMT.createFNPSSKAccepted(uid, pubKey == null);

         try {
-                       source.sendAsync(accepted, null, 0, this);
+                       source.sendAsync(accepted, null, this);
                } catch (NotConnectedException e1) {
                        if(logMINOR) Logger.minor(this, "Lost connection to 
source");
                        return;
@@ -141,7 +141,7 @@
                                        if(logMINOR) Logger.minor(this, "Got 
pubkey on "+uid+" : "+pubKey);
                                        Message confirm = 
DMT.createFNPSSKPubKeyAccepted(uid);
                                        try {
-                                               source.sendAsync(confirm, null, 
0, this);
+                                               source.sendAsync(confirm, null, 
this);
                                        } catch (NotConnectedException e) {
                                                if(logMINOR) Logger.minor(this, 
"Lost connection to source on "+uid);
                                                return;

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-09-19 16:53:54 UTC 
(rev 22707)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-09-19 17:39:53 UTC 
(rev 22708)
@@ -170,9 +170,9 @@
             // Send to next node

             try {
-                               next.sendAsync(request, null, 0, this);
+                               next.sendAsync(request, null, this);
                                if(RequestHandler.SEND_OLD_FORMAT_SSK) {
-                                       next.sendAsync(req, null, 0, this);
+                                       next.sendAsync(req, null, this);
                                        // Not throttled, so report here.
                                        sentPayload(data.length);
                                }
@@ -248,7 +248,7 @@
             Message dataMsg = DMT.createFNPSSKInsertRequestData(uid, data);

             try {
-                               next.sendAsync(headersMsg, null, 0, this);
+                               next.sendAsync(headersMsg, null, this);
                                next.sendThrottledMessage(dataMsg, data.length, 
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
                        } catch (NotConnectedException e1) {
                                if(logMINOR) Logger.minor(this, "Not connected 
to "+next);
@@ -265,7 +265,7 @@
             if(msg.getBoolean(DMT.NEED_PUB_KEY)) {
                Message pkMsg = DMT.createFNPSSKPubKey(uid, pubKey);
                try {
-                       next.sendAsync(pkMsg, null, 0, this);
+                       next.sendAsync(pkMsg, null, this);
                } catch (NotConnectedException e) {
                        if(logMINOR) Logger.minor(this, "Node disconnected 
while sending pubkey: "+next);
                        continue;

Modified: trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/SendMessageOnErrorCallback.java      
2008-09-19 17:39:53 UTC (rev 22708)
@@ -44,7 +44,7 @@
        if(Logger.shouldLog(Logger.MINOR, this))
                Logger.minor(this, "Disconnect trigger: "+this);
         try {
-            dest.sendAsync(msg, null, 0, ctr);
+            dest.sendAsync(msg, null, ctr);
         } catch (NotConnectedException e) {
                if(Logger.shouldLog(Logger.MINOR, this))
                        Logger.minor(this, "Both source and destination 
disconnected: "+msg+" for "+this);

Modified: trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java       
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/updater/NodeUpdateManager.java       
2008-09-19 17:39:53 UTC (rev 22708)
@@ -192,7 +192,7 @@
                        if((!hasBeenBlown) && (mainUpdater == null || 
mainUpdater.getFetchedVersion() <= 0)) return;
                }
                try {
-                       peer.sendAsync(getUOMAnnouncement(), null, 0, ctr);
+                       peer.sendAsync(getUOMAnnouncement(), null, ctr);
                } catch (NotConnectedException e) {
                        // Sad, but ignore it
                }

Modified: trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java      
2008-09-19 16:53:54 UTC (rev 22707)
+++ trunk/freenet/src/freenet/node/updater/UpdateOverMandatoryManager.java      
2008-09-19 17:39:53 UTC (rev 22708)
@@ -181,7 +181,7 @@
                                                public void sent() {
                                                        // Cool
                                                }
-                                       }, 0, updateManager.ctr);
+                                       }, updateManager.ctr);

                                        // The reply message will start the 
transfer. It includes the revocation URI
                                        // so we can tell if anything wierd is 
happening.
@@ -340,7 +340,7 @@
                                                }
                                        }, REQUEST_MAIN_JAR_TIMEOUT);
                                }
-                       }, 0, updateManager.ctr);
+                       }, updateManager.ctr);
                } catch (NotConnectedException e) {
                        synchronized(this) {
                                nodesAskedSendMainJar.remove(source);
@@ -616,7 +616,7 @@
                                        return super.toString() + 
"("+uid+":"+source.getPeer()+")";
                                }

-                       }, 0, updateManager.ctr);
+                       }, updateManager.ctr);
                } catch (NotConnectedException e) {
                        Logger.error(this, "Peer "+source+" asked us for the 
blob file for the revocation key, then disconnected when we tried to send the 
UOMSendingRevocation: "+e, e);
                        return true;
@@ -898,7 +898,7 @@
        private void cancelSend(PeerNode source, long uid) {
                Message msg = DMT.createFNPBulkReceiveAborted(uid);
                try {
-                       source.sendAsync(msg, null, 0, updateManager.ctr);
+                       source.sendAsync(msg, null, updateManager.ctr);
                } catch (NotConnectedException e1) {
                        // Ignore
                }
@@ -991,7 +991,7 @@
                                        return super.toString() + 
"("+uid+":"+source.getPeer()+")";
                                }

-                       }, 0, updateManager.ctr);
+                       }, updateManager.ctr);
                } catch (NotConnectedException e) {
                        Logger.error(this, "Peer "+source+" asked us for the 
blob file for the main jar, then disconnected when we tried to send the 
UOMSendingMain: "+e, e);
                        return true;


Reply via email to