Author: toad
Date: 2008-08-14 00:35:04 +0000 (Thu, 14 Aug 2008)
New Revision: 21830
Modified:
branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Fix priority inversion problems.
Only add to keysFetching when we actually start the request. Only remove when
done.
Check whether in keysFetching when creating a PersistentChosenRequest, also
when checking the queue size, and grabbing a request from it.
Start a request from the best PersistentChosenRequest on the starterQueue, not
from the first one.
Modified: branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
2008-08-14 00:35:04 UTC (rev 21830)
@@ -32,12 +32,6 @@
this.localRequestOnly = localRequestOnly;
this.cacheLocalRequests = cacheLocalRequests;
this.ignoreStore = ignoreStore;
- // Ignore return value. Not our problem at this point.
- // It won't have been scheduled if there were *no* valid
blocks...
- // it's possible, but very unlikely, that some blocks are
scheduled
- // and some are not.
- if(key != null)
- sched.addToFetching(key);
}
public abstract boolean isPersistent();
@@ -66,8 +60,4 @@
}
public abstract SendableRequestSender getSender(ClientContext context);
-
- public void removeFromFetching(ClientRequestSchedulerCore core) {
- core.removeFetchingKey(key);
- }
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-14 00:35:04 UTC (rev 21830)
@@ -574,9 +574,18 @@
*/
public ChosenBlock grabRequest() {
while(true) {
- PersistentChosenRequest reqGroup;
+ PersistentChosenRequest reqGroup = null;
synchronized(starterQueue) {
- reqGroup = starterQueue.isEmpty() ? null :
starterQueue.getFirst();
+ short bestPriority = Short.MAX_VALUE;
+ int bestRetryCount = Integer.MAX_VALUE;
+ for(PersistentChosenRequest req : starterQueue)
{
+ if(req.prio < bestPriority ||
+ (req.prio ==
bestPriority && req.retryCount < bestRetryCount)) {
+ bestPriority = req.prio;
+ bestRetryCount = req.retryCount;
+ reqGroup = req;
+ }
+ }
}
if(reqGroup != null) {
// Try to find a better non-persistent request
@@ -590,7 +599,7 @@
ChosenBlock block;
int finalLength = 0;
synchronized(starterQueue) {
- block =
reqGroup.grabNotStarted(clientContext.fastWeakRandom);
+ block =
reqGroup.grabNotStarted(clientContext.fastWeakRandom, this);
if(block == null) {
for(int i=0;i<starterQueue.size();i++) {
if(starterQueue.get(i) ==
reqGroup) {
@@ -671,8 +680,10 @@
synchronized(starterQueue) {
// Recompute starterQueueLength
int length = 0;
- for(PersistentChosenRequest req : starterQueue)
+ for(PersistentChosenRequest req : starterQueue)
{
+
req.pruneDuplicates(ClientRequestScheduler.this);
length += req.sizeNotStarted();
+ }
if(logMINOR) Logger.minor(this, "Queue size:
"+length+" SSK="+isSSKScheduler+" insert="+isInsertScheduler);
if(length >= MAX_STARTER_QUEUE_SIZE) {
if(length >= WARNING_STARTER_QUEUE_SIZE)
@@ -1126,6 +1137,10 @@
public boolean addToFetching(Key key) {
return schedCore.addToFetching(key);
}
+
+ public boolean hasFetchingKey(Key key) {
+ return schedCore.hasKey(key);
+ }
public long countPersistentQueuedRequests(ObjectContainer container) {
return schedCore.countQueuedRequests(container);
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-14 00:35:04 UTC (rev 21830)
@@ -4,12 +4,14 @@
package freenet.client.async;
import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import java.util.Vector;
import com.db4o.ObjectContainer;
import freenet.client.FetchContext;
+import freenet.keys.Key;
import freenet.node.BulkCallFailureItem;
import freenet.node.LowLevelGetException;
import freenet.node.RequestScheduler;
@@ -72,7 +74,13 @@
boolean reqActive = container.ext().isActive(req);
if(!reqActive)
container.activate(req, 1);
- blocksNotStarted.addAll(req.makeBlocks(this, sched, container,
context));
+ List<PersistentChosenBlock> candidates = req.makeBlocks(this,
sched, container, context);
+ for(PersistentChosenBlock block : candidates) {
+ Key key = block.key;
+ if(key != null && sched.hasFetchingKey(key))
+ continue;
+ blocksNotStarted.add(block);
+ }
sender = req.getSender(container, context);
if(!reqActive)
container.deactivate(req, 1);
@@ -84,7 +92,7 @@
public void run() {
synchronized(PersistentChosenRequest.this) {
if(finished) return;
- Logger.error(this, "Still not finished
after timeout: "+this);
+ Logger.error(this, "Still not finished
after timeout: "+PersistentChosenRequest.this);
}
}
@@ -194,14 +202,20 @@
scheduler.removeRunningRequest(request);
}
- public synchronized ChosenBlock grabNotStarted(Random random) {
- int size = blocksNotStarted.size();
- if(size == 0) return null;
- PersistentChosenBlock ret;
- if(size == 1) ret = blocksNotStarted.remove(0);
- else ret = blocksNotStarted.remove(random.nextInt(size));
- blocksStarted.add(ret);
- return ret;
+ public synchronized ChosenBlock grabNotStarted(Random random,
RequestScheduler sched) {
+ while(true) {
+ int size = blocksNotStarted.size();
+ if(size == 0) return null;
+ PersistentChosenBlock ret;
+ if(size == 1) ret = blocksNotStarted.remove(0);
+ else ret =
blocksNotStarted.remove(random.nextInt(size));
+ Key key = ret.key;
+ if(key != null && sched.hasFetchingKey(key))
+ // Already fetching; remove from list.
+ continue;
+ blocksStarted.add(ret);
+ return ret;
+ }
}
public synchronized int sizeNotStarted() {
@@ -211,19 +225,27 @@
public void onDumped(ClientRequestSchedulerCore core, ObjectContainer
container) {
if(logMINOR)
Logger.minor(this, "Dumping "+this);
- ArrayList<PersistentChosenBlock> oldNotStarted;
boolean wasStarted;
synchronized(this) {
- oldNotStarted = (ArrayList<PersistentChosenBlock>)
blocksNotStarted.clone();
blocksNotStarted.clear();
wasStarted = !blocksStarted.isEmpty();
}
- for(PersistentChosenBlock block : oldNotStarted) {
- block.removeFromFetching(core);
- }
if(!wasStarted) {
if(logMINOR) Logger.minor(this, "Finishing immediately
in onDumped() as nothing pending: "+this);
finish(container, core.sched.clientContext, true);
}
}
+
+ public synchronized void pruneDuplicates(ClientRequestScheduler sched) {
+ for(int i=0;i<blocksNotStarted.size();i++) {
+ PersistentChosenBlock block = blocksNotStarted.get(i);
+ Key key = block.key;
+ if(key == null) continue;
+ if(sched.hasFetchingKey(key)) {
+ blocksNotStarted.remove(i);
+ if(logMINOR) Logger.minor(this, "Pruned
duplicate "+block+" from "+this);
+ i--;
+ }
+ }
+ }
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-08-14 00:35:04 UTC (rev 21830)
@@ -68,6 +68,8 @@
public void removeRunningRequest(SendableRequest request);
public abstract boolean isRunningRequest(SendableRequest request);
+
+ public boolean hasFetchingKey(Key key);
public void start(NodeClientCore core);
Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-13
23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-14
00:35:04 UTC (rev 21830)
@@ -179,9 +179,12 @@
private boolean startRequest(ChosenBlock req, boolean logMINOR) {
if((!req.isPersistent()) && req.isCancelled()) {
- sched.removeFetchingKey(req.key);
return false;
}
+ if(req.key != null) {
+ if(!sched.addToFetching(req.key))
+ return false;
+ }
if(logMINOR) Logger.minor(this, "Running request "+req+"
priority "+req.getPriority());
core.getExecutor().execute(new SenderThread(req, req.key),
"RequestStarter$SenderThread for "+req);
return true;
@@ -225,7 +228,7 @@
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Finished "+req);
} finally {
- sched.removeFetchingKey(key);
+ if(key != null) sched.removeFetchingKey(key);
}
}