Author: toad
Date: 2006-08-12 19:54:58 +0000 (Sat, 12 Aug 2006)
New Revision: 10049
Added:
trunk/freenet/src/freenet/node/RequestStarterGroup.java
Modified:
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/async/USKManager.java
trunk/freenet/src/freenet/clients/http/NinjaSpider.java
trunk/freenet/src/freenet/clients/http/Spider.java
trunk/freenet/src/freenet/node/ARKFetcher.java
trunk/freenet/src/freenet/node/NodeARKInserter.java
trunk/freenet/src/freenet/node/NodeClientCore.java
trunk/freenet/src/freenet/node/fcp/ClientGet.java
trunk/freenet/src/freenet/node/fcp/ClientPut.java
trunk/freenet/src/freenet/node/fcp/ClientPutDir.java
trunk/freenet/src/freenet/node/updater/NodeUpdater.java
Log:
Separate RequestStarter's and related stuff into RequestStarterGroup.
This includes all the load limiting code.
Next stage is to persist the load limiting data.
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-08-12 19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-08-12 19:54:58 UTC (rev 10049)
@@ -104,7 +104,7 @@
if(uri == null) throw new NullPointerException();
FetcherContext context = getFetcherContext();
FetchWaiter fw = new FetchWaiter();
- ClientGetter get = new ClientGetter(fw, core.chkFetchScheduler,
core.sskFetchScheduler, uri, context, priorityClass, this, null);
+ ClientGetter get = new ClientGetter(fw,
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler,
uri, context, priorityClass, this, null);
get.start();
return fw.waitForCompletion();
}
@@ -113,7 +113,7 @@
if(uri == null) throw new NullPointerException();
FetcherContext context = getFetcherContext(overrideMaxSize);
FetchWaiter fw = new FetchWaiter();
- ClientGetter get = new ClientGetter(fw, core.chkFetchScheduler,
core.sskFetchScheduler, uri, context, priorityClass, this, null);
+ ClientGetter get = new ClientGetter(fw,
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler,
uri, context, priorityClass, this, null);
get.start();
return fw.waitForCompletion();
}
@@ -126,7 +126,7 @@
InserterContext context = getInserterContext(true);
PutWaiter pw = new PutWaiter();
ClientPutter put = new ClientPutter(pw, insert.data,
insert.desiredURI, insert.clientMetadata,
- context, core.chkPutScheduler,
core.sskPutScheduler, priorityClass, getCHKOnly, isMetadata, this, null);
+ context, core.requestStarters.chkPutScheduler,
core.requestStarters.sskPutScheduler, priorityClass, getCHKOnly, isMetadata,
this, null);
put.start();
return pw.waitForCompletion();
}
@@ -151,7 +151,7 @@
public FreenetURI insertManifest(FreenetURI insertURI, HashMap
bucketsByName, String defaultName) throws InserterException {
PutWaiter pw = new PutWaiter();
SimpleManifestPutter putter =
- new SimpleManifestPutter(pw, core.chkPutScheduler,
core.sskPutScheduler,
SimpleManifestPutter.bucketsByNameToManifestEntries(bucketsByName),
priorityClass, insertURI, defaultName, getInserterContext(true), false, this);
+ new SimpleManifestPutter(pw,
core.requestStarters.chkPutScheduler, core.requestStarters.sskPutScheduler,
SimpleManifestPutter.bucketsByNameToManifestEntries(bucketsByName),
priorityClass, insertURI, defaultName, getInserterContext(true), false, this);
putter.start();
return pw.waitForCompletion();
}
Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java 2006-08-12
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/client/async/USKManager.java 2006-08-12
19:54:58 UTC (rev 10049)
@@ -56,8 +56,8 @@
backgroundFetchContext =
core.makeClient(RequestStarter.UPDATE_PRIORITY_CLASS).getFetcherContext();
backgroundFetchContext.followRedirects = false;
backgroundFetchContext.uskManager = this;
- this.chkRequestScheduler = core.chkFetchScheduler;
- this.sskRequestScheduler = core.sskFetchScheduler;
+ this.chkRequestScheduler =
core.requestStarters.chkFetchScheduler;
+ this.sskRequestScheduler =
core.requestStarters.sskFetchScheduler;
latestVersionByClearUSK = new HashMap();
subscribersByClearUSK = new HashMap();
fetchersByUSK = new HashMap();
Modified: trunk/freenet/src/freenet/clients/http/NinjaSpider.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/NinjaSpider.java 2006-08-12
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/clients/http/NinjaSpider.java 2006-08-12
19:54:58 UTC (rev 10049)
@@ -181,7 +181,7 @@
}
private ClientGetter makeGetter(FreenetURI uri) {
- ClientGetter g = new ClientGetter(this, core.chkFetchScheduler,
core.sskFetchScheduler, uri, ctx, PRIORITY_CLASS, this, null);
+ ClientGetter g = new ClientGetter(this,
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler,
uri, ctx, PRIORITY_CLASS, this, null);
return g;
}
Modified: trunk/freenet/src/freenet/clients/http/Spider.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/Spider.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/clients/http/Spider.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -128,7 +128,7 @@
}
private ClientGetter makeGetter(FreenetURI uri) {
- ClientGetter g = new ClientGetter(this, core.chkFetchScheduler,
core.sskFetchScheduler, uri, ctx, PRIORITY_CLASS, this, null);
+ ClientGetter g = new ClientGetter(this,
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler,
uri, ctx, PRIORITY_CLASS, this, null);
return g;
}
Modified: trunk/freenet/src/freenet/node/ARKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetcher.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/node/ARKFetcher.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -91,7 +91,7 @@
startedEdition = uri.getSuggestedEdition();
fetchingURI = uri;
Logger.minor(this, "Fetching ARK: "+uri+" for
"+peer);
- cg = new ClientGetter(this,
node.clientCore.chkFetchScheduler, node.clientCore.sskFetchScheduler,
+ cg = new ClientGetter(this,
node.clientCore.requestStarters.chkFetchScheduler,
node.clientCore.requestStarters.sskFetchScheduler,
uri, node.arkFetcherContext,
RequestStarter.UPDATE_PRIORITY_CLASS,
this, new ArrayBucket());
getter = cg;
Modified: trunk/freenet/src/freenet/node/NodeARKInserter.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeARKInserter.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/node/NodeARKInserter.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -131,7 +131,7 @@
inserter = new ClientPutter(this, b, uri,
new ClientMetadata("text/plain") /* it
won't quite fit in an SSK anyway */,
node.clientCore.makeClient((short)0).getInserterContext(true),
- node.clientCore.chkPutScheduler,
node.clientCore.sskPutScheduler, RequestStarter.INTERACTIVE_PRIORITY_CLASS,
false, false, this, null);
+
node.clientCore.requestStarters.chkPutScheduler,
node.clientCore.requestStarters.sskPutScheduler,
RequestStarter.INTERACTIVE_PRIORITY_CLASS, false, false, this, null);
try {
Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -7,7 +7,6 @@
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
import freenet.client.InserterContext;
-import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.HealingQueue;
import freenet.client.async.SimpleHealingQueue;
import freenet.client.async.USKManager;
@@ -46,31 +45,20 @@
import freenet.support.io.PersistentEncryptedTempBucketFactory;
import freenet.support.io.PersistentTempBucketFactory;
import freenet.support.io.TempBucketFactory;
-import freenet.support.math.BootstrappingDecayingRunningAverage;
-import freenet.support.math.TimeDecayingRunningAverage;
+/**
+ * The connection between the node and the client layer.
+ */
public class NodeClientCore {
public final USKManager uskManager;
final ArchiveManager archiveManager;
- final ThrottleWindowManager throttleWindow;
- final MyRequestThrottle chkRequestThrottle;
- final RequestStarter chkRequestStarter;
- final MyRequestThrottle chkInsertThrottle;
- final RequestStarter chkInsertStarter;
- final MyRequestThrottle sskRequestThrottle;
- final RequestStarter sskRequestStarter;
- final MyRequestThrottle sskInsertThrottle;
- final RequestStarter sskInsertStarter;
+ public final RequestStarterGroup requestStarters;
private final HealingQueue healingQueue;
/** Must be included as a hidden field in order for any dangerous HTTP
operation to complete successfully. */
public final String formPassword;
File downloadDir;
- public final ClientRequestScheduler chkFetchScheduler;
- public final ClientRequestScheduler chkPutScheduler;
- public final ClientRequestScheduler sskFetchScheduler;
- public final ClientRequestScheduler sskPutScheduler;
final FilenameGenerator tempFilenameGenerator;
public final BucketFactory tempBucketFactory;
final Node node;
@@ -104,7 +92,7 @@
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
alerts = new UserAlertManager();
- throttleWindow = new ThrottleWindowManager(2.0);
+ requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config);
// Temp files
@@ -183,81 +171,20 @@
}
- SubConfig schedulerConfig = new SubConfig("node.scheduler",
config);
-
archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS,
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE,
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
- chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request");
- chkRequestStarter = new RequestStarter(this,
chkRequestThrottle, "CHK Request starter ("+portNumber+")",
node.requestOutputThrottle, node.requestInputThrottle,
node.localChkFetchBytesSentAverage, node.localChkFetchBytesReceivedAverage);
- chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, node, schedulerConfig, "CHKrequester");
- chkRequestStarter.setScheduler(chkFetchScheduler);
- chkRequestStarter.start();
- //insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
- // FIXME reenable the above
- chkInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "CHK Insert");
- chkInsertStarter = new RequestStarter(this, chkInsertThrottle,
"CHK Insert starter ("+portNumber+")", node.requestOutputThrottle,
node.requestInputThrottle, node.localChkInsertBytesSentAverage,
node.localChkInsertBytesReceivedAverage);
- chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, node, schedulerConfig, "CHKinserter");
- chkInsertStarter.setScheduler(chkPutScheduler);
- chkInsertStarter.start();
-
- sskRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "SSK Request");
- sskRequestStarter = new RequestStarter(this,
sskRequestThrottle, "SSK Request starter ("+portNumber+")",
node.requestOutputThrottle, node.requestInputThrottle,
node.localSskFetchBytesSentAverage, node.localSskFetchBytesReceivedAverage);
- sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, node, schedulerConfig, "SSKrequester");
- sskRequestStarter.setScheduler(sskFetchScheduler);
- sskRequestStarter.start();
- //insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
- // FIXME reenable the above
- sskInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "SSK Insert");
- sskInsertStarter = new RequestStarter(this, sskInsertThrottle,
"SSK Insert starter ("+portNumber+")", node.requestOutputThrottle,
node.requestInputThrottle, node.localSskInsertBytesSentAverage,
node.localSskFetchBytesReceivedAverage);
- sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, node, schedulerConfig, "SSKinserter");
- sskInsertStarter.setScheduler(sskPutScheduler);
- sskInsertStarter.start();
-
Logger.normal(this, "Initializing USK Manager");
System.out.println("Initializing USK Manager");
uskManager = new USKManager(this);
- healingQueue = new SimpleHealingQueue(chkPutScheduler,
+ healingQueue = new
SimpleHealingQueue(requestStarters.chkPutScheduler,
new InserterContext(tempBucketFactory,
tempBucketFactory, persistentTempBucketFactory,
random, 0, 2, 1, 0, 0, new
SimpleEventProducer(),
false, uskManager),
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);
- schedulerConfig.finishedInitialization();
}
- public class MyRequestThrottle implements BaseRequestThrottle {
- private final BootstrappingDecayingRunningAverage
roundTripTime;
-
- public MyRequestThrottle(ThrottleWindowManager throttleWindow,
int rtt, String string) {
- roundTripTime = new
BootstrappingDecayingRunningAverage(rtt, 10, 5*60*1000, 10);
- }
-
- public synchronized long getDelay() {
- double rtt = roundTripTime.currentValue();
- double winSizeForMinPacketDelay = rtt / MIN_DELAY;
- double _simulatedWindowSize =
throttleWindow.currentValue();
- if (_simulatedWindowSize > winSizeForMinPacketDelay) {
- _simulatedWindowSize = winSizeForMinPacketDelay;
- }
- if (_simulatedWindowSize < 1.0) {
- _simulatedWindowSize = 1.0F;
- }
- // return (long) (_roundTripTime /
_simulatedWindowSize);
- return Math.max(MIN_DELAY, Math.min((long) (rtt /
_simulatedWindowSize), MAX_DELAY));
- }
-
- public synchronized void successfulCompletion(long rtt) {
- roundTripTime.report(Math.max(rtt, 10));
- Logger.minor(this, "Reported successful completion:
"+rtt+" on "+this+" avg "+roundTripTime.currentValue());
- }
-
- public String toString() {
- return "rtt: "+roundTripTime.currentValue()+"
_s="+throttleWindow.currentValue();
- }
- }
-
-
public void start(Config config) throws NodeInitException {
// TMCI
try{
@@ -337,7 +264,7 @@
while(true) {
if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
// See below; inserts count both
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
rejectedOverload = true;
}
@@ -356,7 +283,7 @@
(status ==
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
if(!rejectedOverload) {
// See below
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
rejectedOverload = true;
}
} else {
@@ -366,8 +293,8 @@
(status ==
RequestSender.VERIFY_FAILURE)) {
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
-
throttleWindow.requestCompleted();
-
chkRequestThrottle.successfulCompletion(rtt);
+
requestStarters.throttleWindow.requestCompleted();
+
requestStarters.chkRequestThrottle.successfulCompletion(rtt);
}
}
@@ -432,7 +359,7 @@
boolean rejectedOverload = false;
while(true) {
if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
rejectedOverload = true;
}
@@ -450,7 +377,7 @@
if((status == RequestSender.TIMED_OUT) ||
(status ==
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
if(!rejectedOverload) {
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
rejectedOverload = true;
}
} else {
@@ -461,8 +388,8 @@
long rtt = System.currentTimeMillis() -
startTime;
if(!rejectedOverload)
-
throttleWindow.requestCompleted();
-
sskRequestThrottle.successfulCompletion(rtt);
+
requestStarters.throttleWindow.requestCompleted();
+
requestStarters.sskRequestThrottle.successfulCompletion(rtt);
}
}
@@ -541,7 +468,7 @@
}
if((!hasReceivedRejectedOverload) &&
is.receivedRejectedOverload()) {
hasReceivedRejectedOverload = true;
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
}
}
@@ -557,7 +484,7 @@
}
if(is.anyTransfersFailed() &&
(!hasReceivedRejectedOverload)) {
hasReceivedRejectedOverload = true; // not
strictly true but same effect
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
}
}
@@ -572,9 +499,9 @@
long endTime = System.currentTimeMillis();
long len = endTime - startTime;
- chkInsertThrottle.successfulCompletion(len);
+
requestStarters.chkInsertThrottle.successfulCompletion(len);
if(!hasReceivedRejectedOverload)
- throttleWindow.requestCompleted();
+
requestStarters.throttleWindow.requestCompleted();
}
}
@@ -652,7 +579,7 @@
}
if((!hasReceivedRejectedOverload) &&
is.receivedRejectedOverload()) {
hasReceivedRejectedOverload = true;
- throttleWindow.rejectedOverload();
+
requestStarters.throttleWindow.rejectedOverload();
}
}
@@ -678,8 +605,8 @@
// It worked!
long endTime = System.currentTimeMillis();
long rtt = endTime - startTime;
- throttleWindow.requestCompleted();
- sskInsertThrottle.successfulCompletion(rtt);
+
requestStarters.throttleWindow.requestCompleted();
+
requestStarters.sskInsertThrottle.successfulCompletion(rtt);
}
}
@@ -739,22 +666,6 @@
return new HighLevelSimpleClientImpl(this, archiveManager,
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass);
}
- public BaseRequestThrottle getCHKRequestThrottle() {
- return chkRequestThrottle;
- }
-
- public BaseRequestThrottle getCHKInsertThrottle() {
- return chkInsertThrottle;
- }
-
- public BaseRequestThrottle getSSKRequestThrottle() {
- return sskRequestThrottle;
- }
-
- public BaseRequestThrottle getSSKInsertThrottle() {
- return sskInsertThrottle;
- }
-
public FCPServer getFCPServer() {
return fcpServer;
}
@@ -807,9 +718,9 @@
SimpleSendableInsert ssi = new SimpleSendableInsert(this,
block, RequestStarter.MAXIMUM_PRIORITY_CLASS);
Logger.minor(this, "Queueing random reinsert for "+block+" :
"+ssi);
if(block instanceof CHKBlock)
- chkPutScheduler.register(ssi);
+ requestStarters.chkPutScheduler.register(ssi);
else if(block instanceof SSKBlock)
- sskPutScheduler.register(ssi);
+ requestStarters.sskPutScheduler.register(ssi);
else
Logger.error(this, "Don't know what to do with
"+block+" should be queued for reinsert");
}
Added: trunk/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterGroup.java 2006-08-12
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/RequestStarterGroup.java 2006-08-12
19:54:58 UTC (rev 10049)
@@ -0,0 +1,109 @@
+package freenet.node;
+
+import freenet.client.async.ClientRequestScheduler;
+import freenet.config.Config;
+import freenet.config.SubConfig;
+import freenet.crypt.RandomSource;
+import freenet.support.Logger;
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+
+public class RequestStarterGroup {
+
+ final ThrottleWindowManager throttleWindow;
+ final MyRequestThrottle chkRequestThrottle;
+ final RequestStarter chkRequestStarter;
+ final MyRequestThrottle chkInsertThrottle;
+ final RequestStarter chkInsertStarter;
+ final MyRequestThrottle sskRequestThrottle;
+ final RequestStarter sskRequestStarter;
+ final MyRequestThrottle sskInsertThrottle;
+ final RequestStarter sskInsertStarter;
+
+ public final ClientRequestScheduler chkFetchScheduler;
+ public final ClientRequestScheduler chkPutScheduler;
+ public final ClientRequestScheduler sskFetchScheduler;
+ public final ClientRequestScheduler sskPutScheduler;
+
+ RequestStarterGroup(Node node, NodeClientCore core, int portNumber,
RandomSource random, Config config) {
+ SubConfig schedulerConfig = new SubConfig("node.scheduler",
config);
+
+ throttleWindow = new ThrottleWindowManager(2.0);
+ chkRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "CHK Request");
+ chkRequestStarter = new RequestStarter(core,
chkRequestThrottle, "CHK Request starter ("+portNumber+")",
node.requestOutputThrottle, node.requestInputThrottle,
node.localChkFetchBytesSentAverage, node.localChkFetchBytesReceivedAverage);
+ chkFetchScheduler = new ClientRequestScheduler(false, false,
random, chkRequestStarter, node, schedulerConfig, "CHKrequester");
+ chkRequestStarter.setScheduler(chkFetchScheduler);
+ chkRequestStarter.start();
+ //insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
+ // FIXME reenable the above
+ chkInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "CHK Insert");
+ chkInsertStarter = new RequestStarter(core, chkInsertThrottle,
"CHK Insert starter ("+portNumber+")", node.requestOutputThrottle,
node.requestInputThrottle, node.localChkInsertBytesSentAverage,
node.localChkInsertBytesReceivedAverage);
+ chkPutScheduler = new ClientRequestScheduler(true, false,
random, chkInsertStarter, node, schedulerConfig, "CHKinserter");
+ chkInsertStarter.setScheduler(chkPutScheduler);
+ chkInsertStarter.start();
+
+ sskRequestThrottle = new MyRequestThrottle(throttleWindow,
5000, "SSK Request");
+ sskRequestStarter = new RequestStarter(core,
sskRequestThrottle, "SSK Request starter ("+portNumber+")",
node.requestOutputThrottle, node.requestInputThrottle,
node.localSskFetchBytesSentAverage, node.localSskFetchBytesReceivedAverage);
+ sskFetchScheduler = new ClientRequestScheduler(false, true,
random, sskRequestStarter, node, schedulerConfig, "SSKrequester");
+ sskRequestStarter.setScheduler(sskFetchScheduler);
+ sskRequestStarter.start();
+ //insertThrottle = new ChainedRequestThrottle(10000, 2.0F,
requestThrottle);
+ // FIXME reenable the above
+ sskInsertThrottle = new MyRequestThrottle(throttleWindow,
20000, "SSK Insert");
+ sskInsertStarter = new RequestStarter(core, sskInsertThrottle,
"SSK Insert starter ("+portNumber+")", node.requestOutputThrottle,
node.requestInputThrottle, node.localSskInsertBytesSentAverage,
node.localSskFetchBytesReceivedAverage);
+ sskPutScheduler = new ClientRequestScheduler(true, true,
random, sskInsertStarter, node, schedulerConfig, "SSKinserter");
+ sskInsertStarter.setScheduler(sskPutScheduler);
+ sskInsertStarter.start();
+
+ schedulerConfig.finishedInitialization();
+ }
+
+ public class MyRequestThrottle implements BaseRequestThrottle {
+
+ private final BootstrappingDecayingRunningAverage
roundTripTime;
+
+ public MyRequestThrottle(ThrottleWindowManager throttleWindow,
int rtt, String string) {
+ roundTripTime = new
BootstrappingDecayingRunningAverage(rtt, 10, 5*60*1000, 10);
+ }
+
+ public synchronized long getDelay() {
+ double rtt = roundTripTime.currentValue();
+ double winSizeForMinPacketDelay = rtt / MIN_DELAY;
+ double _simulatedWindowSize =
throttleWindow.currentValue();
+ if (_simulatedWindowSize > winSizeForMinPacketDelay) {
+ _simulatedWindowSize = winSizeForMinPacketDelay;
+ }
+ if (_simulatedWindowSize < 1.0) {
+ _simulatedWindowSize = 1.0F;
+ }
+ // return (long) (_roundTripTime /
_simulatedWindowSize);
+ return Math.max(MIN_DELAY, Math.min((long) (rtt /
_simulatedWindowSize), MAX_DELAY));
+ }
+
+ public synchronized void successfulCompletion(long rtt) {
+ roundTripTime.report(Math.max(rtt, 10));
+ Logger.minor(this, "Reported successful completion:
"+rtt+" on "+this+" avg "+roundTripTime.currentValue());
+ }
+
+ public String toString() {
+ return "rtt: "+roundTripTime.currentValue()+"
_s="+throttleWindow.currentValue();
+ }
+ }
+
+ public BaseRequestThrottle getCHKRequestThrottle() {
+ return chkRequestThrottle;
+ }
+
+ public BaseRequestThrottle getCHKInsertThrottle() {
+ return chkInsertThrottle;
+ }
+
+ public BaseRequestThrottle getSSKRequestThrottle() {
+ return sskRequestThrottle;
+ }
+
+ public BaseRequestThrottle getSSKInsertThrottle() {
+ return sskInsertThrottle;
+ }
+
+
+}
Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -114,7 +114,7 @@
ret.free();
throw e;
}
- getter = new ClientGetter(this, client.core.chkFetchScheduler,
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+ getter = new ClientGetter(this,
client.core.requestStarters.chkFetchScheduler,
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass,
client, returnBucket);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
client.queueClientRequestMessage(msg, 0);
@@ -171,7 +171,7 @@
ret.free();
throw e;
}
- getter = new ClientGetter(this, client.core.chkFetchScheduler,
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+ getter = new ClientGetter(this,
client.core.requestStarters.chkFetchScheduler,
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass,
client, returnBucket);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
client.queueClientRequestMessage(msg, 0);
@@ -235,7 +235,7 @@
}
returnBucket = ret;
- getter = new ClientGetter(this, client.core.chkFetchScheduler,
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+ getter = new ClientGetter(this,
client.core.requestStarters.chkFetchScheduler,
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass,
client, returnBucket);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
client.queueClientRequestMessage(msg, 0);
Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-08-12 19:24:25 UTC
(rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java 2006-08-12 19:54:58 UTC
(rev 10049)
@@ -110,7 +110,7 @@
this.clientMetadata = cm;
Logger.minor(this, "data = "+data+", uploadFrom =
"+ClientPutMessage.uploadFromString(uploadFrom));
inserter = new ClientPutter(this, data, uri, cm,
- ctx, client.core.chkPutScheduler,
client.core.sskPutScheduler, priorityClass,
+ ctx,
client.core.requestStarters.chkPutScheduler,
client.core.requestStarters.sskPutScheduler, priorityClass,
getCHKOnly, isMetadata, client, null);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
@@ -157,7 +157,7 @@
this.clientMetadata = cm;
Logger.minor(this, "data = "+data+", uploadFrom =
"+ClientPutMessage.uploadFromString(uploadFrom));
inserter = new ClientPutter(this, data, uri, cm,
- ctx, client.core.chkPutScheduler,
client.core.sskPutScheduler, priorityClass,
+ ctx,
client.core.requestStarters.chkPutScheduler,
client.core.requestStarters.sskPutScheduler, priorityClass,
getCHKOnly, isMetadata, client, null);
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
@@ -245,8 +245,8 @@
throw new PersistenceParseException("shouldn't happen");
}
this.clientMetadata = cm;
- inserter = new ClientPutter(this, data, uri, cm, ctx,
client.core.chkPutScheduler,
- client.core.sskPutScheduler, priorityClass,
getCHKOnly, isMetadata, client, fs.subset("progress"));
+ inserter = new ClientPutter(this, data, uri, cm, ctx,
client.core.requestStarters.chkPutScheduler,
+ client.core.requestStarters.sskPutScheduler,
priorityClass, getCHKOnly, isMetadata, client, fs.subset("progress"));
if(persistenceType != PERSIST_CONNECTION) {
FCPMessage msg = persistentTagMessage();
client.queueClientRequestMessage(msg, 0);
Modified: trunk/freenet/src/freenet/node/fcp/ClientPutDir.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPutDir.java 2006-08-12
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientPutDir.java 2006-08-12
19:54:58 UTC (rev 10049)
@@ -37,7 +37,7 @@
this.defaultName = message.defaultName;
SimpleManifestPutter p;
try {
- p = new SimpleManifestPutter(this,
client.core.chkPutScheduler, client.core.sskPutScheduler,
+ p = new SimpleManifestPutter(this,
client.core.requestStarters.chkPutScheduler,
client.core.requestStarters.sskPutScheduler,
manifestElements, priorityClass, uri,
defaultName, ctx, message.getCHKOnly, client);
} catch (InserterException e) {
onFailure(e, null);
@@ -123,7 +123,7 @@
SimpleManifestPutter p = null;
try {
if(!finished)
- p = new SimpleManifestPutter(this,
client.core.chkPutScheduler, client.core.sskPutScheduler,
+ p = new SimpleManifestPutter(this,
client.core.requestStarters.chkPutScheduler,
client.core.requestStarters.sskPutScheduler,
manifestElements,
priorityClass, uri, defaultName, ctx, getCHKOnly, client);
} catch (InserterException e) {
onFailure(e, null);
Modified: trunk/freenet/src/freenet/node/updater/NodeUpdater.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/NodeUpdater.java 2006-08-12
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/updater/NodeUpdater.java 2006-08-12
19:54:58 UTC (rev 10049)
@@ -159,7 +159,7 @@
try{
if((cg==null)||cg.isCancelled()){
Logger.minor(this, "Scheduling request
for "+URI.setSuggestedEdition(availableVersion));
- cg = new ClientGetter(this,
core.chkFetchScheduler, core.sskFetchScheduler,
+ cg = new ClientGetter(this,
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler,
URI.setSuggestedEdition(availableVersion), ctx,
RequestStarter.UPDATE_PRIORITY_CLASS,
this, new
ArrayBucket());
toStart = cg;
@@ -509,7 +509,7 @@
Logger.minor(this,
"fetcher="+revocationGetter);
if(revocationGetter !=
null)
Logger.minor(this, "revocation fetcher:
cancelled="+revocationGetter.isCancelled()+",
finished="+revocationGetter.isFinished());
- cg = revocationGetter =
new ClientGetter(NodeUpdater.this, core.chkFetchScheduler,
core.sskFetchScheduler, revocationURI, ctxRevocation,
RequestStarter.MAXIMUM_PRIORITY_CLASS, NodeUpdater.this, null);
+ cg = revocationGetter =
new ClientGetter(NodeUpdater.this, core.requestStarters.chkFetchScheduler,
core.requestStarters.sskFetchScheduler, revocationURI, ctxRevocation,
RequestStarter.MAXIMUM_PRIORITY_CLASS, NodeUpdater.this, null);
Logger.minor(this,
"Queued another revocation fetcher");
}
}