Author: toad
Date: 2005-11-30 21:27:36 +0000 (Wed, 30 Nov 2005)
New Revision: 7640
Removed:
trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
trunk/freenet/src/freenet/node/ThrottledPacketSender.java
Modified:
trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodePinger.java
trunk/freenet/src/freenet/node/PeerNode.java
trunk/freenet/src/freenet/node/Version.java
Log:
252: (mandatory)
Eliminate old throttled packet queue hard bandwidth limiter.
Implement new, simpler, ian-inspired hard bandwidth limiter (idea from
ian/dijjer).
Also implement new, untested, currently unused, soft bandwidth limiter.
Take each peer's average throttling delay per data packet into account in
load estimation (NodePinger now uses the larger of this and the average
ping time).
Delete some dead code (unused overload-probability averages and bias fields
in PeerNode).
Also add minor-level logging to BlockReceiver.
Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-30
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java 2005-11-30
21:27:36 UTC (rev 7640)
@@ -74,6 +74,7 @@
Logger.normal(this, "Disconnected during receive: "+_uid+"
from "+_sender);
throw new
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
}
+ Logger.minor(this, "Received "+m1);
if ((m1 != null) && m1.getSpec().equals(DMT.sendAborted)) {
_prb.abort(m1.getInt(DMT.REASON),
m1.getString(DMT.DESCRIPTION));
throw new
RetrievalException(m1.getInt(DMT.REASON), m1.getString(DMT.DESCRIPTION));
@@ -96,8 +97,7 @@
Long resendTime = (Long)
_recentlyReportedMissingPackets.get(new Integer(x));
if ((resendTime == null) ||
(System.currentTimeMillis() > resendTime.longValue())) {
// Make a note of the
earliest time we should resend this, based on the number of other
- // packets we
- // are already waiting
for
+ // packets we are
already waiting for
long resendWait =
System.currentTimeMillis()
+
(MAX_ROUND_TRIP_TIME + (_recentlyReportedMissingPackets.size() *
MAX_SEND_INTERVAL));
_recentlyReportedMissingPackets.put(new Integer(x), (new Long(resendWait)));
@@ -105,6 +105,7 @@
}
}
}
+ Logger.minor(this, "Missing: "+missing.size());
if (missing.size() > 0) {
Message mn =
DMT.createMissingPacketNotification(_uid, missing);
_usm.send(_sender, mn);
Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-30
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2005-11-30
21:27:36 UTC (rev 7640)
@@ -29,7 +29,6 @@
import freenet.io.comm.PeerContext;
import freenet.io.comm.UdpSocketManager;
import freenet.node.PeerNode;
-import freenet.node.ThrottledPacketLagException;
import freenet.support.BitArray;
import freenet.support.Logger;
@@ -51,7 +50,48 @@
BitArray _sentPackets;
boolean failedByOverload = false;
final PacketThrottle throttle;
+
+ // Static stuff for global bandwidth limiter
+ /** Synchronization object for bandwidth limiting */
+ static final Object lastPacketSendTimeSync = new Object();
+ /** Time at which the last known packet is scheduled to be sent.
+ * We will not send another packet until at least minPacketDelay ms
after this time. */
+ static long hardLastPacketSendTime = System.currentTimeMillis();
+ /** Minimum interval between packet sends, for overall hard bandwidth
limiter */
+ static long minPacketDelay = 0;
+ /** Minimum average interval between packet sends, for averaged (soft)
overall
+ * bandwidth usage limiter. */
+ static long minSoftDelay = 0;
+ /** "Soft" equivalent to hardLastPacketSendTime. Can lag up to half the
softLimitPeriod
+ * behind the current time. Otherwise is similar. This gives it
flexibility; we can have
+ * spurts above the average limit, but over the softLimitPeriod, it
will average out to
+ * the target minSoftDelay. */
+ static long softLastPacketSendTime = System.currentTimeMillis();
+ /** Period over which the soft limiter should work */
+ static long softLimitPeriod;
+ public static void setMinPacketInterval(int delay) {
+ synchronized(lastPacketSendTimeSync) {
+ minPacketDelay = delay;
+ }
+ }
+
+ public static void setSoftMinPacketInterval(int delay) {
+ synchronized(lastPacketSendTimeSync) {
+ minSoftDelay = delay;
+ }
+ }
+
+ public static void setSoftLimitPeriod(long period) {
+ synchronized(lastPacketSendTimeSync) {
+ softLimitPeriod = period;
+ long now = System.currentTimeMillis();
+ if(now - softLastPacketSendTime > period / 2) {
+ softLastPacketSendTime = now - (period / 2);
+ }
+ }
+ }
+
public BlockTransmitter(UdpSocketManager usm, PeerContext destination,
long uid, PartiallyReceivedBlock source) {
_usm = usm;
_destination = destination;
@@ -69,18 +109,8 @@
public void run() {
int sentSinceLastPing = 0;
while (!_sendComplete) {
- long delay =
throttle.getDelay();
- long waitUntil =
System.currentTimeMillis() + delay;
- Logger.minor(this, "Waiting for
"+delay+" ms for "+_uid+" : "+throttle);
+ long startCycleTime =
System.currentTimeMillis();
try {
- while (waitUntil >
System.currentTimeMillis()) {
-
if(_sendComplete) return;
- synchronized
(_senderThread) {
- long x
= waitUntil - System.currentTimeMillis();
- if(x >
0)
-
_senderThread.wait(x);
- }
- }
while (true) {
synchronized(_unsent) {
if(_unsent.size() != 0) break;
@@ -91,13 +121,24 @@
}
}
} catch (InterruptedException
e) { }
+ long startDelayTime =
System.currentTimeMillis();
+ delay(startCycleTime);
int packetNo;
synchronized(_unsent) {
packetNo = ((Integer)
_unsent.removeFirst()).intValue();
}
_sentPackets.setBit(packetNo,
true);
try {
-
((PeerNode)_destination).throttledSend(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), SEND_TIMEOUT);
+ long endDelayTime =
System.currentTimeMillis();
+ long delayTime =
endDelayTime - startDelayTime;
+
((PeerNode)_destination).reportThrottledPacketSendTime(delayTime);
+
((PeerNode)_destination).sendAsync(DMT.createPacketTransmit(_uid, packetNo,
_sentPackets, _prb.getPacket(packetNo)), null);
+ // May have been delays
in sending, so update to avoid sending more frequently than allowed
+ long now =
System.currentTimeMillis();
+
synchronized(lastPacketSendTimeSync) {
+
if(hardLastPacketSendTime < now)
+
hardLastPacketSendTime = now;
+ }
// We accelerate the ping rate
during the transfer to keep a closer eye on round-trip-time
sentSinceLastPing++;
if (sentSinceLastPing >=
PING_EVERY) {
@@ -117,14 +158,78 @@
_sendComplete =
true;
_senderThread.notifyAll();
}
- } catch
(ThrottledPacketLagException e) {
- Logger.error(this,
"Terminating send due to overload: "+e);
-
synchronized(_senderThread) {
-
failedByOverload = true;
- _sendComplete =
true;
-
_senderThread.notifyAll();
+ }
+ }
+ }
+
+ /** @return True if _sendComplete */
+ private boolean delay(long startCycleTime) {
+
+ // Get the current inter-packet delay
+ long delay = throttle.getDelay();
+
+ while(true) {
+
+ long now = System.currentTimeMillis();
+
+ long endTime = -1;
+
+ boolean thenSend = true;
+
+ // Synchronize on the static lock, and
update
+ synchronized(lastPacketSendTimeSync) {
+
+ // Get the current time
+ now =
System.currentTimeMillis();
+
+ // Update time if necessary to
avoid spurts
+ if(hardLastPacketSendTime <
(now - minPacketDelay))
+ hardLastPacketSendTime
= now - minPacketDelay;
+
+ // Wait until the next send
window
+ long newHardLastPacketSendTime =
+ hardLastPacketSendTime
+ minPacketDelay;
+
+ long earliestSendTime =
startCycleTime + delay;
+
+ if(earliestSendTime >
hardLastPacketSendTime) {
+ // Don't clog up other
senders!
+ thenSend = false;
+ endTime =
earliestSendTime;
+ } else {
+ hardLastPacketSendTime
= newHardLastPacketSendTime;
+ endTime =
hardLastPacketSendTime;
+
+ // What about the soft
limit?
+
+ if(now -
softLastPacketSendTime > minSoftDelay / 2) {
+
softLastPacketSendTime = now - (minSoftDelay / 2);
}
+
+ softLastPacketSendTime
+= minSoftDelay;
+
+
if(softLastPacketSendTime > hardLastPacketSendTime) {
+ endTime =
hardLastPacketSendTime = softLastPacketSendTime;
+ }
}
+ }
+
+ while(now < endTime) {
+ synchronized(_senderThread) {
+ if(_sendComplete)
+ return true;
+ try {
+
_senderThread.wait(endTime - now);
+ } catch
(InterruptedException e) {
+ // Ignore
+ }
+ }
+ if(_sendComplete)
+ return true;
+ now =
System.currentTimeMillis();
+ }
+
+ if(thenSend) return false;
}
}
};
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-11-30 18:11:21 UTC (rev
7639)
+++ trunk/freenet/src/freenet/node/Node.java 2005-11-30 21:27:36 UTC (rev
7640)
@@ -37,6 +37,7 @@
import freenet.io.comm.PeerParseException;
import freenet.io.comm.UdpSocketManager;
import freenet.io.xfer.AbortedException;
+import freenet.io.xfer.BlockTransmitter;
import freenet.io.xfer.PartiallyReceivedBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKVerifyException;
@@ -330,8 +331,12 @@
insertSenders = new HashMap();
runningUIDs = new HashSet();
- globalThrottle = new ThrottledPacketSender(throttleInterval);
+ BlockTransmitter.setMinPacketInterval(throttleInterval);
+ // FIXME test the soft limit
+ BlockTransmitter.setSoftLimitPeriod(14*24*60*60*1000);
+ BlockTransmitter.setSoftMinPacketInterval(0);
+
lm = new LocationManager(random);
try {
@@ -866,7 +871,6 @@
final LRUQueue recentlyCompletedIDs;
- public final ThrottledPacketSender globalThrottle;
static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;
/**
Modified: trunk/freenet/src/freenet/node/NodePinger.java
===================================================================
--- trunk/freenet/src/freenet/node/NodePinger.java 2005-11-30 18:11:21 UTC
(rev 7639)
+++ trunk/freenet/src/freenet/node/NodePinger.java 2005-11-30 21:27:36 UTC
(rev 7640)
@@ -48,7 +48,10 @@
PeerNode peer = peers[i];
if(!peer.isConnected()) continue;
peerCount++;
- total *= peer.averagePingTime();
+ double avgPingTime = peer.averagePingTime();
+ double avgThrottledPacketSendTime =
peer.throttledPacketSendAverage.currentValue();
+ double value = Math.max(avgPingTime,
avgThrottledPacketSendTime);
+ total *= value;
}
if(peerCount > 0) {
total = Math.pow(total, 1.0 / peerCount);
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java 2005-11-30 18:11:21 UTC
(rev 7639)
+++ trunk/freenet/src/freenet/node/PeerNode.java 2005-11-30 21:27:36 UTC
(rev 7640)
@@ -152,31 +152,6 @@
/** The time at which we last completed a connection setup. */
private long connectedTime;
- /** The overall probability of the node rejecting a request or insert
- * because of overload or timeout.
- */
- private final RunningAverage pRejectOverload;
-
- /** The probability of the node rejecting a data request because of
- * overload, or of it timing out etc.
- */
- private final RunningAverage pDataRequestRejectOverload;
-
- /** The probability of the node rejecting an insert because of
- * overload, timing out, etc.
- */
- private final RunningAverage pInsertRejectOverload;
-
- private final Object biasLock = new Object();
-
- private double biasValue = 1.0;
-
- /** Sensitivity of bias computations */
- private double BIAS_SENSITIVITY = 0.01;
-
- /** Target pRO for bias computations */
- private double BIAS_TARGET = 0.05;
-
/**
* Create a PeerNode from a SimpleFieldSet containing a
* node reference for one. This must contain the following
@@ -269,11 +244,9 @@
decrementHTLAtMinimum = node.random.nextFloat() <
Node.DECREMENT_AT_MIN_PROB;
// FIXME maybe a simple binary RA would be better?
- pDataRequestRejectOverload = new SimpleRunningAverage(100, 0.05);
- pInsertRejectOverload = new SimpleRunningAverage(100, 0.05);
- pRejectOverload = new SimpleRunningAverage(100, 0.05);
pingNumber = node.random.nextLong();
pingAverage = new SimpleRunningAverage(20, 1);
+ throttledPacketSendAverage = new SimpleRunningAverage(20, 1);
}
private void randomizeMaxTimeBetweenPacketSends() {
@@ -952,10 +925,6 @@
return hashCode;
}
- public void throttledSend(Message message, long maxWaitTime) throws
NotConnectedException, ThrottledPacketLagException {
- node.globalThrottle.sendPacket(message, this, maxWaitTime);
- }
-
private final Object backoffSync = new Object();
public boolean isBackedOff() {
@@ -1005,6 +974,7 @@
final LRUHashtable pingsSentTimes = new LRUHashtable();
long pingNumber;
final RunningAverage pingAverage;
+ final RunningAverage throttledPacketSendAverage;
public void sendPing() {
long pingNo;
@@ -1054,4 +1024,8 @@
public double averagePingTime() {
return pingAverage.currentValue();
}
+
+ public void reportThrottledPacketSendTime(long timeDiff) {
+ throttledPacketSendAverage.report(timeDiff);
+ }
}
Deleted: trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
2005-11-30 18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/node/ThrottledPacketLagException.java
2005-11-30 21:27:36 UTC (rev 7640)
@@ -1,10 +0,0 @@
-package freenet.node;
-
-/**
- * Thrown when a throttled send is queued for too long.
- * @author root
- *
- */
-public class ThrottledPacketLagException extends Exception {
-
-}
Deleted: trunk/freenet/src/freenet/node/ThrottledPacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-30
18:11:21 UTC (rev 7639)
+++ trunk/freenet/src/freenet/node/ThrottledPacketSender.java 2005-11-30
21:27:36 UTC (rev 7640)
@@ -1,161 +0,0 @@
-package freenet.node;
-
-import java.util.LinkedList;
-
-import freenet.io.comm.Message;
-import freenet.io.comm.NotConnectedException;
-import freenet.support.Logger;
-
-/**
- * Sends throttled packets.
- * Only data packets are throttled, and they are all
- * slightly over 1kB. So we can throttle bandwidth by
- * just sending no more than N of these per second.
- * Initially a rather brutal implementation; we send one
- * packet, every 1/N seconds, or we don't. That's it!
- */
-public class ThrottledPacketSender implements Runnable {
-
- final LinkedList queuedPackets;
- final int sleepTime;
-
- public ThrottledPacketSender(int sleepTime) {
- this.sleepTime = sleepTime;
- queuedPackets = new LinkedList();
- Thread t = new Thread(this, "Throttled packet sender");
- t.setDaemon(true);
- t.start();
- }
-
- public class ThrottledPacket {
- public ThrottledPacket(Message msg2, PeerNode pn2) {
- this.msg = msg2;
- this.pn = pn2;
- sent = false;
- queuedTime = System.currentTimeMillis();
- }
-
- final Message msg;
- final PeerNode pn;
- final long queuedTime;
- boolean sent;
- boolean lostConn;
- RuntimeException re;
- Error err;
-
- public void waitUntilSent(long maxWaitTime) throws
NotConnectedException, ThrottledPacketLagException {
- long startTime = System.currentTimeMillis();
- long waitEndTime = startTime + maxWaitTime;
- synchronized(this) {
- while(!(sent || lostConn || re != null || err
!= null)) {
- try {
- long wait = waitEndTime -
System.currentTimeMillis();
- if(wait > 0)
- wait(10*1000);
- if(wait <= 0) {
- throw new
ThrottledPacketLagException();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- }
- if(lostConn) throw new NotConnectedException();
- if(re != null) throw re;
- if(err != null) throw err;
- long timeDiff = System.currentTimeMillis() -
queuedTime;
- if(timeDiff > 30*1000)
- Logger.error(this, "Took "+timeDiff+"
ms to send packet "+msg+" to "+pn);
- else
- Logger.minor(this, "Took "+timeDiff+"
ms to send packet "+msg+" to "+pn);
-
- }
- }
- }
-
- public void sendPacket(Message msg, PeerNode pn, long maxWaitTime)
throws NotConnectedException, ThrottledPacketLagException {
- ThrottledPacket p = queuePacket(msg, pn);
- p.waitUntilSent(maxWaitTime);
- }
-
- private ThrottledPacket queuePacket(Message msg, PeerNode pn) throws
NotConnectedException {
- if(!pn.isConnected())
- throw new NotConnectedException();
- ThrottledPacket p = new ThrottledPacket(msg, pn);
- synchronized(queuedPackets) {
- queuedPackets.addLast(p);
- queuedPackets.notifyAll();
- }
- return p;
- }
-
- public void run() {
- while(true) {
- if(sendThrottledPacket()) {
- // Sent one
- // Sleep
- try {
- if(sleepTime > 0)
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- // Huh?
- }
- } else {
- // Didn't send one
- // Wait for one
- synchronized(queuedPackets) {
- while(queuedPackets.isEmpty())
- try {
-
queuedPackets.wait(10*1000);
- } catch (InterruptedException
e) {
- // Never mind
- }
- }
- }
-
- }
- }
-
- private boolean sendThrottledPacket() {
- while(true) {
- ThrottledPacket p;
- synchronized(queuedPackets) {
- if(queuedPackets.isEmpty()) return false;
- p = (ThrottledPacket)
queuedPackets.removeFirst();
- }
- if(!p.pn.isConnected()) {
- synchronized(p) {
- p.lostConn = true;
- p.notifyAll();
- }
- continue;
- }
- try {
- p.pn.send(p.msg);
- } catch (NotConnectedException e) {
- synchronized(p) {
- p.lostConn = true;
- p.notifyAll();
- }
- continue;
- } catch (RuntimeException e) {
- synchronized(p) {
- p.re = e;
- p.notifyAll();
- }
- return true;
- } catch (Error e) {
- synchronized(p) {
- p.err = e;
- p.notifyAll();
- }
- return true;
- }
- synchronized(p) {
- p.sent = true;
- p.notifyAll();
- }
- return true;
- }
- }
-
-}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-30 18:11:21 UTC (rev
7639)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-30 21:27:36 UTC (rev
7640)
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 251;
+ public static final int buildNumber = 252;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 251;
+ public static final int lastGoodBuild = 252;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;