Author: toad
Date: 2006-04-11 18:33:13 +0000 (Tue, 11 Apr 2006)
New Revision: 8519
Added:
trunk/freenet/src/freenet/node/BaseRequestThrottle.java
trunk/freenet/src/freenet/node/ThrottleWindowManager.java
Removed:
trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
trunk/freenet/src/freenet/node/RequestThrottle.java
Modified:
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/Version.java
Log:
633: One window, four RTTs.
In other words:
- The node has a single tracker (window) for the number of requests that can be
in flight at any time based on the success or failure (timeout, overload) of
all types of requests
- Each type of request has its own average round-trip time
- We send a request of each type every <request type's average round trip time> /
<number of allowed in-flight requests>
This is instead of the old behaviour of having separate windows and RTTs for
each request type.
Copied: trunk/freenet/src/freenet/node/BaseRequestThrottle.java (from rev 8516,
trunk/freenet/src/freenet/node/RequestThrottle.java)
===================================================================
--- trunk/freenet/src/freenet/node/RequestThrottle.java 2006-04-11 17:17:36 UTC
(rev 8516)
+++ trunk/freenet/src/freenet/node/BaseRequestThrottle.java 2006-04-11
18:33:13 UTC (rev 8519)
@@ -0,0 +1,14 @@
+package freenet.node;
+
+public interface BaseRequestThrottle { // Supplies the delay RequestStarter waits after starting a request.
+
+ public static final long DEFAULT_DELAY = 200; // presumably milliseconds like the bounds below; no use is visible here -- TODO confirm
+ static final long MAX_DELAY = 5*60*1000; // upper clamp applied by Node.MyRequestThrottle.getDelay(); five minutes in ms
+ static final long MIN_DELAY = 20; // lower clamp applied by Node.MyRequestThrottle.getDelay(); also caps the usable window at rtt / MIN_DELAY
+
+ /**
+ * Get the current inter-request delay. In Node.MyRequestThrottle this is (average RTT / window size), in milliseconds, clamped to [MIN_DELAY, MAX_DELAY].
+ */
+ public abstract long getDelay();
+
+}
\ No newline at end of file
Deleted: trunk/freenet/src/freenet/node/ChainedRequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/ChainedRequestThrottle.java 2006-04-11
17:36:56 UTC (rev 8518)
+++ trunk/freenet/src/freenet/node/ChainedRequestThrottle.java 2006-04-11
18:33:13 UTC (rev 8519)
@@ -1,22 +0,0 @@
-package freenet.node;
-
-/**
- * RequestThrottle which takes a second throttle, and never
- * returns a delay less than that throttle's current delay.
- */
-public class ChainedRequestThrottle extends RequestThrottle {
-
- private final RequestThrottle otherThrottle;
-
- public ChainedRequestThrottle(int rtt, float winsz, RequestThrottle
other, String name) {
- super(rtt, winsz, name);
- otherThrottle = other;
- }
-
- public long getDelay() {
- long delay = super.getDelay();
- long otherDelay = otherThrottle.getDelay();
- return Math.max(delay, otherDelay);
- }
-
-}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-04-11 17:36:56 UTC (rev
8518)
+++ trunk/freenet/src/freenet/node/Node.java 2006-04-11 18:33:13 UTC (rev
8519)
@@ -92,6 +92,7 @@
import freenet.support.io.FilenameGenerator;
import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.TempBucketFactory;
+import freenet.support.math.BootstrappingDecayingRunningAverage;
import freenet.support.math.RunningAverage;
import freenet.support.math.TimeDecayingRunningAverage;
import freenet.transport.IPAddressDetector;
@@ -102,6 +103,35 @@
*/
public class Node {
+ public class MyRequestThrottle implements BaseRequestThrottle { // Per-request-type throttle: owns its own RTT average, shares the enclosing Node's window.
+
+ private final BootstrappingDecayingRunningAverage
roundTripTime; // running average of the round-trip times passed to successfulCompletion()
+ private final String name; // label given at construction; stored but not read anywhere in this class
+
+ public MyRequestThrottle(ThrottleWindowManager throttleWindow,
int rtt, String string) { // NOTE(review): the throttleWindow argument is never stored; getDelay() reads the enclosing Node's throttleWindow field instead
+ roundTripTime = new
BootstrappingDecayingRunningAverage(rtt, 10, 5*60*1000, 10); // rtt seeds the average; the other arguments are presumably min, max and a report count -- TODO confirm against that class
+ this.name = string;
+ }
+
+ public synchronized long getDelay() { // delay = average RTT / shared window size, clamped to [MIN_DELAY, MAX_DELAY]
+ double rtt = roundTripTime.currentValue();
+ double winSizeForMinPacketDelay = rtt / MIN_DELAY; // largest window that still yields a delay of at least MIN_DELAY
+ double _simulatedWindowSize =
throttleWindow.currentValue(); // local copy: the clamping below does not write back to the shared window
+ if (_simulatedWindowSize > winSizeForMinPacketDelay) {
+ _simulatedWindowSize = winSizeForMinPacketDelay;
+ }
+ if (_simulatedWindowSize < 1.0) { // also guards the division below when rtt / MIN_DELAY is below 1
+ _simulatedWindowSize = 1.0F;
+ }
+ // return (long) (_roundTripTime /
_simulatedWindowSize);
+ return Math.max(MIN_DELAY, Math.min((long) (rtt /
_simulatedWindowSize), MAX_DELAY));
+ }
+
+ public synchronized void successfulCompletion(long rtt) { // callers in Node pass a System.currentTimeMillis() difference, i.e. milliseconds
+ roundTripTime.report(Math.max(rtt, 10)); // samples below 10 are reported as 10
+ }
+ }
+
/** Config object for the whole node. */
public final Config config;
@@ -258,13 +288,14 @@
public final USKManager uskManager;
final ArchiveManager archiveManager;
public final BucketFactory tempBucketFactory;
- final RequestThrottle chkRequestThrottle;
+ final ThrottleWindowManager throttleWindow;
+ final MyRequestThrottle chkRequestThrottle;
final RequestStarter chkRequestStarter;
- final RequestThrottle chkInsertThrottle;
+ final MyRequestThrottle chkInsertThrottle;
final RequestStarter chkInsertStarter;
- final RequestThrottle sskRequestThrottle;
+ final MyRequestThrottle sskRequestThrottle;
final RequestStarter sskRequestStarter;
- final RequestThrottle sskInsertThrottle;
+ final MyRequestThrottle sskInsertThrottle;
final RequestStarter sskInsertStarter;
public final UserAlertManager alerts;
final RunningAverage throttledPacketSendAverage;
@@ -502,7 +533,8 @@
private Node(Config config, RandomSource random) throws NodeInitException {
// Easy stuff
- startupTime = System.currentTimeMillis();
+ startupTime = System.currentTimeMillis();
+ throttleWindow = new ThrottleWindowManager(2.0);
alerts = new UserAlertManager();
recentlyCompletedIDs = new LRUQueue();
this.config = config;
@@ -921,27 +953,27 @@
// FIXME make all the below arbitrary constants configurable!
archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS,
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE,
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
- chkRequestThrottle = new RequestThrottle(5000, 2.0F, "CHK
Request");
+ chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request");
chkRequestStarter = new RequestStarter(this,
chkRequestThrottle, "CHK Request starter ("+portNumber+")");
chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, this);
chkRequestStarter.setScheduler(chkFetchScheduler);
chkRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
- chkInsertThrottle = new RequestThrottle(10000, 2.0F, "CHK
Insert");
+ chkInsertThrottle = new MyRequestThrottle(throttleWindow,
10000, "CHK Insert");
chkInsertStarter = new RequestStarter(this, chkInsertThrottle,
"CHK Insert starter ("+portNumber+")");
chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, this);
chkInsertStarter.setScheduler(chkPutScheduler);
chkInsertStarter.start();
- sskRequestThrottle = new RequestThrottle(5000, 2.0F, "SSK
Request");
+ sskRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "SSK Request");
sskRequestStarter = new RequestStarter(this,
sskRequestThrottle, "SSK Request starter ("+portNumber+")");
sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, this);
sskRequestStarter.setScheduler(sskFetchScheduler);
sskRequestStarter.start();
//insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
// FIXME reenable the above
- sskInsertThrottle = new RequestThrottle(10000, 2.0F, "SSK
Insert");
+ sskInsertThrottle = new MyRequestThrottle(throttleWindow,
10000, "SSK Insert");
sskInsertStarter = new RequestStarter(this, sskInsertThrottle,
"SSK Insert starter ("+portNumber+")");
sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, this);
sskInsertStarter.setScheduler(sskPutScheduler);
@@ -1064,8 +1096,7 @@
while(true) {
if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
// See below; inserts count both
- chkRequestThrottle.requestRejectedOverload();
- chkInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
rejectedOverload = true;
}
@@ -1078,8 +1109,7 @@
status ==
RequestSender.GENERATED_REJECTED_OVERLOAD) {
if(!rejectedOverload) {
// See below
- chkRequestThrottle.requestRejectedOverload();
- chkInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
rejectedOverload = true;
}
} else {
@@ -1088,11 +1118,8 @@
status == RequestSender.ROUTE_NOT_FOUND
||
status == RequestSender.VERIFY_FAILURE)
{
long rtt = System.currentTimeMillis() -
startTime;
- chkRequestThrottle.requestCompleted(rtt);
- // Also report on insert throttle as inserts
use both for window
- // Reason: inserts are excessively biased
otherwise as they
- // visit more nodes (longer time) and are more
likely to encounter an overload.
- chkInsertThrottle.requestCompleted();
+ throttleWindow.requestCompleted();
+ chkRequestThrottle.successfulCompletion(rtt);
}
}
@@ -1161,9 +1188,7 @@
boolean rejectedOverload = false;
while(true) {
if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
- sskRequestThrottle.requestRejectedOverload();
- // See below
- sskInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
rejectedOverload = true;
}
@@ -1175,8 +1200,7 @@
if(status == RequestSender.TIMED_OUT ||
status ==
RequestSender.GENERATED_REJECTED_OVERLOAD) {
if(!rejectedOverload) {
- sskRequestThrottle.requestRejectedOverload();
- sskInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
rejectedOverload = true;
}
} else {
@@ -1185,11 +1209,8 @@
status == RequestSender.ROUTE_NOT_FOUND
||
status == RequestSender.VERIFY_FAILURE)
{
long rtt = System.currentTimeMillis() -
startTime;
- sskRequestThrottle.requestCompleted(rtt);
- // Also report on insert throttle as inserts
use both for window
- // Reason: inserts are excessively biased
otherwise as they
- // visit more nodes (longer time) and are more
likely to encounter an overload.
- sskInsertThrottle.requestCompleted();
+ throttleWindow.requestCompleted();
+ sskRequestThrottle.successfulCompletion(rtt);
}
}
@@ -1274,7 +1295,7 @@
}
if((!hasForwardedRejectedOverload) &&
is.receivedRejectedOverload()) {
hasForwardedRejectedOverload = true;
- chkInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
}
}
@@ -1290,7 +1311,7 @@
}
if(is.anyTransfersFailed() && (!hasForwardedRejectedOverload)) {
hasForwardedRejectedOverload = true; // not strictly
true but same effect
- chkInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
}
}
@@ -1304,7 +1325,9 @@
// It worked!
long endTime = System.currentTimeMillis();
long len = endTime - startTime;
- chkInsertThrottle.requestCompleted(len);
+
+ chkInsertThrottle.successfulCompletion(len);
+ throttleWindow.requestCompleted();
}
}
@@ -1376,7 +1399,7 @@
}
if((!hasForwardedRejectedOverload) &&
is.receivedRejectedOverload()) {
hasForwardedRejectedOverload = true;
- sskInsertThrottle.requestRejectedOverload();
+ throttleWindow.rejectedOverload();
}
}
@@ -1401,8 +1424,9 @@
|| is.getStatus() == SSKInsertSender.SUCCESS)) {
// It worked!
long endTime = System.currentTimeMillis();
- long len = endTime - startTime;
- sskInsertThrottle.requestCompleted(len);
+ long rtt = endTime - startTime;
+ throttleWindow.requestCompleted();
+ sskInsertThrottle.successfulCompletion(rtt);
}
}
@@ -2056,19 +2080,19 @@
}
}
- public RequestThrottle getCHKRequestThrottle() {
+ public BaseRequestThrottle getCHKRequestThrottle() {
return chkRequestThrottle;
}
- public RequestThrottle getCHKInsertThrottle() {
+ public BaseRequestThrottle getCHKInsertThrottle() {
return chkInsertThrottle;
}
- public RequestThrottle getSSKRequestThrottle() {
+ public BaseRequestThrottle getSSKRequestThrottle() {
return sskRequestThrottle;
}
- public RequestThrottle getSSKInsertThrottle() {
+ public BaseRequestThrottle getSSKInsertThrottle() {
return sskInsertThrottle;
}
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2006-04-11 17:36:56 UTC
(rev 8518)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2006-04-11 18:33:13 UTC
(rev 8519)
@@ -36,12 +36,12 @@
return !(prio < MAXIMUM_PRIORITY_CLASS || prio >
MINIMUM_PRIORITY_CLASS);
}
- final RequestThrottle throttle;
+ final BaseRequestThrottle throttle;
RequestScheduler sched;
final Node node;
private long sentRequestTime;
- public RequestStarter(Node node, RequestThrottle throttle, String name)
{
+ public RequestStarter(Node node, BaseRequestThrottle throttle, String
name) {
this.node = node;
this.throttle = throttle;
this.name = name;
@@ -70,10 +70,23 @@
if(req != null) {
Logger.minor(this, "Running "+req);
// Create a thread to handle starting the
request, and the resulting feedback
- Thread t = new Thread(new SenderThread(req));
- t.setDaemon(true);
- t.start();
- Logger.minor(this, "Started "+req+" on "+t);
+ while(true) {
+ try {
+ Thread t = new Thread(new
SenderThread(req));
+ t.setDaemon(true);
+ t.start();
+ Logger.minor(this, "Started
"+req+" on "+t);
+ break;
+ } catch (OutOfMemoryError e) {
+ // Probably out of threads
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException
e1) {
+ // Ignore
+ }
+
System.err.println(e.getMessage());
+ }
+ }
sentRequestTime = System.currentTimeMillis();
// Wait
long delay = throttle.getDelay();
Deleted: trunk/freenet/src/freenet/node/RequestThrottle.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestThrottle.java 2006-04-11 17:36:56 UTC
(rev 8518)
+++ trunk/freenet/src/freenet/node/RequestThrottle.java 2006-04-11 18:33:13 UTC
(rev 8519)
@@ -1,81 +0,0 @@
-package freenet.node;
-
-import freenet.support.Logger;
-import freenet.support.math.BootstrappingDecayingRunningAverage;
-
-/**
- * Keeps track of the current request send rate.
- */
-public class RequestThrottle {
-
- protected static final float PACKET_DROP_DECREASE_MULTIPLE = 0.97f;
- protected static final float PACKET_TRANSMIT_INCREMENT = (4 * (1 -
(PACKET_DROP_DECREASE_MULTIPLE * PACKET_DROP_DECREASE_MULTIPLE))) / 3;
- protected static final long MAX_DELAY = 5*60*1000;
- protected static final long MIN_DELAY = 20;
- public static final long DEFAULT_DELAY = 200;
- private long _totalPackets = 0, _droppedPackets = 0;
- private double _simulatedWindowSize = 2;
- private final BootstrappingDecayingRunningAverage roundTripTime;
- private final String name;
-
- RequestThrottle(long rtt, float winSize, String name) {
- _simulatedWindowSize = 2;
- roundTripTime = new BootstrappingDecayingRunningAverage(rtt,
10, 5*60*1000, 10);
- this.name = name;
- }
-
- /**
- * Get the current inter-request delay.
- */
- public synchronized long getDelay() {
- double rtt = roundTripTime.currentValue();
- double winSizeForMinPacketDelay = rtt / MIN_DELAY;
- if (_simulatedWindowSize > winSizeForMinPacketDelay) {
- _simulatedWindowSize = winSizeForMinPacketDelay;
- }
- if (_simulatedWindowSize < 1.0) {
- _simulatedWindowSize = 1.0F;
- }
- // return (long) (_roundTripTime / _simulatedWindowSize);
- return Math.max(MIN_DELAY, Math.min((long) (rtt /
_simulatedWindowSize), MAX_DELAY));
- }
-
- /**
- * Report that a request completed successfully, and the
- * time it took.
- */
- public synchronized void requestCompleted(long time) {
- setRoundTripTime(time);
- requestCompleted();
- Logger.minor(this, "request completed in "+time+" for "+name);
- }
-
- /** Report that a request completed successfully */
- public synchronized void requestCompleted() {
- _totalPackets++;
- _simulatedWindowSize += PACKET_TRANSMIT_INCREMENT;
- Logger.minor(this, "requestCompleted on "+this);
- }
-
- /**
- * Report that a request got RejectedOverload.
- * Do not report the time it took, because it is irrelevant.
- */
- public synchronized void requestRejectedOverload() {
- _droppedPackets++;
- _totalPackets++;
- _simulatedWindowSize *= PACKET_DROP_DECREASE_MULTIPLE;
- Logger.minor(this, "request rejected overload: "+this);
- }
-
- private synchronized void setRoundTripTime(long rtt) {
- roundTripTime.report(Math.max(rtt, 10));
- Logger.minor(this, "Reporting RTT: "+rtt+" on "+this);
- }
-
- public synchronized String toString() {
- return getDelay()+" ms, (w: "
- + _simulatedWindowSize + ", r:" +
roundTripTime.currentValue() + ", d:"
- + (((float) _droppedPackets / (float)
_totalPackets)) + "="+_droppedPackets+"/"+_totalPackets + ") for "+name;
- }
-}
Added: trunk/freenet/src/freenet/node/ThrottleWindowManager.java
===================================================================
--- trunk/freenet/src/freenet/node/ThrottleWindowManager.java 2006-04-11
17:36:56 UTC (rev 8518)
+++ trunk/freenet/src/freenet/node/ThrottleWindowManager.java 2006-04-11
18:33:13 UTC (rev 8519)
@@ -0,0 +1,42 @@
+package freenet.node;
+
+import freenet.support.Logger;
+
+public class ThrottleWindowManager { // Single window shared by all request types: grows additively on completion, shrinks multiplicatively on overload.
+
+ static final float PACKET_DROP_DECREASE_MULTIPLE = 0.97f; // the window is multiplied by this on every rejectedOverload()
+ static final float PACKET_TRANSMIT_INCREMENT = (4 * (1 -
(PACKET_DROP_DECREASE_MULTIPLE * PACKET_DROP_DECREASE_MULTIPLE))) / 3; // added to the window on every requestCompleted(); about 0.0788 for a multiple of 0.97
+
+ private long _totalPackets = 0, _droppedPackets = 0; // total counts completions plus rejections; dropped counts rejections only
+ private double _simulatedWindowSize = 2; // this initializer is always overwritten by the constructor
+
+ public ThrottleWindowManager(double d) { // d is the initial window size; Node passes 2.0
+ _simulatedWindowSize = d;
+ }
+
+ public synchronized double currentValue() { // returns the window, never less than 1.0
+ if (_simulatedWindowSize < 1.0) { // side effect: the stored value itself is raised to the floor, not just the returned one
+ _simulatedWindowSize = 1.0F;
+ }
+ return _simulatedWindowSize;
+ }
+
+ public synchronized void rejectedOverload() { // multiplicative decrease; no floor is applied here, only in currentValue()
+ _droppedPackets++;
+ _totalPackets++;
+ _simulatedWindowSize *= PACKET_DROP_DECREASE_MULTIPLE;
+ Logger.minor(this, "request rejected overload: "+this); // the concatenation invokes toString() below
+ }
+
+ public synchronized void requestCompleted() { // additive increase; no upper bound is applied in this class
+ _totalPackets++;
+ _simulatedWindowSize += PACKET_TRANSMIT_INCREMENT;
+ Logger.minor(this, "requestCompleted on "+this);
+ }
+
+ public synchronized String toString() { // format: "<Object.toString()> w: <window>, d:<ratio>=<dropped>/<total>"
+ return super.toString()+" w: "
+ + _simulatedWindowSize + ", d:"
+ + (((float) _droppedPackets / (float)
_totalPackets)) + "="+_droppedPackets+"/"+_totalPackets; // the ratio is NaN until the first event, because 0f / 0f is NaN
+ }
+}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-04-11 17:36:56 UTC (rev
8518)
+++ trunk/freenet/src/freenet/node/Version.java 2006-04-11 18:33:13 UTC (rev
8519)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 632;
+ private static final int buildNumber = 633;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 591;