Author: toad
Date: 2009-02-14 16:10:08 +0000 (Sat, 14 Feb 2009)
New Revision: 25651
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Move moveKeysFromCooldownQueue() inside fillRequestQueue(), and wake up the
starter if needed after processing the queue.
When we decide not to fill the request queue for a while, take into account
when the first cooldown queue item will be due for reinstatement (ignoring
anything after our maximum don't-fill interval elapses).
Don't skip (||) the transient cooldown queue when there is stuff on the
persistent cooldown queue.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2009-02-14 16:10:08 UTC (rev 25651)
@@ -509,6 +509,7 @@
* the above limit. So we have a higher limit before we complain that
* something odd is happening.. (e.g. leaking
PersistentChosenRequest's). */
static final int WARNING_STARTER_QUEUE_SIZE = 800;
+ private static final long WAIT_AFTER_NOTHING_TO_START = 60*1000;
private transient LinkedList<PersistentChosenRequest> starterQueue =
new LinkedList<PersistentChosenRequest>();
@@ -570,13 +571,11 @@
}
}
- /* If new stuff is added, maybeFillStarterQueue will be called anyway,
- * so it is safe to not run the queue filler regularly. */
- private long lastFilledStarterQueueEmpty = -1;
+ private long nextQueueFillRequestStarterQueue = -1;
public void queueFillRequestStarterQueue() {
- if(lastFilledStarterQueueEmpty > 0 &&
- System.currentTimeMillis() -
lastFilledStarterQueueEmpty < 60*1000)
+ if(nextQueueFillRequestStarterQueue > 0 &&
+ System.currentTimeMillis() <
nextQueueFillRequestStarterQueue)
return;
if(starterQueueLength() > MAX_STARTER_QUEUE_SIZE / 2)
return;
@@ -667,12 +666,19 @@
private void fillRequestStarterQueue(ObjectContainer container,
ClientContext context, SendableRequest[] mightBeActive) {
if(logMINOR) Logger.minor(this, "Filling request queue...
(SSK="+isSSKScheduler+" insert="+isInsertScheduler);
+ boolean wakeUp = false;
+ long noLaterThan = Long.MAX_VALUE;
+ noLaterThan =
moveKeysFromCooldownQueue(persistentCooldownQueue, true, container);
+ noLaterThan = Math.min(noLaterThan,
moveKeysFromCooldownQueue(transientCooldownQueue, false, container));
+ if(noLaterThan != Long.MAX_VALUE)
+ wakeUp = true;
short fuzz = -1;
if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
fuzz = -1;
else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
fuzz = 0;
boolean added = false;
+ boolean finished = false;
synchronized(starterQueue) {
if(logMINOR && (!isSSKScheduler) &&
(!isInsertScheduler)) {
Logger.minor(this, "Scheduling CHK fetches...");
@@ -711,12 +717,16 @@
if(length >= MAX_STARTER_QUEUE_SIZE) {
if(length >= WARNING_STARTER_QUEUE_SIZE)
Logger.error(this, "Queue already full:
"+length);
- return;
+ finished = true;
}
if(length > MAX_STARTER_QUEUE_SIZE * 3 / 4) {
- return;
+ finished = true;
}
}
+ if(finished) {
+ if(wakeUp) starter.wakeUp();
+ return;
+ }
if((!isSSKScheduler) && (!isInsertScheduler)) {
Logger.minor(this, "Scheduling CHK fetches...");
@@ -725,9 +735,15 @@
SendableRequest request =
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient,
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
if(request == null) {
synchronized(ClientRequestScheduler.this) {
- if(!added)
- lastFilledStarterQueueEmpty =
System.currentTimeMillis();
+ // Don't wake up for a while, but no
later than the time we expect the next item to come off the cooldown queue
+ if(!added) {
+ if(noLaterThan !=
Long.MAX_VALUE)
+
nextQueueFillRequestStarterQueue = Math.min(System.currentTimeMillis() +
WAIT_AFTER_NOTHING_TO_START, noLaterThan + 1);
+ else
+
nextQueueFillRequestStarterQueue = System.currentTimeMillis() +
WAIT_AFTER_NOTHING_TO_START;
+ }
}
+ if(wakeUp) starter.wakeUp();
return;
}
added = true;
@@ -975,22 +991,8 @@
return transientCooldownQueue.add(key.getNodeKey(),
getter, null);
}
- private final DBJob moveFromCooldownJob = new DBJob() {
-
- public void run(ObjectContainer container, ClientContext
context) {
- if(moveKeysFromCooldownQueue(persistentCooldownQueue,
true, container) ||
-
moveKeysFromCooldownQueue(transientCooldownQueue, false, container))
- starter.wakeUp();
- }
-
- };
-
- public void moveKeysFromCooldownQueue() {
- jobRunner.queue(moveFromCooldownJob,
NativeThread.NORM_PRIORITY, true);
- }
-
- private boolean moveKeysFromCooldownQueue(CooldownQueue queue, boolean
persistent, ObjectContainer container) {
- if(queue == null) return false;
+ private long moveKeysFromCooldownQueue(CooldownQueue queue, boolean
persistent, ObjectContainer container) {
+ if(queue == null) return Long.MAX_VALUE;
long now = System.currentTimeMillis();
/*
* Only go around once. We will be called again. If there are
keys to move, then RequestStarter will not
@@ -1010,8 +1012,12 @@
* nodes with little RAM it would be bad...
*/
final int MAX_KEYS = 20;
- Key[] keys = queue.removeKeyBefore(now, container, MAX_KEYS);
- if(keys == null) return false;
+ Object ret = queue.removeKeyBefore(now,
WAIT_AFTER_NOTHING_TO_START, container, MAX_KEYS);
+ if(ret == null) return Long.MAX_VALUE;
+ if(ret instanceof Long) {
+ return (Long) ret;
+ }
+ Key[] keys = (Key[]) ret;
for(int j=0;j<keys.length;j++) {
Key key = keys[j];
if(persistent)
@@ -1039,7 +1045,7 @@
if(persistent)
container.deactivate(key, 5);
}
- return true;
+ return Long.MAX_VALUE;
}
public long countTransientQueuedRequests() {
Modified: branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/CooldownQueue.java
2009-02-14 16:10:08 UTC (rev 25651)
@@ -17,9 +17,13 @@
/**
* Remove a key whose cooldown time has passed.
- * @return Either an array of Key's or null if no keys have passed
their cooldown time.
+ * @param dontCareAfter If the next item to come out of the cooldown
+ * queue is more than this many millis after now, return null.
+ * @return Either an array of Key's or a Long indicating the time at
+ * which the next key will be removed from the cooldown, or null if
+ * no keys have passed their cooldown time.
*/
- public abstract Key[] removeKeyBefore(long now, ObjectContainer
container, int maxKeys);
+ public abstract Object removeKeyBefore(long now, long dontCareAfter,
ObjectContainer container, int maxKeys);
/**
* @return True if the key was found.
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentCooldownQueue.java
2009-02-14 16:10:08 UTC (rev 25651)
@@ -84,7 +84,7 @@
return found;
}
- public Key[] removeKeyBefore(final long now, ObjectContainer container,
int maxCount) {
+ public Object removeKeyBefore(final long now, long dontCareAfterMillis,
ObjectContainer container, int maxCount) {
// Will be called repeatedly until no more keys are returned,
so it doesn't
// matter very much if they're not in order.
@@ -124,7 +124,19 @@
container.delete(i);
v.add(i.key);
}
- return (Key[]) v.toArray(new Key[v.size()]);
+ if(!v.isEmpty()) {
+ return (Key[]) v.toArray(new Key[v.size()]);
+ } else {
+ query = container.query();
+
query.descend("time").orderAscending().constrain(new Long(now +
dontCareAfterMillis)).smaller().
+
and(query.descend("parent").constrain(this).identity());
+ results = query.execute();
+ if(results.hasNext()) {
+ return ((PersistentCooldownQueueItem)
results.next()).time;
+ } else {
+ return null;
+ }
+ }
} else {
long tEnd = System.currentTimeMillis();
if(tEnd - tStart > 1000)
Modified:
branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/client/async/RequestCooldownQueue.java
2009-02-14 16:10:08 UTC (rev 25651)
@@ -117,7 +117,7 @@
/* (non-Javadoc)
* @see freenet.client.async.CooldownQueue#removeKeyBefore(long)
*/
- public synchronized Key[] removeKeyBefore(long now, ObjectContainer
container, int maxKeys) {
+ public synchronized Object removeKeyBefore(long now, long
dontCareAfterMillis, ObjectContainer container, int maxKeys) {
ArrayList v = new ArrayList();
logMINOR = Logger.shouldLog(Logger.MINOR, this);
boolean foundIT = false;
@@ -133,7 +133,10 @@
while(true) {
if(startPtr == endPtr) {
if(logMINOR) Logger.minor(this, "No keys
queued");
- return (Key[]) v.toArray(new Key[v.size()]);
+ if(!v.isEmpty())
+ return (Key[]) v.toArray(new
Key[v.size()]);
+ else
+ return null;
}
long time = times[startPtr];
Key key = keys[startPtr];
@@ -148,7 +151,12 @@
} else {
if(time > now) {
if(logMINOR) Logger.minor(this, "First
key is later at time "+time);
- return (Key[]) v.toArray(new
Key[v.size()]);
+ if(!v.isEmpty())
+ return (Key[]) v.toArray(new
Key[v.size()]);
+ else if(time < (now +
dontCareAfterMillis))
+ return Long.valueOf(time);
+ else
+ return null;
}
times[startPtr] = 0;
keys[startPtr] = null;
@@ -158,8 +166,11 @@
}
if(logMINOR) Logger.minor(this, "Returning key "+key);
v.add(key);
- if(v.size() == maxKeys)
- return (Key[]) v.toArray(new Key[v.size()]);
+ if(v.size() == maxKeys) {
+ if(!v.isEmpty())
+ return (Key[]) v.toArray(new
Key[v.size()]);
+ else return null;
+ }
}
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2009-02-14 16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2009-02-14 16:10:08 UTC (rev 25651)
@@ -34,12 +34,6 @@
*/
long queueCooldown(ClientKey key, SendableGet getter, ObjectContainer
container);
- /**
- * Remove keys from the cooldown queue who have now served their time
and can be requested
- * again.
- */
- public void moveKeysFromCooldownQueue();
-
/** Once a key has been requested a few times, don't request it again
for 30 minutes.
* To do so would be pointless given ULPRs, and just waste bandwidth. */
public static final long COOLDOWN_PERIOD = 30*60*1000;
Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2009-02-14
16:04:55 UTC (rev 25650)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2009-02-14
16:10:08 UTC (rev 25651)
@@ -115,8 +115,6 @@
}
continue;
}
- if(!isInsert)
- sched.moveKeysFromCooldownQueue();
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(req == null) {
req = sched.grabRequest();
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs