Author: toad
Date: 2009-02-14 16:10:08 +0000 (Sat, 14 Feb 2009)
New Revision: 25651

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
   branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
   branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Move moveKeysFromCooldownQueue() inside fillRequestStarterQueue(), and wake up 
the starter if needed after processing the queue.
When we decide not to fill the request queue for a while, take into account 
when the first cooldown queue item will be due for reinstatement (ignoring 
any item due after our maximum don't-fill interval has elapsed).
Don't skip the transient cooldown queue when there is stuff on the 
persistent cooldown queue (the old short-circuiting || did exactly that).


Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2009-02-14 16:10:08 UTC (rev 25651)
@@ -509,6 +509,7 @@
         * the above limit. So we have a higher limit before we complain that 
         * something odd is happening.. (e.g. leaking 
PersistentChosenRequest's). */
        static final int WARNING_STARTER_QUEUE_SIZE = 800;
+       private static final long WAIT_AFTER_NOTHING_TO_START = 60*1000;
        
        private transient LinkedList<PersistentChosenRequest> starterQueue = 
new LinkedList<PersistentChosenRequest>();
        
@@ -570,13 +571,11 @@
                }
        }
        
-       /* If new stuff is added, maybeFillStarterQueue will be called anyway,
-        * so it is safe to not run the queue filler regularly. */
-       private long lastFilledStarterQueueEmpty = -1;
+       private long nextQueueFillRequestStarterQueue = -1;
        
        public void queueFillRequestStarterQueue() {
-               if(lastFilledStarterQueueEmpty > 0 &&
-                               System.currentTimeMillis() - 
lastFilledStarterQueueEmpty < 60*1000)
+               if(nextQueueFillRequestStarterQueue > 0 &&
+                               System.currentTimeMillis() < 
nextQueueFillRequestStarterQueue)
                        return;
                if(starterQueueLength() > MAX_STARTER_QUEUE_SIZE / 2)
                        return;
@@ -667,12 +666,19 @@
        
        private void fillRequestStarterQueue(ObjectContainer container, 
ClientContext context, SendableRequest[] mightBeActive) {
                if(logMINOR) Logger.minor(this, "Filling request queue... 
(SSK="+isSSKScheduler+" insert="+isInsertScheduler);
+               boolean wakeUp = false;
+               long noLaterThan = Long.MAX_VALUE;
+               noLaterThan = 
moveKeysFromCooldownQueue(persistentCooldownQueue, true, container);
+               noLaterThan = Math.min(noLaterThan, 
moveKeysFromCooldownQueue(transientCooldownQueue, false, container));
+               if(noLaterThan != Long.MAX_VALUE)
+                       wakeUp = true;
                short fuzz = -1;
                if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
                        fuzz = -1;
                else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
                        fuzz = 0;
                boolean added = false;
+               boolean finished = false;
                synchronized(starterQueue) {
                        if(logMINOR && (!isSSKScheduler) && 
(!isInsertScheduler)) {
                                Logger.minor(this, "Scheduling CHK fetches...");
@@ -711,12 +717,16 @@
                        if(length >= MAX_STARTER_QUEUE_SIZE) {
                                if(length >= WARNING_STARTER_QUEUE_SIZE)
                                        Logger.error(this, "Queue already full: 
"+length);
-                               return;
+                               finished = true;
                        }
                        if(length > MAX_STARTER_QUEUE_SIZE * 3 / 4) {
-                               return;
+                               finished = true;
                        }
                }
+               if(finished) {
+                       if(wakeUp) starter.wakeUp();
+                       return;
+               }
                
                if((!isSSKScheduler) && (!isInsertScheduler)) {
                        Logger.minor(this, "Scheduling CHK fetches...");
@@ -725,9 +735,15 @@
                        SendableRequest request = 
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, 
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
                        if(request == null) {
                                synchronized(ClientRequestScheduler.this) {
-                                       if(!added) 
-                                               lastFilledStarterQueueEmpty = 
System.currentTimeMillis();
+                                       // Don't wake up for a while, but no 
later than the time we expect the next item to come off the cooldown queue
+                                       if(!added) {
+                                               if(noLaterThan != 
Long.MAX_VALUE)
+                                                       
nextQueueFillRequestStarterQueue = Math.min(System.currentTimeMillis() + 
WAIT_AFTER_NOTHING_TO_START, noLaterThan + 1);
+                                               else
+                                                       
nextQueueFillRequestStarterQueue = System.currentTimeMillis() + 
WAIT_AFTER_NOTHING_TO_START;
+                                       }
                                }
+                               if(wakeUp) starter.wakeUp();
                                return;
                        }
                        added = true;
@@ -975,22 +991,8 @@
                        return transientCooldownQueue.add(key.getNodeKey(), 
getter, null);
        }
 
-       private final DBJob moveFromCooldownJob = new DBJob() {
-               
-               public void run(ObjectContainer container, ClientContext 
context) {
-                       if(moveKeysFromCooldownQueue(persistentCooldownQueue, 
true, container) ||
-                                       
moveKeysFromCooldownQueue(transientCooldownQueue, false, container))
-                               starter.wakeUp();
-               }
-               
-       };
-       
-       public void moveKeysFromCooldownQueue() {
-               jobRunner.queue(moveFromCooldownJob, 
NativeThread.NORM_PRIORITY, true);
-       }
-       
-       private boolean moveKeysFromCooldownQueue(CooldownQueue queue, boolean 
persistent, ObjectContainer container) {
-               if(queue == null) return false;
+       private long moveKeysFromCooldownQueue(CooldownQueue queue, boolean 
persistent, ObjectContainer container) {
+               if(queue == null) return Long.MAX_VALUE;
                long now = System.currentTimeMillis();
                /*
                 * Only go around once. We will be called again. If there are 
keys to move, then RequestStarter will not
@@ -1010,8 +1012,12 @@
                 * nodes with little RAM it would be bad...
                 */
                final int MAX_KEYS = 20;
-               Key[] keys = queue.removeKeyBefore(now, container, MAX_KEYS);
-               if(keys == null) return false;
+               Object ret = queue.removeKeyBefore(now, 
WAIT_AFTER_NOTHING_TO_START, container, MAX_KEYS);
+               if(ret == null) return Long.MAX_VALUE;
+               if(ret instanceof Long) {
+                       return (Long) ret;
+               }
+               Key[] keys = (Key[]) ret;
                for(int j=0;j<keys.length;j++) {
                        Key key = keys[j];
                        if(persistent)
@@ -1039,7 +1045,7 @@
                        if(persistent)
                                container.deactivate(key, 5);
                }
-               return true;
+               return Long.MAX_VALUE;
        }
 
        public long countTransientQueuedRequests() {

Modified: branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java   
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java   
2009-02-14 16:10:08 UTC (rev 25651)
@@ -17,9 +17,13 @@
 
        /**
         * Remove a key whose cooldown time has passed.
-        * @return Either an array of Key's or null if no keys have passed 
their cooldown time.
+        * @param dontCareAfter If the next item to come out of the cooldown
+        * queue is more than this many millis after now, return null.
+        * @return Either an array of Key's or a Long indicating the time at
+        * which the next key will be removed from the cooldown, or null if 
+        * no keys have passed their cooldown time.
         */
-       public abstract Key[] removeKeyBefore(long now, ObjectContainer 
container, int maxKeys);
+       public abstract Object removeKeyBefore(long now, long dontCareAfter, 
ObjectContainer container, int maxKeys);
 
        /**
         * @return True if the key was found.

Modified: 
branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java 
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java 
2009-02-14 16:10:08 UTC (rev 25651)
@@ -84,7 +84,7 @@
                return found;
        }
 
-       public Key[] removeKeyBefore(final long now, ObjectContainer container, 
int maxCount) {
+       public Object removeKeyBefore(final long now, long dontCareAfterMillis, 
ObjectContainer container, int maxCount) {
                // Will be called repeatedly until no more keys are returned, 
so it doesn't
                // matter very much if they're not in order.
                
@@ -124,7 +124,19 @@
                                container.delete(i);
                                v.add(i.key);
                        }
-                       return (Key[]) v.toArray(new Key[v.size()]);
+                       if(!v.isEmpty()) {
+                               return (Key[]) v.toArray(new Key[v.size()]);
+                       } else {
+                               query = container.query();
+                               
query.descend("time").orderAscending().constrain(new Long(now + 
dontCareAfterMillis)).smaller().
+                                       
and(query.descend("parent").constrain(this).identity());
+                               results = query.execute();
+                               if(results.hasNext()) {
+                                       return ((PersistentCooldownQueueItem) 
results.next()).time;
+                               } else {
+                                       return null;
+                               }
+                       }
                } else {
                        long tEnd = System.currentTimeMillis();
                        if(tEnd - tStart > 1000)

Modified: 
branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java    
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java    
2009-02-14 16:10:08 UTC (rev 25651)
@@ -117,7 +117,7 @@
        /* (non-Javadoc)
         * @see freenet.client.async.CooldownQueue#removeKeyBefore(long)
         */
-       public synchronized Key[] removeKeyBefore(long now, ObjectContainer 
container, int maxKeys) {
+       public synchronized Object removeKeyBefore(long now, long 
dontCareAfterMillis, ObjectContainer container, int maxKeys) {
                ArrayList v = new ArrayList();
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                boolean foundIT = false;
@@ -133,7 +133,10 @@
                while(true) {
                        if(startPtr == endPtr) {
                                if(logMINOR) Logger.minor(this, "No keys 
queued");
-                               return (Key[]) v.toArray(new Key[v.size()]);
+                               if(!v.isEmpty())
+                                       return (Key[]) v.toArray(new 
Key[v.size()]);
+                               else
+                                       return null;
                        }
                        long time = times[startPtr];
                        Key key = keys[startPtr];
@@ -148,7 +151,12 @@
                        } else {
                                if(time > now) {
                                        if(logMINOR) Logger.minor(this, "First 
key is later at time "+time);
-                                       return (Key[]) v.toArray(new 
Key[v.size()]);
+                                       if(!v.isEmpty())
+                                               return (Key[]) v.toArray(new 
Key[v.size()]);
+                                       else if(time < (now + 
dontCareAfterMillis)) 
+                                               return Long.valueOf(time);
+                                       else
+                                               return null;
                                }
                                times[startPtr] = 0;
                                keys[startPtr] = null;
@@ -158,8 +166,11 @@
                        }
                        if(logMINOR) Logger.minor(this, "Returning key "+key);
                        v.add(key);
-                       if(v.size() == maxKeys)
-                               return (Key[]) v.toArray(new Key[v.size()]);
+                       if(v.size() == maxKeys) {
+                               if(!v.isEmpty())
+                                       return (Key[]) v.toArray(new 
Key[v.size()]);
+                               else return null;
+                       }
                }
        }
        

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2009-02-14 16:10:08 UTC (rev 25651)
@@ -34,12 +34,6 @@
         */
        long queueCooldown(ClientKey key, SendableGet getter, ObjectContainer 
container);
 
-       /**
-        * Remove keys from the cooldown queue who have now served their time 
and can be requested 
-        * again.
-        */
-       public void moveKeysFromCooldownQueue();
-       
        /** Once a key has been requested a few times, don't request it again 
for 30 minutes. 
         * To do so would be pointless given ULPRs, and just waste bandwidth. */
        public static final long COOLDOWN_PERIOD = 30*60*1000;

Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2009-02-14 
16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2009-02-14 
16:10:08 UTC (rev 25651)
@@ -115,8 +115,6 @@
                                }
                                continue;
                        }
-                       if(!isInsert)
-                               sched.moveKeysFromCooldownQueue();
                        boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
                        if(req == null) {
                                req = sched.grabRequest();

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to