Author: toad
Date: 2006-08-12 19:54:58 +0000 (Sat, 12 Aug 2006)
New Revision: 10049

Added:
   trunk/freenet/src/freenet/node/RequestStarterGroup.java
Modified:
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/async/USKManager.java
   trunk/freenet/src/freenet/clients/http/NinjaSpider.java
   trunk/freenet/src/freenet/clients/http/Spider.java
   trunk/freenet/src/freenet/node/ARKFetcher.java
   trunk/freenet/src/freenet/node/NodeARKInserter.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/node/fcp/ClientGet.java
   trunk/freenet/src/freenet/node/fcp/ClientPut.java
   trunk/freenet/src/freenet/node/fcp/ClientPutDir.java
   trunk/freenet/src/freenet/node/updater/NodeUpdater.java
Log:
Separate the RequestStarters and related code into RequestStarterGroup.
This includes all the load-limiting code.
The next stage is to persist the load-limiting data.

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-08-12 19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2006-08-12 19:54:58 UTC (rev 10049)
@@ -104,7 +104,7 @@
                if(uri == null) throw new NullPointerException();
                FetcherContext context = getFetcherContext();
                FetchWaiter fw = new FetchWaiter();
-               ClientGetter get = new ClientGetter(fw, core.chkFetchScheduler, 
core.sskFetchScheduler, uri, context, priorityClass, this, null);
+               ClientGetter get = new ClientGetter(fw, 
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler, 
uri, context, priorityClass, this, null);
                get.start();
                return fw.waitForCompletion();
        }
@@ -113,7 +113,7 @@
                if(uri == null) throw new NullPointerException();
                FetcherContext context = getFetcherContext(overrideMaxSize);
                FetchWaiter fw = new FetchWaiter();
-               ClientGetter get = new ClientGetter(fw, core.chkFetchScheduler, 
core.sskFetchScheduler, uri, context, priorityClass, this, null);
+               ClientGetter get = new ClientGetter(fw, 
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler, 
uri, context, priorityClass, this, null);
                get.start();
                return fw.waitForCompletion();
        }
@@ -126,7 +126,7 @@
                InserterContext context = getInserterContext(true);
                PutWaiter pw = new PutWaiter();
                ClientPutter put = new ClientPutter(pw, insert.data, 
insert.desiredURI, insert.clientMetadata, 
-                               context, core.chkPutScheduler, 
core.sskPutScheduler, priorityClass, getCHKOnly, isMetadata, this, null);
+                               context, core.requestStarters.chkPutScheduler, 
core.requestStarters.sskPutScheduler, priorityClass, getCHKOnly, isMetadata, 
this, null);
                put.start();
                return pw.waitForCompletion();
        }
@@ -151,7 +151,7 @@
        public FreenetURI insertManifest(FreenetURI insertURI, HashMap 
bucketsByName, String defaultName) throws InserterException {
                PutWaiter pw = new PutWaiter();
                SimpleManifestPutter putter =
-                       new SimpleManifestPutter(pw, core.chkPutScheduler, 
core.sskPutScheduler, 
SimpleManifestPutter.bucketsByNameToManifestEntries(bucketsByName), 
priorityClass, insertURI, defaultName, getInserterContext(true), false, this);
+                       new SimpleManifestPutter(pw, 
core.requestStarters.chkPutScheduler, core.requestStarters.sskPutScheduler, 
SimpleManifestPutter.bucketsByNameToManifestEntries(bucketsByName), 
priorityClass, insertURI, defaultName, getInserterContext(true), false, this);
                putter.start();
                return pw.waitForCompletion();
        }

Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java      2006-08-12 
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/client/async/USKManager.java      2006-08-12 
19:54:58 UTC (rev 10049)
@@ -56,8 +56,8 @@
                backgroundFetchContext = 
core.makeClient(RequestStarter.UPDATE_PRIORITY_CLASS).getFetcherContext();
                backgroundFetchContext.followRedirects = false;
                backgroundFetchContext.uskManager = this;
-               this.chkRequestScheduler = core.chkFetchScheduler;
-               this.sskRequestScheduler = core.sskFetchScheduler;
+               this.chkRequestScheduler = 
core.requestStarters.chkFetchScheduler;
+               this.sskRequestScheduler = 
core.requestStarters.sskFetchScheduler;
                latestVersionByClearUSK = new HashMap();
                subscribersByClearUSK = new HashMap();
                fetchersByUSK = new HashMap();

Modified: trunk/freenet/src/freenet/clients/http/NinjaSpider.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/NinjaSpider.java     2006-08-12 
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/clients/http/NinjaSpider.java     2006-08-12 
19:54:58 UTC (rev 10049)
@@ -181,7 +181,7 @@
        }

        private ClientGetter makeGetter(FreenetURI uri) {
-               ClientGetter g = new ClientGetter(this, core.chkFetchScheduler, 
core.sskFetchScheduler, uri, ctx, PRIORITY_CLASS, this, null);
+               ClientGetter g = new ClientGetter(this, 
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler, 
uri, ctx, PRIORITY_CLASS, this, null);
                return g;
        }


Modified: trunk/freenet/src/freenet/clients/http/Spider.java
===================================================================
--- trunk/freenet/src/freenet/clients/http/Spider.java  2006-08-12 19:24:25 UTC 
(rev 10048)
+++ trunk/freenet/src/freenet/clients/http/Spider.java  2006-08-12 19:54:58 UTC 
(rev 10049)
@@ -128,7 +128,7 @@
        }

        private ClientGetter makeGetter(FreenetURI uri) {
-               ClientGetter g = new ClientGetter(this, core.chkFetchScheduler, 
core.sskFetchScheduler, uri, ctx, PRIORITY_CLASS, this, null);
+               ClientGetter g = new ClientGetter(this, 
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler, 
uri, ctx, PRIORITY_CLASS, this, null);
                return g;
        }


Modified: trunk/freenet/src/freenet/node/ARKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetcher.java      2006-08-12 19:24:25 UTC 
(rev 10048)
+++ trunk/freenet/src/freenet/node/ARKFetcher.java      2006-08-12 19:54:58 UTC 
(rev 10049)
@@ -91,7 +91,7 @@
                                startedEdition = uri.getSuggestedEdition();
                                fetchingURI = uri;
                                Logger.minor(this, "Fetching ARK: "+uri+" for 
"+peer);
-                               cg = new ClientGetter(this, 
node.clientCore.chkFetchScheduler, node.clientCore.sskFetchScheduler, 
+                               cg = new ClientGetter(this, 
node.clientCore.requestStarters.chkFetchScheduler, 
node.clientCore.requestStarters.sskFetchScheduler, 
                                                uri, node.arkFetcherContext, 
RequestStarter.UPDATE_PRIORITY_CLASS, 
                                                this, new ArrayBucket());
                                getter = cg;

Modified: trunk/freenet/src/freenet/node/NodeARKInserter.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeARKInserter.java 2006-08-12 19:24:25 UTC 
(rev 10048)
+++ trunk/freenet/src/freenet/node/NodeARKInserter.java 2006-08-12 19:54:58 UTC 
(rev 10049)
@@ -131,7 +131,7 @@
                inserter = new ClientPutter(this, b, uri,
                                        new ClientMetadata("text/plain") /* it 
won't quite fit in an SSK anyway */, 
                                        
node.clientCore.makeClient((short)0).getInserterContext(true),
-                                       node.clientCore.chkPutScheduler, 
node.clientCore.sskPutScheduler, RequestStarter.INTERACTIVE_PRIORITY_CLASS, 
false, false, this, null);
+                                       
node.clientCore.requestStarters.chkPutScheduler, 
node.clientCore.requestStarters.sskPutScheduler, 
RequestStarter.INTERACTIVE_PRIORITY_CLASS, false, false, this, null);

                try {


Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java  2006-08-12 19:24:25 UTC 
(rev 10048)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java  2006-08-12 19:54:58 UTC 
(rev 10049)
@@ -7,7 +7,6 @@
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
 import freenet.client.InserterContext;
-import freenet.client.async.ClientRequestScheduler;
 import freenet.client.async.HealingQueue;
 import freenet.client.async.SimpleHealingQueue;
 import freenet.client.async.USKManager;
@@ -46,31 +45,20 @@
 import freenet.support.io.PersistentEncryptedTempBucketFactory;
 import freenet.support.io.PersistentTempBucketFactory;
 import freenet.support.io.TempBucketFactory;
-import freenet.support.math.BootstrappingDecayingRunningAverage;
-import freenet.support.math.TimeDecayingRunningAverage;

+/**
+ * The connection between the node and the client layer.
+ */
 public class NodeClientCore {

        public final USKManager uskManager;
        final ArchiveManager archiveManager;
-       final ThrottleWindowManager throttleWindow;
-       final MyRequestThrottle chkRequestThrottle;
-       final RequestStarter chkRequestStarter;
-       final MyRequestThrottle chkInsertThrottle;
-       final RequestStarter chkInsertStarter;
-       final MyRequestThrottle sskRequestThrottle;
-       final RequestStarter sskRequestStarter;
-       final MyRequestThrottle sskInsertThrottle;
-       final RequestStarter sskInsertStarter;
+       public final RequestStarterGroup requestStarters;
        private final HealingQueue healingQueue;
        /** Must be included as a hidden field in order for any dangerous HTTP 
operation to complete successfully. */
        public final String formPassword;

        File downloadDir;
-       public final ClientRequestScheduler chkFetchScheduler;
-       public final ClientRequestScheduler chkPutScheduler;
-       public final ClientRequestScheduler sskFetchScheduler;
-       public final ClientRequestScheduler sskPutScheduler;
        final FilenameGenerator tempFilenameGenerator;
        public final BucketFactory tempBucketFactory;
        final Node node;
@@ -104,7 +92,7 @@
                random.nextBytes(pwdBuf);
                this.formPassword = Base64.encode(pwdBuf);
                alerts = new UserAlertManager();
-               throttleWindow = new ThrottleWindowManager(2.0);
+               requestStarters = new RequestStarterGroup(node, this, 
portNumber, random, config);

                // Temp files

@@ -183,81 +171,20 @@
                }


-               SubConfig schedulerConfig = new SubConfig("node.scheduler", 
config);
-               
                archiveManager = new ArchiveManager(MAX_ARCHIVE_HANDLERS, 
MAX_CACHED_ARCHIVE_DATA, MAX_ARCHIVE_SIZE, MAX_ARCHIVED_FILE_SIZE, 
MAX_CACHED_ELEMENTS, random, tempFilenameGenerator);
-               chkRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "CHK Request");
-               chkRequestStarter = new RequestStarter(this, 
chkRequestThrottle, "CHK Request starter ("+portNumber+")", 
node.requestOutputThrottle, node.requestInputThrottle, 
node.localChkFetchBytesSentAverage, node.localChkFetchBytesReceivedAverage);
-               chkFetchScheduler = new ClientRequestScheduler(false, false, 
random, chkRequestStarter, node, schedulerConfig, "CHKrequester");
-               chkRequestStarter.setScheduler(chkFetchScheduler);
-               chkRequestStarter.start();
-               //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
-               // FIXME reenable the above
-               chkInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "CHK Insert");
-               chkInsertStarter = new RequestStarter(this, chkInsertThrottle, 
"CHK Insert starter ("+portNumber+")", node.requestOutputThrottle, 
node.requestInputThrottle, node.localChkInsertBytesSentAverage, 
node.localChkInsertBytesReceivedAverage);
-               chkPutScheduler = new ClientRequestScheduler(true, false, 
random, chkInsertStarter, node, schedulerConfig, "CHKinserter");
-               chkInsertStarter.setScheduler(chkPutScheduler);
-               chkInsertStarter.start();
-
-               sskRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "SSK Request");
-               sskRequestStarter = new RequestStarter(this, 
sskRequestThrottle, "SSK Request starter ("+portNumber+")", 
node.requestOutputThrottle, node.requestInputThrottle, 
node.localSskFetchBytesSentAverage, node.localSskFetchBytesReceivedAverage);
-               sskFetchScheduler = new ClientRequestScheduler(false, true, 
random, sskRequestStarter, node, schedulerConfig, "SSKrequester");
-               sskRequestStarter.setScheduler(sskFetchScheduler);
-               sskRequestStarter.start();
-               //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
-               // FIXME reenable the above
-               sskInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "SSK Insert");
-               sskInsertStarter = new RequestStarter(this, sskInsertThrottle, 
"SSK Insert starter ("+portNumber+")", node.requestOutputThrottle, 
node.requestInputThrottle, node.localSskInsertBytesSentAverage, 
node.localSskFetchBytesReceivedAverage);
-               sskPutScheduler = new ClientRequestScheduler(true, true, 
random, sskInsertStarter, node, schedulerConfig, "SSKinserter");
-               sskInsertStarter.setScheduler(sskPutScheduler);
-               sskInsertStarter.start();
-               
                Logger.normal(this, "Initializing USK Manager");
                System.out.println("Initializing USK Manager");
                uskManager = new USKManager(this);

-               healingQueue = new SimpleHealingQueue(chkPutScheduler,
+               healingQueue = new 
SimpleHealingQueue(requestStarters.chkPutScheduler,
                                new InserterContext(tempBucketFactory, 
tempBucketFactory, persistentTempBucketFactory, 
                                                random, 0, 2, 1, 0, 0, new 
SimpleEventProducer(), 
                                                false, uskManager), 
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);

-               schedulerConfig.finishedInitialization();
        }


-       public class MyRequestThrottle implements BaseRequestThrottle {

-               private final BootstrappingDecayingRunningAverage 
roundTripTime; 
-               
-               public MyRequestThrottle(ThrottleWindowManager throttleWindow, 
int rtt, String string) {
-                       roundTripTime = new 
BootstrappingDecayingRunningAverage(rtt, 10, 5*60*1000, 10);
-               }
-
-               public synchronized long getDelay() {
-                       double rtt = roundTripTime.currentValue();
-                       double winSizeForMinPacketDelay = rtt / MIN_DELAY;
-                       double _simulatedWindowSize = 
throttleWindow.currentValue();
-                       if (_simulatedWindowSize > winSizeForMinPacketDelay) {
-                               _simulatedWindowSize = winSizeForMinPacketDelay;
-                       }
-                       if (_simulatedWindowSize < 1.0) {
-                               _simulatedWindowSize = 1.0F;
-                       }
-                       // return (long) (_roundTripTime / 
_simulatedWindowSize);
-                       return Math.max(MIN_DELAY, Math.min((long) (rtt / 
_simulatedWindowSize), MAX_DELAY));
-               }
-
-               public synchronized void successfulCompletion(long rtt) {
-                       roundTripTime.report(Math.max(rtt, 10));
-                       Logger.minor(this, "Reported successful completion: 
"+rtt+" on "+this+" avg "+roundTripTime.currentValue());
-               }
-               
-               public String toString() {
-                       return "rtt: "+roundTripTime.currentValue()+" 
_s="+throttleWindow.currentValue();
-               }
-       }
-
-
        public void start(Config config) throws NodeInitException {
                // TMCI
                try{
@@ -337,7 +264,7 @@
                while(true) {
                        if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
                                // See below; inserts count both
-                               throttleWindow.rejectedOverload();
+                               
requestStarters.throttleWindow.rejectedOverload();
                                rejectedOverload = true;
                        }

@@ -356,7 +283,7 @@
                                        (status == 
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
                                if(!rejectedOverload) {
                                        // See below
-                                       throttleWindow.rejectedOverload();
+                                       
requestStarters.throttleWindow.rejectedOverload();
                                        rejectedOverload = true;
                                }
                        } else {
@@ -366,8 +293,8 @@
                                                (status == 
RequestSender.VERIFY_FAILURE)) {
                                        long rtt = System.currentTimeMillis() - 
startTime;
                                        if(!rejectedOverload)
-                                               
throttleWindow.requestCompleted();
-                                       
chkRequestThrottle.successfulCompletion(rtt);
+                                               
requestStarters.throttleWindow.requestCompleted();
+                                       
requestStarters.chkRequestThrottle.successfulCompletion(rtt);
                                }
                        }

@@ -432,7 +359,7 @@
                boolean rejectedOverload = false;
                while(true) {
                        if(rs.waitUntilStatusChange() && (!rejectedOverload)) {
-                               throttleWindow.rejectedOverload();
+                               
requestStarters.throttleWindow.rejectedOverload();
                                rejectedOverload = true;
                        }

@@ -450,7 +377,7 @@
                        if((status == RequestSender.TIMED_OUT) ||
                                        (status == 
RequestSender.GENERATED_REJECTED_OVERLOAD)) {
                                if(!rejectedOverload) {
-                                       throttleWindow.rejectedOverload();
+                                       
requestStarters.throttleWindow.rejectedOverload();
                                        rejectedOverload = true;
                                }
                        } else {
@@ -461,8 +388,8 @@
                                        long rtt = System.currentTimeMillis() - 
startTime;

                                        if(!rejectedOverload)
-                                               
throttleWindow.requestCompleted();
-                                       
sskRequestThrottle.successfulCompletion(rtt);
+                                               
requestStarters.throttleWindow.requestCompleted();
+                                       
requestStarters.sskRequestThrottle.successfulCompletion(rtt);
                                }
                        }

@@ -541,7 +468,7 @@
                        }
                        if((!hasReceivedRejectedOverload) && 
is.receivedRejectedOverload()) {
                                hasReceivedRejectedOverload = true;
-                               throttleWindow.rejectedOverload();
+                               
requestStarters.throttleWindow.rejectedOverload();
                        }
                }

@@ -557,7 +484,7 @@
                        }
                        if(is.anyTransfersFailed() && 
(!hasReceivedRejectedOverload)) {
                                hasReceivedRejectedOverload = true; // not 
strictly true but same effect
-                               throttleWindow.rejectedOverload();
+                               
requestStarters.throttleWindow.rejectedOverload();
                        }
                }

@@ -572,9 +499,9 @@
                                long endTime = System.currentTimeMillis();
                                long len = endTime - startTime;

-                               chkInsertThrottle.successfulCompletion(len);
+                               
requestStarters.chkInsertThrottle.successfulCompletion(len);
                                if(!hasReceivedRejectedOverload)
-                                       throttleWindow.requestCompleted();
+                                       
requestStarters.throttleWindow.requestCompleted();
                        }
                }

@@ -652,7 +579,7 @@
                        }
                        if((!hasReceivedRejectedOverload) && 
is.receivedRejectedOverload()) {
                                hasReceivedRejectedOverload = true;
-                               throttleWindow.rejectedOverload();
+                               
requestStarters.throttleWindow.rejectedOverload();
                        }
                }

@@ -678,8 +605,8 @@
                                // It worked!
                                long endTime = System.currentTimeMillis();
                                long rtt = endTime - startTime;
-                               throttleWindow.requestCompleted();
-                               sskInsertThrottle.successfulCompletion(rtt);
+                               
requestStarters.throttleWindow.requestCompleted();
+                               
requestStarters.sskInsertThrottle.successfulCompletion(rtt);
                        }
                }

@@ -739,22 +666,6 @@
                return new HighLevelSimpleClientImpl(this, archiveManager, 
tempBucketFactory, random, !Node.DONT_CACHE_LOCAL_REQUESTS, prioClass);
        }

-       public BaseRequestThrottle getCHKRequestThrottle() {
-               return chkRequestThrottle;
-       }
-
-       public BaseRequestThrottle getCHKInsertThrottle() {
-               return chkInsertThrottle;
-       }
-       
-       public BaseRequestThrottle getSSKRequestThrottle() {
-               return sskRequestThrottle;
-       }
-       
-       public BaseRequestThrottle getSSKInsertThrottle() {
-               return sskInsertThrottle;
-       }
-
        public FCPServer getFCPServer() {
                return fcpServer;
        }
@@ -807,9 +718,9 @@
                SimpleSendableInsert ssi = new SimpleSendableInsert(this, 
block, RequestStarter.MAXIMUM_PRIORITY_CLASS);
                Logger.minor(this, "Queueing random reinsert for "+block+" : 
"+ssi);
                if(block instanceof CHKBlock)
-                       chkPutScheduler.register(ssi);
+                       requestStarters.chkPutScheduler.register(ssi);
                else if(block instanceof SSKBlock)
-                       sskPutScheduler.register(ssi);
+                       requestStarters.sskPutScheduler.register(ssi);
                else
                        Logger.error(this, "Don't know what to do with 
"+block+" should be queued for reinsert");
        }

Added: trunk/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarterGroup.java     2006-08-12 
19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/RequestStarterGroup.java     2006-08-12 
19:54:58 UTC (rev 10049)
@@ -0,0 +1,109 @@
+package freenet.node;
+
+import freenet.client.async.ClientRequestScheduler;
+import freenet.config.Config;
+import freenet.config.SubConfig;
+import freenet.crypt.RandomSource;
+import freenet.support.Logger;
+import freenet.support.math.BootstrappingDecayingRunningAverage;
+
+public class RequestStarterGroup {
+
+       final ThrottleWindowManager throttleWindow;
+       final MyRequestThrottle chkRequestThrottle;
+       final RequestStarter chkRequestStarter;
+       final MyRequestThrottle chkInsertThrottle;
+       final RequestStarter chkInsertStarter;
+       final MyRequestThrottle sskRequestThrottle;
+       final RequestStarter sskRequestStarter;
+       final MyRequestThrottle sskInsertThrottle;
+       final RequestStarter sskInsertStarter;
+
+       public final ClientRequestScheduler chkFetchScheduler;
+       public final ClientRequestScheduler chkPutScheduler;
+       public final ClientRequestScheduler sskFetchScheduler;
+       public final ClientRequestScheduler sskPutScheduler;
+
+       RequestStarterGroup(Node node, NodeClientCore core, int portNumber, 
RandomSource random, Config config) {
+               SubConfig schedulerConfig = new SubConfig("node.scheduler", 
config);
+               
+               throttleWindow = new ThrottleWindowManager(2.0);
+               chkRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "CHK Request");
+               chkRequestStarter = new RequestStarter(core, 
chkRequestThrottle, "CHK Request starter ("+portNumber+")", 
node.requestOutputThrottle, node.requestInputThrottle, 
node.localChkFetchBytesSentAverage, node.localChkFetchBytesReceivedAverage);
+               chkFetchScheduler = new ClientRequestScheduler(false, false, 
random, chkRequestStarter, node, schedulerConfig, "CHKrequester");
+               chkRequestStarter.setScheduler(chkFetchScheduler);
+               chkRequestStarter.start();
+               //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
+               // FIXME reenable the above
+               chkInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "CHK Insert");
+               chkInsertStarter = new RequestStarter(core, chkInsertThrottle, 
"CHK Insert starter ("+portNumber+")", node.requestOutputThrottle, 
node.requestInputThrottle, node.localChkInsertBytesSentAverage, 
node.localChkInsertBytesReceivedAverage);
+               chkPutScheduler = new ClientRequestScheduler(true, false, 
random, chkInsertStarter, node, schedulerConfig, "CHKinserter");
+               chkInsertStarter.setScheduler(chkPutScheduler);
+               chkInsertStarter.start();
+
+               sskRequestThrottle = new MyRequestThrottle(throttleWindow, 
5000, "SSK Request");
+               sskRequestStarter = new RequestStarter(core, 
sskRequestThrottle, "SSK Request starter ("+portNumber+")", 
node.requestOutputThrottle, node.requestInputThrottle, 
node.localSskFetchBytesSentAverage, node.localSskFetchBytesReceivedAverage);
+               sskFetchScheduler = new ClientRequestScheduler(false, true, 
random, sskRequestStarter, node, schedulerConfig, "SSKrequester");
+               sskRequestStarter.setScheduler(sskFetchScheduler);
+               sskRequestStarter.start();
+               //insertThrottle = new ChainedRequestThrottle(10000, 2.0F, 
requestThrottle);
+               // FIXME reenable the above
+               sskInsertThrottle = new MyRequestThrottle(throttleWindow, 
20000, "SSK Insert");
+               sskInsertStarter = new RequestStarter(core, sskInsertThrottle, 
"SSK Insert starter ("+portNumber+")", node.requestOutputThrottle, 
node.requestInputThrottle, node.localSskInsertBytesSentAverage, 
node.localSskFetchBytesReceivedAverage);
+               sskPutScheduler = new ClientRequestScheduler(true, true, 
random, sskInsertStarter, node, schedulerConfig, "SSKinserter");
+               sskInsertStarter.setScheduler(sskPutScheduler);
+               sskInsertStarter.start();
+               
+               schedulerConfig.finishedInitialization();
+       }
+       
+       public class MyRequestThrottle implements BaseRequestThrottle {
+
+               private final BootstrappingDecayingRunningAverage 
roundTripTime; 
+               
+               public MyRequestThrottle(ThrottleWindowManager throttleWindow, 
int rtt, String string) {
+                       roundTripTime = new 
BootstrappingDecayingRunningAverage(rtt, 10, 5*60*1000, 10);
+               }
+
+               public synchronized long getDelay() {
+                       double rtt = roundTripTime.currentValue();
+                       double winSizeForMinPacketDelay = rtt / MIN_DELAY;
+                       double _simulatedWindowSize = 
throttleWindow.currentValue();
+                       if (_simulatedWindowSize > winSizeForMinPacketDelay) {
+                               _simulatedWindowSize = winSizeForMinPacketDelay;
+                       }
+                       if (_simulatedWindowSize < 1.0) {
+                               _simulatedWindowSize = 1.0F;
+                       }
+                       // return (long) (_roundTripTime / 
_simulatedWindowSize);
+                       return Math.max(MIN_DELAY, Math.min((long) (rtt / 
_simulatedWindowSize), MAX_DELAY));
+               }
+
+               public synchronized void successfulCompletion(long rtt) {
+                       roundTripTime.report(Math.max(rtt, 10));
+                       Logger.minor(this, "Reported successful completion: 
"+rtt+" on "+this+" avg "+roundTripTime.currentValue());
+               }
+               
+               public String toString() {
+                       return "rtt: "+roundTripTime.currentValue()+" 
_s="+throttleWindow.currentValue();
+               }
+       }
+
+       public BaseRequestThrottle getCHKRequestThrottle() {
+               return chkRequestThrottle;
+       }
+
+       public BaseRequestThrottle getCHKInsertThrottle() {
+               return chkInsertThrottle;
+       }
+       
+       public BaseRequestThrottle getSSKRequestThrottle() {
+               return sskRequestThrottle;
+       }
+       
+       public BaseRequestThrottle getSSKInsertThrottle() {
+               return sskInsertThrottle;
+       }
+
+       
+}

Modified: trunk/freenet/src/freenet/node/fcp/ClientGet.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-08-12 19:24:25 UTC 
(rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientGet.java   2006-08-12 19:54:58 UTC 
(rev 10049)
@@ -114,7 +114,7 @@
                                ret.free();
                                throw e;
                        }
-               getter = new ClientGetter(this, client.core.chkFetchScheduler, 
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+               getter = new ClientGetter(this, 
client.core.requestStarters.chkFetchScheduler, 
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass, 
client, returnBucket);
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
                        client.queueClientRequestMessage(msg, 0);
@@ -171,7 +171,7 @@
                                ret.free();
                                throw e;
                        }
-               getter = new ClientGetter(this, client.core.chkFetchScheduler, 
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+               getter = new ClientGetter(this, 
client.core.requestStarters.chkFetchScheduler, 
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass, 
client, returnBucket);
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
                        client.queueClientRequestMessage(msg, 0);
@@ -235,7 +235,7 @@
                }
                returnBucket = ret;

-               getter = new ClientGetter(this, client.core.chkFetchScheduler, 
client.core.sskFetchScheduler, uri, fctx, priorityClass, client, returnBucket);
+               getter = new ClientGetter(this, 
client.core.requestStarters.chkFetchScheduler, 
client.core.requestStarters.sskFetchScheduler, uri, fctx, priorityClass, 
client, returnBucket);
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
                        client.queueClientRequestMessage(msg, 0);

Modified: trunk/freenet/src/freenet/node/fcp/ClientPut.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-08-12 19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientPut.java   2006-08-12 19:54:58 UTC (rev 10049)
@@ -110,7 +110,7 @@
                this.clientMetadata = cm;
                Logger.minor(this, "data = "+data+", uploadFrom = 
"+ClientPutMessage.uploadFromString(uploadFrom));
                inserter = new ClientPutter(this, data, uri, cm, 
-                               ctx, client.core.chkPutScheduler, 
client.core.sskPutScheduler, priorityClass, 
+                               ctx, 
client.core.requestStarters.chkPutScheduler, 
client.core.requestStarters.sskPutScheduler, priorityClass, 
                                getCHKOnly, isMetadata, client, null);
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
@@ -157,7 +157,7 @@
                this.clientMetadata = cm;
                Logger.minor(this, "data = "+data+", uploadFrom = 
"+ClientPutMessage.uploadFromString(uploadFrom));
                inserter = new ClientPutter(this, data, uri, cm, 
-                               ctx, client.core.chkPutScheduler, 
client.core.sskPutScheduler, priorityClass, 
+                               ctx, 
client.core.requestStarters.chkPutScheduler, 
client.core.requestStarters.sskPutScheduler, priorityClass, 
                                getCHKOnly, isMetadata, client, null);
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
@@ -245,8 +245,8 @@
                        throw new PersistenceParseException("shouldn't happen");
                }
                this.clientMetadata = cm;
-               inserter = new ClientPutter(this, data, uri, cm, ctx, 
client.core.chkPutScheduler, 
-                               client.core.sskPutScheduler, priorityClass, 
getCHKOnly, isMetadata, client, fs.subset("progress"));
+               inserter = new ClientPutter(this, data, uri, cm, ctx, 
client.core.requestStarters.chkPutScheduler, 
+                               client.core.requestStarters.sskPutScheduler, 
priorityClass, getCHKOnly, isMetadata, client, fs.subset("progress"));
                if(persistenceType != PERSIST_CONNECTION) {
                        FCPMessage msg = persistentTagMessage();
                        client.queueClientRequestMessage(msg, 0);

Modified: trunk/freenet/src/freenet/node/fcp/ClientPutDir.java
===================================================================
--- trunk/freenet/src/freenet/node/fcp/ClientPutDir.java        2006-08-12 19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/fcp/ClientPutDir.java        2006-08-12 19:54:58 UTC (rev 10049)
@@ -37,7 +37,7 @@
                this.defaultName = message.defaultName;
                SimpleManifestPutter p;
                try {
-                       p = new SimpleManifestPutter(this, 
client.core.chkPutScheduler, client.core.sskPutScheduler,
+                       p = new SimpleManifestPutter(this, 
client.core.requestStarters.chkPutScheduler, 
client.core.requestStarters.sskPutScheduler,
                                        manifestElements, priorityClass, uri, 
defaultName, ctx, message.getCHKOnly, client);
                } catch (InserterException e) {
                        onFailure(e, null);
@@ -123,7 +123,7 @@
                SimpleManifestPutter p = null;
                try {
                        if(!finished)
-                               p = new SimpleManifestPutter(this, 
client.core.chkPutScheduler, client.core.sskPutScheduler,
+                               p = new SimpleManifestPutter(this, 
client.core.requestStarters.chkPutScheduler, 
client.core.requestStarters.sskPutScheduler,
                                                manifestElements, 
priorityClass, uri, defaultName, ctx, getCHKOnly, client);
                } catch (InserterException e) {
                        onFailure(e, null);

Modified: trunk/freenet/src/freenet/node/updater/NodeUpdater.java
===================================================================
--- trunk/freenet/src/freenet/node/updater/NodeUpdater.java     2006-08-12 19:24:25 UTC (rev 10048)
+++ trunk/freenet/src/freenet/node/updater/NodeUpdater.java     2006-08-12 19:54:58 UTC (rev 10049)
@@ -159,7 +159,7 @@
                        try{
                                if((cg==null)||cg.isCancelled()){
                                        Logger.minor(this, "Scheduling request 
for "+URI.setSuggestedEdition(availableVersion));
-                                       cg = new ClientGetter(this, 
core.chkFetchScheduler, core.sskFetchScheduler, 
+                                       cg = new ClientGetter(this, 
core.requestStarters.chkFetchScheduler, core.requestStarters.sskFetchScheduler, 
                                                        
URI.setSuggestedEdition(availableVersion), ctx, 
RequestStarter.UPDATE_PRIORITY_CLASS, 
                                                        this, new 
ArrayBucket());
                                        toStart = cg;
@@ -509,7 +509,7 @@
                                                        Logger.minor(this, 
"fetcher="+revocationGetter);
                                                        if(revocationGetter != 
null)
                                                                
Logger.minor(this, "revocation fetcher: 
cancelled="+revocationGetter.isCancelled()+", 
finished="+revocationGetter.isFinished());
-                                                       cg = revocationGetter = 
new ClientGetter(NodeUpdater.this, core.chkFetchScheduler, 
core.sskFetchScheduler, revocationURI, ctxRevocation, 
RequestStarter.MAXIMUM_PRIORITY_CLASS, NodeUpdater.this, null);
+                                                       cg = revocationGetter = 
new ClientGetter(NodeUpdater.this, core.requestStarters.chkFetchScheduler, 
core.requestStarters.sskFetchScheduler, revocationURI, ctxRevocation, 
RequestStarter.MAXIMUM_PRIORITY_CLASS, NodeUpdater.this, null);
                                                        Logger.minor(this, 
"Queued another revocation fetcher");
                                                }
                                        }


Reply via email to