Author: toad Date: 2008-05-30 18:12:19 +0000 (Fri, 30 May 2008) New Revision: 20145
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java branches/db4o/freenet/src/freenet/node/RequestScheduler.java branches/db4o/freenet/src/freenet/node/RequestStarter.java branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java Log: Implement request-starter-queue. This is a short list which is filled on the database thread. RequestStarter reads from it, and asks CRS to find a better non-persistent request if possible. If it does it uses that instead. Also fix some bugs in removeFirst(). Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-05-30 04:22:05 UTC (rev 20144) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-05-30 18:12:19 UTC (rev 20145) @@ -3,6 +3,7 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.client.async; +import java.util.LinkedList; import java.util.Vector; import com.db4o.ObjectContainer; @@ -242,16 +243,62 @@ schedTransient.addPendingKey(key, getter); } - public synchronized SendableRequest removeFirst() { + private synchronized SendableRequest removeFirst() { + if(!databaseExecutor.onThread()) { + throw new IllegalStateException("Not on database thread!"); + } short fuzz = -1; if(PRIORITY_SOFT.equals(choosenPriorityScheduler)) fuzz = -1; else if(PRIORITY_HARD.equals(choosenPriorityScheduler)) fuzz = 0; // schedCore juggles both - return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient); + return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, false, (short) -1, -1); } + + public SendableRequest getBetterNonPersistentRequest(SendableRequest req) { + short fuzz = -1; + if(PRIORITY_SOFT.equals(choosenPriorityScheduler)) + fuzz = -1; + else if(PRIORITY_HARD.equals(choosenPriorityScheduler)) + fuzz = 0; + if(req == null) + return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, (short) -1, -1); + short prio = req.getPriorityClass(); + int retryCount = req.getRetryCount(); + return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, prio, retryCount); + } + private static final int MAX_STARTER_QUEUE_SIZE = 10; + + private LinkedList starterQueue = new LinkedList(); + + public LinkedList getRequestStarterQueue() { + return starterQueue; + } + + public void queueFillRequestStarterQueue() { + databaseExecutor.executeNoDupes(requestStarterQueueFiller, + NativeThread.MAX_PRIORITY, "Fill request starter queue"); + } + + private Runnable requestStarterQueueFiller = new Runnable() { + public void run() { + SendableRequest req = null; + while(true) { + synchronized(starterQueue) { + if(req != null) { + starterQueue.add(req); + req = null; + } + if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) return; + } + req = removeFirst(); + if(req == null) return; + } + } + }; + public void removePendingKey(final SendableGet getter, final boolean complain, final Key key) { if(getter.persistent()) { boolean dropped = schedTransient.removePendingKey(getter, complain, key); Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java =================================================================== --- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-05-30 04:22:05 UTC (rev 20144) +++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-05-30 18:12:19 UTC (rev 20145) @@ -102,7 +102,7 @@ // We pass in the schedTransient to the next two methods so that we can select between either of them. - private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, ClientRequestSchedulerNonPersistent schedTransient){ + private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, short maxPrio){ SortedVectorByNumber result = null; short iteration = 0, priority; @@ -112,9 +112,16 @@ // TWEAKED will do rand%6,0,1,2,3,4,5,6 while(iteration++ < RequestStarter.NUMBER_OF_PRIORITY_CLASSES + 1){ priority = fuzz<0 ? tweakedPrioritySelector[random.nextInt(tweakedPrioritySelector.length)] : prioritySelector[Math.abs(fuzz % prioritySelector.length)]; - result = priorities[priority]; + if(transientOnly) + result = null; + else + result = priorities[priority]; if(result == null) result = schedTransient.priorities[priority]; + if(priority > maxPrio) { + fuzz++; + continue; // Don't return because first round may be higher with soft scheduling + } if((result != null) && (!result.isEmpty()) || (tryOfferedKeys && !offeredKeys[priority].isEmpty())) { if(logMINOR) Logger.minor(this, "using priority : "+priority); @@ -133,14 +140,14 @@ // We prevent a number of race conditions (e.g. adding a retry count and then another // thread removes it cos its empty) ... and in addToGrabArray etc we already sync on this. // The worry is ... is there any nested locking outside of the hierarchy? - SendableRequest removeFirst(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient) { + SendableRequest removeFirst(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, short maxPrio, int retryCount) { // Priorities start at 0 if(logMINOR) Logger.minor(this, "removeFirst()"); boolean tryOfferedKeys = offeredKeys != null && random.nextBoolean(); - int choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient); + int choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient, transientOnly, maxPrio); if(choosenPriorityClass == -1 && offeredKeys != null && !tryOfferedKeys) { tryOfferedKeys = true; - choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient); + choosenPriorityClass = removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, schedTransient, transientOnly, maxPrio); } if(choosenPriorityClass == -1) { if(logMINOR) @@ -153,7 +160,9 @@ if(offeredKeys[choosenPriorityClass].hasValidKeys(starter)) return offeredKeys[choosenPriorityClass]; } - SortedVectorByNumber perm = priorities[choosenPriorityClass]; + SortedVectorByNumber perm = null; + if(!transientOnly) + perm = priorities[choosenPriorityClass]; SortedVectorByNumber trans = schedTransient.priorities[choosenPriorityClass]; if(perm == null && trans == null) { if(logMINOR) Logger.minor(this, "No requests to run: chosen priority empty"); @@ -164,7 +173,15 @@ while(true) { int permRetryCount = perm == null ? Integer.MAX_VALUE : perm.getNumberByIndex(permRetryIndex); int transRetryCount = trans == null ? Integer.MAX_VALUE : trans.getNumberByIndex(transRetryIndex); - if(permRetryCount == -1 && transRetryCount == -1) { + if(choosenPriorityClass == maxPrio) { + if(permRetryCount >= retryCount) { + permRetryCount = Integer.MAX_VALUE; + } + if(transRetryCount >= retryCount) { + transRetryCount = Integer.MAX_VALUE; + } + } + if(permRetryCount == Integer.MAX_VALUE && transRetryCount == Integer.MAX_VALUE) { if(logMINOR) Logger.minor(this, "No requests to run: ran out of retrycounts on chosen priority"); return null; } @@ -177,27 +194,27 @@ int permTrackerSize = permRetryTracker.size(); int transTrackerSize = transRetryTracker.size(); if(permTrackerSize + transTrackerSize == 0) { - permRetryCount++; - transRetryCount++; + permRetryIndex++; + transRetryIndex++; continue; } if(random.nextInt(permTrackerSize + transTrackerSize) > permTrackerSize) { chosenTracker = permRetryTracker; trackerParent = perm; - permRetryCount++; + permRetryIndex++; } else { chosenTracker = transRetryTracker; trackerParent = trans; - transRetryCount++; + transRetryIndex++; } } else if(permRetryCount < transRetryCount) { chosenTracker = (SectoredRandomGrabArrayWithInt) perm.getByIndex(permRetryIndex); trackerParent = perm; - permRetryCount++; + permRetryIndex++; } else { chosenTracker = (SectoredRandomGrabArrayWithInt) trans.getByIndex(transRetryIndex); trackerParent = trans; - transRetryCount++; + transRetryIndex++; } if(logMINOR) Logger.minor(this, "Got retry count tracker "+chosenTracker); Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java =================================================================== --- branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-05-30 04:22:05 UTC (rev 20144) +++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-05-30 18:12:19 UTC (rev 20145) @@ -3,12 +3,12 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; +import java.util.LinkedList; + import freenet.keys.ClientKey; public interface RequestScheduler { - public SendableRequest removeFirst(); - /** Tell the scheduler that a request from a specific RandomGrabArray succeeded. * Definition of "succeeded" will vary, but the point is most schedulers will run another * request from the parentGrabArray in the near future on the theory that if one works, @@ -40,5 +40,11 @@ * your max retry count less than this (and more than -1). */ public static final int COOLDOWN_RETRIES = 3; public long countTransientQueuedRequests(); + + public void queueFillRequestStarterQueue(); + + public LinkedList getRequestStarterQueue(); + + public SendableRequest getBetterNonPersistentRequest(SendableRequest req); } Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java =================================================================== --- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-05-30 04:22:05 UTC (rev 20144) +++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-05-30 18:12:19 UTC (rev 20145) @@ -4,6 +4,7 @@ package freenet.node; import java.util.HashSet; +import java.util.LinkedList; import freenet.keys.Key; import freenet.support.Logger; @@ -49,6 +50,9 @@ return !((prio < MAXIMUM_PRIORITY_CLASS) || (prio > MINIMUM_PRIORITY_CLASS)); } + /** Queue of requests to run. Added to by jobs on the database thread. */ + private LinkedList queue; + final BaseRequestThrottle throttle; final TokenBucket inputBucket; final TokenBucket outputBucket; @@ -78,10 +82,12 @@ void setScheduler(RequestScheduler sched) { this.sched = sched; + queue = sched.getRequestStarterQueue(); } void start() { core.getExecutor().execute(this, name); + sched.queueFillRequestStarterQueue(); } final String name; @@ -112,7 +118,9 @@ } sched.moveKeysFromCooldownQueue(); boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); - if(req == null) req = sched.removeFirst(); + if(req == null) { + req = getRequest(); + } if(req != null) { if(logMINOR) Logger.minor(this, "Running "+req); // Wait @@ -151,7 +159,7 @@ // Always take the lock on RequestStarter first. AFAICS we don't synchronize on RequestStarter anywhere else. // Nested locks here prevent extra latency when there is a race, and therefore allow us to sleep indefinitely synchronized(this) { - req = sched.removeFirst(); + req = getRequest(); if(req == null) { try { wait(100*1000); // as close to indefinite as I'm comfortable with! Toad @@ -171,6 +179,26 @@ } } + private SendableRequest getRequest() { + SendableRequest req; + synchronized(queue) { + req = (SendableRequest) queue.removeFirst(); + } + SendableRequest betterReq = sched.getBetterNonPersistentRequest(req); + if(req != null) { + if(betterReq != null) { + synchronized(queue) { + queue.addFirst(req); + } + req = null; + } + } + if(req == null) req = betterReq; + else + sched.queueFillRequestStarterQueue(); + return req; + } + /** * All Key's we are currently fetching. * Locally originated requests only, avoids some complications with HTL, Modified: branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java =================================================================== --- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java 2008-05-30 04:22:05 UTC (rev 20144) +++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java 2008-05-30 18:12:19 UTC (rev 20145) @@ -121,6 +121,24 @@ } } + public void executeNoDupes(Runnable job, int prio, String jobName) { + boolean logMINOR = Logger.shouldLog(Logger.MINOR, this); + synchronized(jobs) { + if(logMINOR) + Logger.minor(this, "Running "+jobName+" : "+job+" priority "+prio+" running="+running+" waiting="+waiting); + if(jobs[prio].contains(job)) { + if(logMINOR) + Logger.minor(this, "Not adding duplicate job "+job); + return; + } + jobs[prio].addLast(job); + jobs.notifyAll(); + if(!running && realExecutor != null) { + reallyStart(logMINOR); + } + } + } + public void execute(Runnable job, String jobName, boolean fromTicker) { execute(job, jobName); }
