Author: toad
Date: 2008-05-30 18:12:19 +0000 (Fri, 30 May 2008)
New Revision: 20145

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
   branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
Implement the request starter queue. This is a short list which is filled on the 
database thread. RequestStarter reads from it, and asks CRS (ClientRequestScheduler) 
whether a better non-persistent request is available. If there is one, RequestStarter 
runs that instead and puts the queued request back at the head of the queue.
Also fix some bugs in removeFirst().

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-30 04:22:05 UTC (rev 20144)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-30 18:12:19 UTC (rev 20145)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import java.util.LinkedList;
 import java.util.Vector;

 import com.db4o.ObjectContainer;
@@ -242,16 +243,62 @@
                        schedTransient.addPendingKey(key, getter);
        }

-       public synchronized SendableRequest removeFirst() {
+       private synchronized SendableRequest removeFirst() {
+               // Selects the next request to run; schedCore chooses between its own
+               // structures and schedTransient. May only be called on the database
+               // thread, and throws IllegalStateException otherwise.
+               if(!databaseExecutor.onThread()) {
+                       throw new IllegalStateException("Not on database thread!");
+               }
                short fuzz = -1;
                if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
                        fuzz = -1;
                else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
                        fuzz = 0;       
                // schedCore juggles both
-               return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient);
+               // No upper bound on priority class or retry count. The bounds have to
+               // be the largest values rather than -1: the selection loop skips
+               // every priority class that is greater than maxPrio, so -1 would
+               // reject all of them and no request would ever be returned.
+               return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, false, Short.MAX_VALUE, Integer.MAX_VALUE);
        }
+
+       /**
+        * Ask schedCore, in transient-only mode, for a request that should run
+        * ahead of req.
+        * @param req The request the caller currently intends to run, or null if
+        * it has none.
+        * @return Whatever schedCore.removeFirst() selects: limited by req's
+        * priority class and retry count when req is non-null, unlimited when req
+        * is null. Null when it selects nothing.
+        */
+       public SendableRequest getBetterNonPersistentRequest(SendableRequest req) {
+               short fuzz = -1;
+               if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
+                       fuzz = -1;
+               else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
+                       fuzz = 0;       
+               if(req == null) {
+                       // Nothing to compare against, so accept any priority and retry
+                       // count. The limits must be the largest values, not -1: the
+                       // selection loop skips every priority class greater than
+                       // maxPrio, so -1 would reject all of them.
+                       return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, Short.MAX_VALUE, Integer.MAX_VALUE);
+               }
+               short prio = req.getPriorityClass();
+               int retryCount = req.getRetryCount();
+               return schedCore.removeFirst(fuzz, random, offeredKeys, starter, schedTransient, true, prio, retryCount);
+       }

+       /** Size at which requestStarterQueueFiller stops adding to starterQueue. */
+       private static final int MAX_STARTER_QUEUE_SIZE = 10;
+       
+       /** Requests waiting to be picked up via getRequestStarterQueue(). The filler job synchronizes on this list when touching it. */
+       private LinkedList starterQueue = new LinkedList();
+       
+       /** Returns the queue object itself, not a copy. */
+       public LinkedList getRequestStarterQueue() {
+               return starterQueue;
+       }
+       
+       /**
+        * Submit requestStarterQueueFiller to databaseExecutor at
+        * NativeThread.MAX_PRIORITY via executeNoDupes().
+        * NOTE(review): presumably this is PrioritizedSerialExecutor.executeNoDupes(),
+        * which does not queue a job that is already waiting at the same
+        * priority - confirm databaseExecutor's type.
+        */
+       public void queueFillRequestStarterQueue() {
+               databaseExecutor.executeNoDupes(requestStarterQueueFiller, 
+                               NativeThread.MAX_PRIORITY, "Fill request 
starter queue");
+       }
+       
+       /**
+        * Job that tops up starterQueue. Each pass first adds the request fetched
+        * by the previous pass (if any), then stops if the queue has reached
+        * MAX_STARTER_QUEUE_SIZE; otherwise it fetches another request with
+        * removeFirst() and stops when that returns null.
+        */
+       private Runnable requestStarterQueueFiller = new Runnable() {
+               public void run() {
+                       SendableRequest req = null;
+                       while(true) {
+                               // The queue lock is held only for the add and the size check;
+                               // removeFirst() below is called outside it.
+                               synchronized(starterQueue) {
+                                       if(req != null) {
+                                               starterQueue.add(req);
+                                               req = null;
+                                       }
+                                       if(starterQueue.size() >= 
MAX_STARTER_QUEUE_SIZE) return;
+                               }
+                               req = removeFirst();
+                               if(req == null) return;
+                       }
+               }
+       };
+       
        public void removePendingKey(final SendableGet getter, final boolean 
complain, final Key key) {
                if(getter.persistent()) {
                        boolean dropped = 
schedTransient.removePendingKey(getter, complain, key);

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-30 04:22:05 UTC (rev 20144)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-30 18:12:19 UTC (rev 20145)
@@ -102,7 +102,7 @@

        // We pass in the schedTransient to the next two methods so that we can 
select between either of them.

-       private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, 
int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, 
ClientRequestSchedulerNonPersistent schedTransient){
+       private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, 
int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio){
                SortedVectorByNumber result = null;

                short iteration = 0, priority;
@@ -112,9 +112,16 @@
                // TWEAKED will do rand%6,0,1,2,3,4,5,6
                while(iteration++ < RequestStarter.NUMBER_OF_PRIORITY_CLASSES + 
1){
                        priority = fuzz<0 ? 
tweakedPrioritySelector[random.nextInt(tweakedPrioritySelector.length)] : 
prioritySelector[Math.abs(fuzz % prioritySelector.length)];
-                       result = priorities[priority];
+                       if(transientOnly)
+                               result = null;
+                       else
+                               result = priorities[priority];
                        if(result == null)
                                result = schedTransient.priorities[priority];
+                       if(priority > maxPrio) {
+                               fuzz++;
+                               continue; // Don't return because first round 
may be higher with soft scheduling
+                       }
                        if((result != null) && 
                                        (!result.isEmpty()) || (tryOfferedKeys 
&& !offeredKeys[priority].isEmpty())) {
                                if(logMINOR) Logger.minor(this, "using priority 
: "+priority);
@@ -133,14 +140,14 @@
        // We prevent a number of race conditions (e.g. adding a retry count 
and then another 
        // thread removes it cos its empty) ... and in addToGrabArray etc we 
already sync on this.
        // The worry is ... is there any nested locking outside of the 
hierarchy?
-       SendableRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient) {
+       SendableRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount) {
                // Priorities start at 0
                if(logMINOR) Logger.minor(this, "removeFirst()");
                boolean tryOfferedKeys = offeredKeys != null && 
random.nextBoolean();
-               int choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient);
+               int choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient, transientOnly, maxPrio);
                if(choosenPriorityClass == -1 && offeredKeys != null && 
!tryOfferedKeys) {
                        tryOfferedKeys = true;
-                       choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient);
+                       choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient, transientOnly, maxPrio);
                }
                if(choosenPriorityClass == -1) {
                        if(logMINOR)
@@ -153,7 +160,9 @@
                        
if(offeredKeys[choosenPriorityClass].hasValidKeys(starter))
                                return offeredKeys[choosenPriorityClass];
                }
-               SortedVectorByNumber perm = priorities[choosenPriorityClass];
+               SortedVectorByNumber perm = null;
+               if(!transientOnly)
+                       perm = priorities[choosenPriorityClass];
                SortedVectorByNumber trans = 
schedTransient.priorities[choosenPriorityClass];
                if(perm == null && trans == null) {
                        if(logMINOR) Logger.minor(this, "No requests to run: 
chosen priority empty");
@@ -164,7 +173,15 @@
                while(true) {
                        int permRetryCount = perm == null ? Integer.MAX_VALUE : 
perm.getNumberByIndex(permRetryIndex);
                        int transRetryCount = trans == null ? Integer.MAX_VALUE 
: trans.getNumberByIndex(transRetryIndex);
-                       if(permRetryCount == -1 && transRetryCount == -1) {
+                       if(choosenPriorityClass == maxPrio) {
+                               if(permRetryCount >= retryCount) {
+                                       permRetryCount = Integer.MAX_VALUE;
+                               }
+                               if(transRetryCount >= retryCount) {
+                                       transRetryCount = Integer.MAX_VALUE;
+                               }
+                       }
+                       if(permRetryCount == Integer.MAX_VALUE && 
transRetryCount == Integer.MAX_VALUE) {
                                if(logMINOR) Logger.minor(this, "No requests to 
run: ran out of retrycounts on chosen priority");
                                return null;
                        }
@@ -177,27 +194,27 @@
                                int permTrackerSize = permRetryTracker.size();
                                int transTrackerSize = transRetryTracker.size();
                                if(permTrackerSize + transTrackerSize == 0) {
-                                       permRetryCount++;
-                                       transRetryCount++;
+                                       permRetryIndex++;
+                                       transRetryIndex++;
                                        continue;
                                }
                                if(random.nextInt(permTrackerSize + 
transTrackerSize) > permTrackerSize) {
                                        chosenTracker = permRetryTracker;
                                        trackerParent = perm;
-                                       permRetryCount++;
+                                       permRetryIndex++;
                                } else {
                                        chosenTracker = transRetryTracker;
                                        trackerParent = trans;
-                                       transRetryCount++;
+                                       transRetryIndex++;
                                }
                        } else if(permRetryCount < transRetryCount) {
                                chosenTracker = 
(SectoredRandomGrabArrayWithInt) perm.getByIndex(permRetryIndex);
                                trackerParent = perm;
-                               permRetryCount++;
+                               permRetryIndex++;
                        } else {
                                chosenTracker = 
(SectoredRandomGrabArrayWithInt) trans.getByIndex(transRetryIndex);
                                trackerParent = trans;
-                               transRetryCount++;
+                               transRetryIndex++;
                        }
                        if(logMINOR)
                                Logger.minor(this, "Got retry count tracker 
"+chosenTracker);

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-30 04:22:05 UTC (rev 20144)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-30 18:12:19 UTC (rev 20145)
@@ -3,12 +3,12 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

+import java.util.LinkedList;
+
 import freenet.keys.ClientKey;

 public interface RequestScheduler {

-       public SendableRequest removeFirst();
-
        /** Tell the scheduler that a request from a specific RandomGrabArray 
succeeded.
         * Definition of "succeeded" will vary, but the point is most 
schedulers will run another
         * request from the parentGrabArray in the near future on the theory 
that if one works,
@@ -40,5 +40,11 @@
         * your max retry count less than this (and more than -1). */
        public static final int COOLDOWN_RETRIES = 3;
        public long countTransientQueuedRequests();
+
+       public void queueFillRequestStarterQueue();
+
+       public LinkedList getRequestStarterQueue();
+
+       public SendableRequest getBetterNonPersistentRequest(SendableRequest 
req);

 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-05-30 
04:22:05 UTC (rev 20144)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-05-30 
18:12:19 UTC (rev 20145)
@@ -4,6 +4,7 @@
 package freenet.node;

 import java.util.HashSet;
+import java.util.LinkedList;

 import freenet.keys.Key;
 import freenet.support.Logger;
@@ -49,6 +50,9 @@
                return !((prio < MAXIMUM_PRIORITY_CLASS) || (prio > 
MINIMUM_PRIORITY_CLASS));
        }

+       /** Queue of requests to run. Added to by jobs on the database thread. 
*/
+       private LinkedList queue;
+       
        final BaseRequestThrottle throttle;
        final TokenBucket inputBucket;
        final TokenBucket outputBucket;
@@ -78,10 +82,12 @@

        void setScheduler(RequestScheduler sched) {
                this.sched = sched;
+               // Keep a reference to the scheduler's starter queue; getRequest() takes requests from it.
+               queue = sched.getRequestStarterQueue();
        }

        void start() {
                core.getExecutor().execute(this, name);
+               // Ask the scheduler to start filling the starter queue.
+               sched.queueFillRequestStarterQueue();
        }

        final String name;
@@ -112,7 +118,9 @@
                        }
                        sched.moveKeysFromCooldownQueue();
                        boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
-                       if(req == null) req = sched.removeFirst();
+                       if(req == null) {
+                               req = getRequest();
+                       }
                        if(req != null) {
                                if(logMINOR) Logger.minor(this, "Running "+req);
                                // Wait
@@ -151,7 +159,7 @@
                                // Always take the lock on RequestStarter 
first. AFAICS we don't synchronize on RequestStarter anywhere else.
                                // Nested locks here prevent extra latency when 
there is a race, and therefore allow us to sleep indefinitely
                                synchronized(this) {
-                                       req = sched.removeFirst();
+                                       req = getRequest();
                                        if(req == null) {
                                                try {
                                                        wait(100*1000); // as 
close to indefinite as I'm comfortable with! Toad
@@ -171,6 +179,26 @@
                }
        }

+       /**
+        * Choose the next request to run. Takes the head of the starter queue (if
+        * any) and asks the scheduler whether there is a better non-persistent
+        * request. If there is, the queued request is put back at the head of the
+        * queue and the better one is returned. If the queued request is used,
+        * the scheduler is asked to refill the queue.
+        * @return The request to run, or null if there is none.
+        */
+       private SendableRequest getRequest() {
+               SendableRequest req = null;
+               synchronized(queue) {
+                       // LinkedList.removeFirst() throws NoSuchElementException on an
+                       // empty list, and the queue can be empty (it starts empty and is
+                       // only filled by a job on the database thread).
+                       if(!queue.isEmpty())
+                               req = (SendableRequest) queue.removeFirst();
+               }
+               SendableRequest betterReq = sched.getBetterNonPersistentRequest(req);
+               if(betterReq != null) {
+                       if(req != null) {
+                               // Run the better request now; keep the queued one for later.
+                               synchronized(queue) {
+                                       queue.addFirst(req);
+                               }
+                       }
+                       return betterReq;
+               }
+               if(req != null) {
+                       // An entry was consumed from the queue, so ask for a refill.
+                       sched.queueFillRequestStarterQueue();
+               }
+               return req;
+       }
+
        /**
         * All Key's we are currently fetching. 
         * Locally originated requests only, avoids some complications with 
HTL, 

Modified: 
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-05-30 04:22:05 UTC (rev 20144)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-05-30 18:12:19 UTC (rev 20145)
@@ -121,6 +121,24 @@
                }
        }

+       /**
+        * Queue a job at the given priority unless the same job is already waiting
+        * at that priority.
+        * Duplicates are detected with contains() on the list for prio only, so
+        * the same job waiting at a different priority is not treated as a duplicate.
+        * @param job The job to queue.
+        * @param prio Index into jobs[] selecting the list the job is appended to.
+        * @param jobName Used here only in the MINOR log message.
+        */
+       public void executeNoDupes(Runnable job, int prio, String jobName) {
+               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               synchronized(jobs) {
+                       if(logMINOR) 
+                               Logger.minor(this, "Running "+jobName+" : 
"+job+" priority "+prio+" running="+running+" waiting="+waiting);
+                       if(jobs[prio].contains(job)) {
+                               if(logMINOR)
+                                       Logger.minor(this, "Not adding 
duplicate job "+job);
+                               return;
+                       }
+                       jobs[prio].addLast(job);
+                       // Wake any thread waiting on jobs.
+                       jobs.notifyAll();
+                       // Start the runner if it is not running and a real executor has been set.
+                       if(!running && realExecutor != null) {
+                               reallyStart(logMINOR);
+                       }
+               }
+       }
+
        public void execute(Runnable job, String jobName, boolean fromTicker) {
+               // fromTicker is not used; this delegates to execute(job, jobName).
                execute(job, jobName);
        }


Reply via email to