Author: toad
Date: 2008-02-28 16:12:52 +0000 (Thu, 28 Feb 2008)
New Revision: 18204
Modified:
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Implement a true congestion window.
This should make bandwidth usage respond much more quickly to network problems,
and prevent temporary spikes from DoSing a connection for a long time
afterwards.
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-02-28
15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-02-28
16:12:52 UTC (rev 18204)
@@ -30,7 +30,6 @@
import freenet.io.comm.NotConnectedException;
import freenet.io.comm.PeerContext;
import freenet.io.comm.RetrievalException;
-import freenet.node.PeerNode;
import freenet.node.PrioRunnable;
import freenet.support.BitArray;
import freenet.support.DoubleTokenBucket;
@@ -38,7 +37,6 @@
import freenet.support.Logger;
import freenet.support.TimeUtil;
import freenet.support.io.NativeThread;
-import freenet.support.transport.ip.IPUtil;
/**
* @author ian
@@ -87,7 +85,6 @@
public void run() {
while (!_sendComplete) {
- long startCycleTime =
System.currentTimeMillis();
int packetNo;
try {
synchronized(_senderThread) {
@@ -103,7 +100,8 @@
}
int totalPackets;
try {
-
_destination.sendAsync(DMT.createPacketTransmit(_uid, packetNo, _sentPackets,
_prb.getPacket(packetNo)), null, PACKET_SIZE, _ctr);
+
throttle.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)),
+ _destination,
_masterThrottle, PACKET_SIZE, _ctr);
if(_ctr != null)
_ctr.sentPayload(_prb._packetSize);
totalPackets=_prb.getNumPackets();
} catch (NotConnectedException e) {
@@ -126,50 +124,9 @@
_senderThread.notifyAll();
}
}
- delay(startCycleTime);
}
}
- private void delay(long startCycleTime) {
- //FIXME: startCycleTime is not used in this
function, why is it passed in?
- long startThrottle = System.currentTimeMillis();
-
- // Get the current inter-packet delay
- long end =
throttle.scheduleDelay(startThrottle);
-
-
if(IPUtil.isValidAddress(_destination.getPeer().getAddress(), false))
-
_masterThrottle.blockingGrab(PACKET_SIZE);
-
- long now = System.currentTimeMillis();
-
- long delayTime = now - startThrottle;
-
- // Report the delay caused by bandwidth
limiting, NOT the delay caused by congestion control.
-
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
-
- if (end - now > 2*60*1000)
- Logger.error(this, "per-packet
congestion control delay: "+(end-now));
-
- if(now > end) return;
- while(now < end) {
- long l = end - now;
- synchronized(_senderThread) {
- if(_sendComplete) return;
- }
- // Check for completion every 2 minutes
- int x = (int) (Math.min(l, 120*1000));
- if(x > 0) {
- try {
- //FIXME: if the
senderThread sleeps here for two minutes, that will timeout the receiver, no?
Should this be a wait()?
- Thread.sleep(x);
- } catch (InterruptedException
e) {
- // Ignore
- }
- }
- now = System.currentTimeMillis();
- }
- }
-
public int getPriority() {
return NativeThread.HIGH_PRIORITY;
}
Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-02-28
15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java 2008-02-28
16:12:52 UTC (rev 18204)
@@ -13,7 +13,6 @@
import freenet.support.BitArray;
import freenet.support.DoubleTokenBucket;
import freenet.support.Logger;
-import freenet.support.transport.ip.IPUtil;
/**
* Bulk data transfer (not block). Bulk transfer is designed for files which
may be much bigger than a
@@ -244,36 +243,9 @@
}
// Congestion control and bandwidth limiting
- long now = System.currentTimeMillis();
- long waitUntil = peer.getThrottle().scheduleDelay(now);
-
- if(IPUtil.isValidAddress(peer.getPeer().getAddress(),
false))
- masterThrottle.blockingGrab(packetSize);
-
- while((now = System.currentTimeMillis()) < waitUntil) {
- long sleepTime = waitUntil - now;
- try {
- synchronized(this) {
- wait(sleepTime);
- if(finished) {
-
masterThrottle.recycle(packetSize);
- return true;
- }
- if(cancelled) {
-
masterThrottle.recycle(packetSize);
- if(logMINOR)
-
Logger.minor(this, "Cancelled after sleeping for throttle "+this);
- return false;
- }
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- // FIXME should this be reported on bwlimitDelayTime ???
-
try {
- peer.sendAsync(DMT.createFNPBulkPacketSend(uid,
blockNo, buf), null, 0, null);
+
peer.getThrottle().sendThrottledMessage(DMT.createFNPBulkPacketSend(uid,
blockNo, buf), peer,
+ masterThrottle, packetSize,
null);
synchronized(this) {
blocksNotSentButPresent.setBit(blockNo,
false);
}
Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-02-28
15:54:46 UTC (rev 18203)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java 2008-02-28
16:12:52 UTC (rev 18204)
@@ -21,7 +21,14 @@
import java.util.HashMap;
import java.util.Map;
+import freenet.io.comm.AsyncMessageCallback;
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
import freenet.io.comm.Peer;
+import freenet.io.comm.PeerContext;
+import freenet.node.PeerNode;
+import freenet.support.DoubleTokenBucket;
import freenet.support.Logger;
public class PacketThrottle {
@@ -41,6 +48,8 @@
/** Last return of scheduleDelay(); time before which no packet may be
sent */
private long lastScheduledDelay;
private boolean slowStart = true;
+ /** Total packets in flight, including waiting for bandwidth from the
central throttle. */
+ private int _packetsInFlight;
/**
* Create a PacketThrottle for a given peer.
@@ -134,4 +143,82 @@
//1000 ms/sec
return ((PACKET_SIZE * 1000.0 / getDelay()));
}
+
+ public void sendThrottledMessage(Message msg, PeerContext peer,
DoubleTokenBucket overallThrottle, int packetSize, ByteCounter ctr) throws
NotConnectedException {
+ synchronized(this) {
+ while(true) {
+ int windowSize = (int) getWindowSize();
+ if(_packetsInFlight < windowSize) {
+ _packetsInFlight++;
+ break;
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ MyCallback callback = new MyCallback();
+ try {
+ long startTime = System.currentTimeMillis();
+ overallThrottle.blockingGrab(packetSize);
+ long delayTime = System.currentTimeMillis() - startTime;
+ if(peer instanceof PeerNode) {
+ PeerNode pn = (PeerNode) peer;
+ if(!pn.isLocalAddress())
+
pn.reportThrottledPacketSendTime(delayTime);
+ }
+ peer.sendAsync(msg, callback, packetSize, ctr);
+ } catch (RuntimeException e) {
+ callback.fatalError();
+ throw e;
+ } catch (Error e) {
+ callback.fatalError();
+ throw e;
+ } catch (NotConnectedException e) {
+ synchronized(this) {
+ callback.disconnected();
+ notifyAll();
+ }
+ throw e;
+ }
+ }
+
+ /**
+  * Callback attached to each throttled send. Whichever terminal event
+  * arrives first (acknowledged, disconnected or fatal error) gives the
+  * packet's slot back to the enclosing PacketThrottle and wakes any senders
+  * waiting on it; later events for the same packet are ignored.
+  */
+ private class MyCallback implements AsyncMessageCallback {
+
+     /** True once the slot has been released. Guarded by the PacketThrottle monitor. */
+     private boolean finished = false;
+
+     public void acknowledged() {
+         releaseSlot();
+     }
+
+     public void disconnected() {
+         releaseSlot();
+     }
+
+     public void fatalError() {
+         releaseSlot();
+     }
+
+     public void sent() {
+         // Ignore
+     }
+
+     /** Release this packet's slot at most once and notify waiting senders. */
+     private void releaseSlot() {
+         synchronized(PacketThrottle.this) {
+             if(!finished) {
+                 finished = true;
+                 _packetsInFlight--;
+                 PacketThrottle.this.notifyAll();
+             }
+         }
+     }
+
+ }
}
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2008-02-28 15:54:46 UTC
(rev 18203)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2008-02-28 16:12:52 UTC
(rev 18204)
@@ -69,6 +69,7 @@
import freenet.support.math.SimpleRunningAverage;
import freenet.support.math.TimeDecayingRunningAverage;
import freenet.support.transport.ip.HostnameSyntaxException;
+import freenet.support.transport.ip.IPUtil;
/**
* @author amphibian
@@ -3586,4 +3587,10 @@
if (assignedNetworkID!=0)
sendAsync(DMT.createFNPNetworkID(assignedNetworkID),
null, 0, null);
}
+
+ /**
+  * Report whether this peer's current address is a local one.
+  *
+  * @return true if the peer has an address which IPUtil does not accept as
+  * valid when local addresses are excluded; false if there is no peer
+  * address at all.
+  */
+ public boolean isLocalAddress() {
+     Peer peer = getPeer();
+     if(peer == null) return false; // presumably
+     // Use the reference that was just null-checked; calling getPeer() a
+     // second time could return a different (or null) value.
+     // NOTE(review): assumes IPUtil.isValidAddress(addr, false) is true only
+     // for non-local addresses, hence the negation - confirm against IPUtil.
+     return !IPUtil.isValidAddress(peer.getAddress(), false);
+ }
}