Author: toad
Date: 2008-02-28 16:12:52 +0000 (Thu, 28 Feb 2008)
New Revision: 18204

Modified:
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Implement a true congestion window.
This should make bandwidth usage respond much more quickly to network problems, 
and prevent temporary spikes from DoSing a connection for a long time 
afterwards.

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-02-28 15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-02-28 16:12:52 UTC (rev 18204)
@@ -30,7 +30,6 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.io.comm.RetrievalException;
-import freenet.node.PeerNode;
 import freenet.node.PrioRunnable;
 import freenet.support.BitArray;
 import freenet.support.DoubleTokenBucket;
@@ -38,7 +37,6 @@
 import freenet.support.Logger;
 import freenet.support.TimeUtil;
 import freenet.support.io.NativeThread;
-import freenet.support.transport.ip.IPUtil;

 /**
  * @author ian
@@ -87,7 +85,6 @@

                        public void run() {
                                while (!_sendComplete) {
-                                       long startCycleTime = 
System.currentTimeMillis();
                                        int packetNo;
                                        try {
                                                synchronized(_senderThread) {
@@ -103,7 +100,8 @@
                                        }
                                        int totalPackets;
                                        try {
-                                               
_destination.sendAsync(DMT.createPacketTransmit(_uid, packetNo, _sentPackets, 
_prb.getPacket(packetNo)), null, PACKET_SIZE, _ctr);
+                                               
throttle.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), 
+                                                               _destination, 
_masterThrottle, PACKET_SIZE, _ctr);
                                                if(_ctr != null) 
_ctr.sentPayload(_prb._packetSize);
                                                
totalPackets=_prb.getNumPackets();
                                        } catch (NotConnectedException e) {
@@ -126,50 +124,9 @@
                                                        
_senderThread.notifyAll();
                                                }
                                        }
-                                       delay(startCycleTime);
                                }
                        }

-                       private void delay(long startCycleTime) {
-                               //FIXME: startCycleTime is not used in this 
function, why is it passed in?
-                               long startThrottle = System.currentTimeMillis();
-
-                               // Get the current inter-packet delay
-                               long end = 
throttle.scheduleDelay(startThrottle);
-
-                               
if(IPUtil.isValidAddress(_destination.getPeer().getAddress(), false))
-                                       
_masterThrottle.blockingGrab(PACKET_SIZE);
-                               
-                               long now = System.currentTimeMillis();
-                               
-                               long delayTime = now - startThrottle;
-                               
-                               // Report the delay caused by bandwidth 
limiting, NOT the delay caused by congestion control.
-                               
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
-                               
-                               if (end - now > 2*60*1000)
-                                       Logger.error(this, "per-packet 
congestion control delay: "+(end-now));
-                               
-                               if(now > end) return;
-                               while(now < end) {
-                                       long l = end - now;
-                                       synchronized(_senderThread) {
-                                               if(_sendComplete) return;
-                                       }
-                                       // Check for completion every 2 minutes
-                                       int x = (int) (Math.min(l, 120*1000));
-                                       if(x > 0) {
-                                               try {
-                                                       //FIXME: if the 
senderThread sleeps here for two minutes, that will timeout the receiver, no? 
Should this be a wait()?
-                                                       Thread.sleep(x);
-                                               } catch (InterruptedException 
e) {
-                                                       // Ignore
-                                               }
-                                       }
-                                       now = System.currentTimeMillis();
-                               }
-                       }
-
                        public int getPriority() {
                                return NativeThread.HIGH_PRIORITY;
                        }

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-02-28 15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-02-28 16:12:52 UTC (rev 18204)
@@ -13,7 +13,6 @@
 import freenet.support.BitArray;
 import freenet.support.DoubleTokenBucket;
 import freenet.support.Logger;
-import freenet.support.transport.ip.IPUtil;

 /**
  * Bulk data transfer (not block). Bulk transfer is designed for files which 
may be much bigger than a 
@@ -244,36 +243,9 @@
                        }

                        // Congestion control and bandwidth limiting
-                       long now = System.currentTimeMillis();
-                       long waitUntil = peer.getThrottle().scheduleDelay(now);
-                       
-                       if(IPUtil.isValidAddress(peer.getPeer().getAddress(), 
false))
-                               masterThrottle.blockingGrab(packetSize);
-                       
-                       while((now = System.currentTimeMillis()) < waitUntil) {
-                               long sleepTime = waitUntil - now;
-                               try {
-                                       synchronized(this) {
-                                               wait(sleepTime);
-                                               if(finished) {
-                                                       
masterThrottle.recycle(packetSize);
-                                                       return true;
-                                               }
-                                               if(cancelled) {
-                                                       
masterThrottle.recycle(packetSize);
-                                                       if(logMINOR)
-                                                               
Logger.minor(this, "Cancelled after sleeping for throttle "+this);
-                                                       return false;
-                                               }
-                                       }
-                               } catch (InterruptedException e) {
-                                       // Ignore
-                               }
-                       }
-                       // FIXME should this be reported on bwlimitDelayTime ???
-                       
                        try {
-                               peer.sendAsync(DMT.createFNPBulkPacketSend(uid, 
blockNo, buf), null, 0, null);
+                               
peer.getThrottle().sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, 
blockNo, buf), peer, 
+                                               masterThrottle, packetSize, 
null);
                                synchronized(this) {
                                        blocksNotSentButPresent.setBit(blockNo, 
false);
                                }

Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-02-28 15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-02-28 16:12:52 UTC (rev 18204)
@@ -21,7 +21,14 @@
 import java.util.HashMap;
 import java.util.Map;

+import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
+import freenet.io.comm.PeerContext;
+import freenet.node.PeerNode;
+import freenet.support.DoubleTokenBucket;
 import freenet.support.Logger;

 public class PacketThrottle {
@@ -41,6 +48,8 @@
        /** Last return of scheduleDelay(); time before which no packet may be sent */
        private long lastScheduledDelay;
        private boolean slowStart = true;
+       /** Total packets in flight, including waiting for bandwidth from the central throttle. */
+       private int _packetsInFlight;

        /**
         * Create a PacketThrottle for a given peer.
@@ -134,4 +143,82 @@
                //1000 ms/sec
                return ((PACKET_SIZE * 1000.0 / getDelay()));
        }
+       
+       public void sendThrottledMessage(Message msg, PeerContext peer, 
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr) throws 
NotConnectedException {
+               synchronized(this) {
+                       while(true) {
+                               int windowSize = (int) getWindowSize();
+                               if(_packetsInFlight < windowSize) {
+                                       _packetsInFlight++;
+                                       break;
+                               }
+                               try {
+                                       wait();
+                               } catch (InterruptedException e) {
+                                       // Ignore
+                               }
+                       }
+               }
+               MyCallback callback = new MyCallback();
+               try {
+                       long startTime = System.currentTimeMillis();
+                       overallThrottle.blockingGrab(packetSize);
+                       long delayTime = System.currentTimeMillis() - startTime;
+                       if(peer instanceof PeerNode) {
+                               PeerNode pn = (PeerNode) peer;
+                               if(!pn.isLocalAddress())
+                                       
pn.reportThrottledPacketSendTime(delayTime);
+                       }
+                       peer.sendAsync(msg, callback, packetSize, ctr);
+               } catch (RuntimeException e) {
+                       callback.fatalError();
+                       throw e;
+               } catch (Error e) {
+                       callback.fatalError();
+                       throw e;
+               } catch (NotConnectedException e) {
+                       synchronized(this) {
+                               callback.disconnected();
+                               notifyAll();
+                       }
+                       throw e;
+               }
+       }
+       
+       private class MyCallback implements AsyncMessageCallback {
+
+               private boolean finished = false;
+               
+               public void acknowledged() {
+                       synchronized(PacketThrottle.this) {
+                               if(finished) return;
+                               finished = true;
+                               _packetsInFlight--;
+                               PacketThrottle.this.notifyAll();
+                       }
+               }
+
+               public void disconnected() {
+                       synchronized(PacketThrottle.this) {
+                               if(finished) return;
+                               finished = true;
+                               _packetsInFlight--;
+                               PacketThrottle.this.notifyAll();
+                       }
+               }
+
+               public void fatalError() {
+                       synchronized(PacketThrottle.this) {
+                               if(finished) return;
+                               finished = true;
+                               _packetsInFlight--;
+                               PacketThrottle.this.notifyAll();
+                       }
+               }
+
+               public void sent() {
+                       // Ignore
+               }
+               
+       }
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-02-28 15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-02-28 16:12:52 UTC (rev 18204)
@@ -69,6 +69,7 @@
 import freenet.support.math.SimpleRunningAverage;
 import freenet.support.math.TimeDecayingRunningAverage;
 import freenet.support.transport.ip.HostnameSyntaxException;
+import freenet.support.transport.ip.IPUtil;

 /**
  * @author amphibian
@@ -3586,4 +3587,10 @@
                if (assignedNetworkID!=0)
                        sendAsync(DMT.createFNPNetworkID(assignedNetworkID), 
null, 0, null);
        }
+
+       public boolean isLocalAddress() {
+               Peer peer = getPeer();
+               if(peer == null) return false; // presumably
+               return IPUtil.isValidAddress(getPeer().getAddress(), false);
+       }
 }


Reply via email to