Author: toad
Date: 2008-08-30 15:34:38 +0000 (Sat, 30 Aug 2008)
New Revision: 22250
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
Log:
Fix activation warnings. This is a little heavier than it ought to be.
Pass in SendableRequest[] into maybeFillStarterQueue.
Pass it in to fillRequestStarterQueue. Factor that out from the runnable into a
method, and call the method from maybeFillStarterQueue.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-30 14:59:51 UTC (rev 22249)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-30 15:34:38 UTC (rev 22250)
@@ -193,7 +193,7 @@
container.deactivate(req, 1);
return;
}
- schedCore.innerRegister(req, random, container);
+ schedCore.innerRegister(req, random, container,
null);
} else {
jobRunner.queue(new DBJob() {
@@ -201,14 +201,14 @@
if(container.ext().isActive(req))
Logger.error(this,
"ALREADY ACTIVE: "+req+" in off-thread insert register");
container.activate(req, 1);
- schedCore.innerRegister(req,
random, container);
+ schedCore.innerRegister(req,
random, container, null);
container.deactivate(req, 1);
}
}, NativeThread.NORM_PRIORITY, false);
}
} else {
- schedTransient.innerRegister(req, random, null);
+ schedTransient.innerRegister(req, random, null, null);
}
}
@@ -361,7 +361,7 @@
container.activate(getters[i],
1);
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid = true;
-
schedCore.innerRegister(getter, random, container);
+
schedCore.innerRegister(getter, random, container, getters);
}
}
if(!wereAnyValid) {
@@ -370,7 +370,7 @@
}
if(reg != null)
container.delete(reg);
- maybeFillStarterQueue(container, clientContext);
+ maybeFillStarterQueue(container, clientContext,
getters);
starter.wakeUp();
} else {
jobRunner.queue(new DBJob() {
@@ -385,7 +385,7 @@
container.activate(getter, 1);
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid =
true;
-
schedCore.innerRegister(getter, random, container);
+
schedCore.innerRegister(getter, random, container, getters);
}
container.deactivate(getter, 1);
}
@@ -394,7 +394,7 @@
}
if(reg != null)
container.delete(reg);
-
maybeFillStarterQueue(container, context);
+
maybeFillStarterQueue(container, context, getters);
starter.wakeUp();
}
@@ -404,17 +404,17 @@
if(!anyValid) return;
// Register immediately.
for(int i=0;i<getters.length;i++)
- schedTransient.innerRegister(getters[i],
random, null);
+ schedTransient.innerRegister(getters[i],
random, null, getters);
starter.wakeUp();
}
}
- private void maybeFillStarterQueue(ObjectContainer container,
ClientContext context) {
+ private void maybeFillStarterQueue(ObjectContainer container,
ClientContext context, SendableRequest[] mightBeActive) {
synchronized(this) {
if(starterQueue.size() > MAX_STARTER_QUEUE_SIZE / 2)
return;
}
- requestStarterQueueFiller.run(container, context);
+ fillRequestStarterQueue(container, context, mightBeActive);
}
public ChosenBlock getBetterNonPersistentRequest(short prio, int
retryCount) {
@@ -605,50 +605,62 @@
private DBJob requestStarterQueueFiller = new DBJob() {
public void run(ObjectContainer container, ClientContext
context) {
- if(logMINOR) Logger.minor(this, "Filling request
queue... (SSK="+isSSKScheduler+" insert="+isInsertScheduler);
- short fuzz = -1;
- if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
- fuzz = -1;
- else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
- fuzz = 0;
- synchronized(starterQueue) {
- // Recompute starterQueueLength
- int length = 0;
- PersistentChosenRequest old = null;
- for(PersistentChosenRequest req : starterQueue)
{
- if(old == req)
- Logger.error(this, "DUPLICATE
CHOSEN REQUESTS ON QUEUE: "+req);
- if(old != null && old.request ==
req.request)
- Logger.error(this, "DUPLICATE
REQUEST ON QUEUE: "+old+" vs "+req+" both "+req.request);
+ fillRequestStarterQueue(container, context, null);
+ }
+ };
+
+ private void fillRequestStarterQueue(ObjectContainer container,
ClientContext context, SendableRequest[] mightBeActive) {
+ if(logMINOR) Logger.minor(this, "Filling request queue...
(SSK="+isSSKScheduler+" insert="+isInsertScheduler);
+ short fuzz = -1;
+ if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
+ fuzz = -1;
+ else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
+ fuzz = 0;
+ synchronized(starterQueue) {
+ // Recompute starterQueueLength
+ int length = 0;
+ PersistentChosenRequest old = null;
+ for(PersistentChosenRequest req : starterQueue) {
+ if(old == req)
+ Logger.error(this, "DUPLICATE CHOSEN
REQUESTS ON QUEUE: "+req);
+ if(old != null && old.request == req.request)
+ Logger.error(this, "DUPLICATE REQUEST
ON QUEUE: "+old+" vs "+req+" both "+req.request);
+ boolean ignoreActive = false;
+ if(mightBeActive != null) {
+ for(SendableRequest tmp : mightBeActive)
+ if(tmp == req.request)
ignoreActive = true;
+ }
+ if(!ignoreActive) {
if(container.ext().isActive(req.request))
Logger.error(this, "REQUEST
ALREADY ACTIVATED: "+req.request+" for "+req+" while checking request queue in
filling request queue");
else if(logMINOR)
Logger.minor(this, "Not already
activated for "+req+" in while checking request queue in filling request
queue");
-
req.pruneDuplicates(ClientRequestScheduler.this);
- old = req;
- length += req.sizeNotStarted();
- }
- if(logMINOR) Logger.minor(this, "Queue size:
"+length+" SSK="+isSSKScheduler+" insert="+isInsertScheduler);
- if(length >= MAX_STARTER_QUEUE_SIZE) {
- if(length >= WARNING_STARTER_QUEUE_SIZE)
- Logger.error(this, "Queue
already full: "+starterQueue.size());
- return;
- }
- if(length > MAX_STARTER_QUEUE_SIZE * 3 / 4) {
- return;
- }
+ } else if(logMINOR)
+ Logger.minor(this, "Ignoring active
because just registered: "+req.request);
+
req.pruneDuplicates(ClientRequestScheduler.this);
+ old = req;
+ length += req.sizeNotStarted();
}
-
- while(true) {
- SendableRequest request =
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient,
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
- if(request == null) return;
- boolean full = addToStarterQueue(request,
container);
- container.deactivate(request, 1);
- starter.wakeUp();
- if(full) return;
+ if(logMINOR) Logger.minor(this, "Queue size: "+length+"
SSK="+isSSKScheduler+" insert="+isInsertScheduler);
+ if(length >= MAX_STARTER_QUEUE_SIZE) {
+ if(length >= WARNING_STARTER_QUEUE_SIZE)
+ Logger.error(this, "Queue already full:
"+starterQueue.size());
+ return;
}
+ if(length > MAX_STARTER_QUEUE_SIZE * 3 / 4) {
+ return;
+ }
}
- };
+
+ while(true) {
+ SendableRequest request =
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient,
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
+ if(request == null) return;
+ boolean full = addToStarterQueue(request, container);
+ container.deactivate(request, 1);
+ starter.wakeUp();
+ if(full) return;
+ }
+ }
/**
* Compare a recently registered SendableRequest to what is already on
the
@@ -657,7 +669,7 @@
* @param req
* @param container
*/
- public void maybeAddToStarterQueue(SendableRequest req, ObjectContainer
container) {
+ public void maybeAddToStarterQueue(SendableRequest req, ObjectContainer
container, SendableRequest[] mightBeActive) {
short prio = req.getPriorityClass(container);
int retryCount = req.getRetryCount();
if(logMINOR)
@@ -671,10 +683,18 @@
Logger.error(this, "ON STARTER QUEUE
TWICE: "+prev+" for "+prev.request);
if(prev != null && prev.request == old.request)
Logger.error(this, "REQUEST ON STARTER
QUEUE TWICE: "+prev+" for "+prev.request+" vs "+old+" for "+old.request);
- if(container.ext().isActive(old.request))
- Logger.error(this, "REQUEST ALREADY
ACTIVATED: "+old.request+" for "+old+" while checking request queue in
maybeAddToStarterQueue", new Exception("debug"));
- else if(logMINOR)
- Logger.minor(this, "Not already
activated for "+old+" in while checking request queue in filling request
queue");
+ boolean ignoreActive = false;
+ if(mightBeActive != null) {
+ for(SendableRequest tmp : mightBeActive)
+ if(tmp == old.request)
ignoreActive = true;
+ }
+ if(!ignoreActive) {
+
if(container.ext().isActive(old.request))
+ Logger.error(this, "REQUEST
ALREADY ACTIVATED: "+old.request+" for "+old+" while checking request queue in
maybeAddToStarterQueue for "+req);
+ else if(logMINOR)
+ Logger.minor(this, "Not already
activated for "+old+" in while checking request queue in maybeAddToStarterQueue
for "+req);
+ } else if(logMINOR)
+ Logger.minor(this, "Ignoring active
because just registered: "+old.request+" in maybeAddToStarterQueue for "+req);
size += old.sizeNotStarted();
if(old.prio > prio || old.prio == prio &&
old.retryCount > retryCount)
betterThanSome = true;
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-08-30 14:59:51 UTC (rev 22249)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-08-30 15:34:38 UTC (rev 22250)
@@ -71,7 +71,19 @@
logMINOR = Logger.shouldLog(Logger.MINOR,
ClientRequestSchedulerBase.class);
}
- void innerRegister(SendableRequest req, RandomSource random,
ObjectContainer container) {
+ /**
+ * @param req
+ * @param random
+ * @param container
+ * @param maybeActive Array of requests, can be null, which are being
registered
+ * in this group. These will be ignored for purposes of checking
whether stuff
+ * is activated when it shouldn't be. It is perfectly okay to have req
be a
+ * member of maybeActive.
+ *
+ * FIXME: Either get rid of the debugging code and therefore get rid of
maybeActive,
+ * or make req a SendableRequest[] and register them all at once.
+ */
+ void innerRegister(SendableRequest req, RandomSource random,
ObjectContainer container, SendableRequest[] maybeActive) {
if(isInsertScheduler && req instanceof BaseSendableGet)
throw new IllegalArgumentException("Adding a
SendableGet to an insert scheduler!!");
if((!isInsertScheduler) && req instanceof SendableInsert)
@@ -89,7 +101,7 @@
addToGrabArray(prio, retryCount, fixRetryCount(retryCount),
req.getClient(), req.getClientRequest(), req, random, container);
if(logMINOR) Logger.minor(this, "Registered "+req+" on
prioclass="+prio+", retrycount="+retryCount);
if(persistent())
- sched.maybeAddToStarterQueue(req, container);
+ sched.maybeAddToStarterQueue(req, container,
maybeActive);
}
protected void addToRequestsByClientRequest(ClientRequester
clientRequest, SendableRequest req, ObjectContainer container) {
@@ -165,7 +177,7 @@
// Unregister from the RGA's, but keep the pendingKeys
and cooldown queue data.
req.unregister(container, context);
// Then can do innerRegister() (not register()).
- innerRegister(req, random, container);
+ innerRegister(req, random, container, null);
if(persistent())
container.deactivate(req, 1);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-08-30 14:59:51 UTC (rev 22249)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-08-30 15:34:38 UTC (rev 22250)
@@ -443,9 +443,9 @@
Logger.error(this, "Could not find
client grabber for client "+req.getClient()+" from "+chosenTracker);
}
if(req.persistent())
- innerRegister(req, random, container);
+ innerRegister(req, random, container,
null);
else
- schedTransient.innerRegister(req,
random, container);
+ schedTransient.innerRegister(req,
random, container, null);
continue; // Try the next one on this retry
count.
}
@@ -478,7 +478,7 @@
// Use the recent one instead
if(logMINOR)
Logger.minor(this, "Recently succeeded
req "+altReq+" is better, using that, reregistering chosen "+req);
- schedTransient.innerRegister(req, random, null);
+ schedTransient.innerRegister(req, random, null,
null);
req = altReq;
} else if(altReq != null) {
// Don't use the recent one