Author: toad
Date: 2008-05-31 13:12:13 +0000 (Sat, 31 May 2008)
New Revision: 20155
Added:
branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Return ChosenRequests, not SendableRequests.
This means we call chooseKey() and getNodeKey() on the database thread, which
means there is no database I/O on the request starter thread.
Added: branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
2008-05-31 13:12:13 UTC (rev 20155)
@@ -0,0 +1,29 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import freenet.keys.Key;
+import freenet.node.SendableRequest;
+
+/**
+ * A request chosen by ClientRequestScheduler.
+ * @author toad
+ */
+public class ChosenRequest {
+
+ /** The request object */
+ public final SendableRequest request;
+ /** The token indicating the key within the request to be fetched/inserted.
+ * Meaning is entirely defined by the request. */
+ public final Object token;
+ /** The key to be fetched, null if not a BaseSendableGet */
+ public final Key key;
+
+ ChosenRequest(SendableRequest req, Object tok, Key key) {
+ request = req;
+ token = tok;
+ this.key = key;
+ }
+
+}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-31 12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-31 13:12:13 UTC (rev 20155)
@@ -17,6 +17,7 @@
import freenet.keys.KeyBlock;
import freenet.keys.KeyVerifyException;
import freenet.node.BaseSendableGet;
+import freenet.node.KeysFetchingLocally;
import freenet.node.LowLevelGetException;
import freenet.node.Node;
import freenet.node.NodeClientCore;
@@ -241,7 +242,7 @@
schedTransient.addPendingKey(key, getter);
}
- private synchronized SendableRequest removeFirst() {
+ private synchronized ChosenRequest removeFirst() {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on database
thread!");
}
@@ -254,7 +255,7 @@
return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, false, (short) -1, -1);
}
- public SendableRequest getBetterNonPersistentRequest(SendableRequest
req) {
+ public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) {
short fuzz = -1;
if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
fuzz = -1;
@@ -262,8 +263,8 @@
fuzz = 0;
if(req == null)
return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, (short) -1, -1);
- short prio = req.getPriorityClass();
- int retryCount = req.getRetryCount();
+ short prio = req.request.getPriorityClass();
+ int retryCount = req.request.getRetryCount();
return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, prio, retryCount);
}
@@ -282,7 +283,7 @@
private Runnable requestStarterQueueFiller = new Runnable() {
public void run() {
- SendableRequest req = null;
+ ChosenRequest req = null;
while(true) {
synchronized(starterQueue) {
if(req != null) {
@@ -531,4 +532,12 @@
// Approximately... there might be some overlap in the two
pendingKeys's...
return schedCore.countQueuedRequests() +
schedTransient.countQueuedRequests();
}
+
+ public KeysFetchingLocally fetchingKeys() {
+ return schedCore;
+ }
+
+ public void removeFetchingKey(Key key) {
+ schedCore.removeFetchingKey(key);
+ }
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-05-31 12:35:43 UTC (rev 20154)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-05-31 13:12:13 UTC (rev 20155)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import java.util.HashSet;
import java.util.List;
import com.db4o.ObjectContainer;
@@ -14,7 +15,9 @@
import freenet.crypt.RandomSource;
import freenet.keys.ClientKey;
+import freenet.keys.Key;
import freenet.node.BaseSendableGet;
+import freenet.node.KeysFetchingLocally;
import freenet.node.Node;
import freenet.node.RequestStarter;
import freenet.node.SendableGet;
@@ -33,7 +36,7 @@
* Does not refer to any non-persistable classes as member variables: Node must always
* be passed in if we need to use it!
*/
-class ClientRequestSchedulerCore extends ClientRequestSchedulerBase {
+class ClientRequestSchedulerCore extends ClientRequestSchedulerBase implements
KeysFetchingLocally {
private ObjectContainer container;
private static boolean logMINOR;
@@ -43,8 +46,19 @@
private transient RandomSource random;
private transient PrioritizedSerialExecutor databaseExecutor;
private transient ClientRequestScheduler sched;
-
+
/**
+ * All Key's we are currently fetching.
+ * Locally originated requests only, avoids some complications with HTL,
+ * and also has the benefit that we can see stuff that's been scheduled on a SenderThread
+ * but that thread hasn't started yet. FIXME: Both issues can be avoided: first we'd get
+ * rid of the SenderThread and start the requests directly and asynchronously, secondly
+ * we'd move this to node but only track keys we are fetching at max HTL.
+ * LOCKING: Always lock this LAST.
+ */
+ private transient HashSet keysFetching;
+
+ /**
* Fetch a ClientRequestSchedulerCore from the database, or create a
new one.
* @param node
* @param forInserts
@@ -95,8 +109,12 @@
}
this.random = random;
this.databaseExecutor = databaseExecutor;
+ if(!isInsertScheduler)
+ keysFetching = new HashSet();
+ else
+ keysFetching = null;
+ this.sched = sched;
databaseExecutor.execute(registerMeRunner,
NativeThread.NORM_PRIORITY, "Register request");
- this.sched = sched;
}
// We pass in the schedTransient to the next two methods so that we can
select between either of them.
@@ -139,7 +157,25 @@
// We prevent a number of race conditions (e.g. adding a retry count and then another
// thread removes it cos its empty) ... and in addToGrabArray etc we already sync on this.
// The worry is ... is there any nested locking outside of the hierarchy?
- SendableRequest removeFirst(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount) {
+ ChosenRequest removeFirst(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount) {
+ SendableRequest req = removeFirstInner(fuzz, random,
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount);
+ Object token = req.chooseKey(this);
+ if(token == null) {
+ return null;
+ } else {
+ Key key;
+ if(isInsertScheduler)
+ key = null;
+ else
+ key = ((BaseSendableGet)req).getNodeKey(token);
+ ChosenRequest ret = new ChosenRequest(req, token, key);
+ if(key != null)
+ keysFetching.add(key);
+ return ret;
+ }
+ }
+
+ SendableRequest removeFirstInner(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount) {
// Priorities start at 0
if(logMINOR) Logger.minor(this, "removeFirst()");
boolean tryOfferedKeys = offeredKeys != null &&
random.nextBoolean();
@@ -156,7 +192,7 @@
for(;choosenPriorityClass <=
RequestStarter.MINIMUM_PRIORITY_CLASS;choosenPriorityClass++) {
if(logMINOR) Logger.minor(this, "Using priority
"+choosenPriorityClass);
if(tryOfferedKeys) {
-
if(offeredKeys[choosenPriorityClass].hasValidKeys(starter))
+ if(offeredKeys[choosenPriorityClass].hasValidKeys(this))
return offeredKeys[choosenPriorityClass];
}
SortedVectorByNumber perm = null;
@@ -401,6 +437,18 @@
}
}
}
+
+ public boolean hasKey(Key key) {
+ synchronized(keysFetching) {
+ return keysFetching.contains(key);
+ }
+ }
+
+ public void removeFetchingKey(Key key) {
+ synchronized(keysFetching) {
+ keysFetching.remove(key);
+ }
+ }
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-05-31 12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-05-31 13:12:13 UTC (rev 20155)
@@ -5,7 +5,9 @@
import java.util.LinkedList;
+import freenet.client.async.ChosenRequest;
import freenet.keys.ClientKey;
+import freenet.keys.Key;
public interface RequestScheduler {
@@ -45,6 +47,10 @@
public LinkedList getRequestStarterQueue();
- public SendableRequest getBetterNonPersistentRequest(SendableRequest
req);
+ public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req);
+
+ public KeysFetchingLocally fetchingKeys();
+
+ public void removeFetchingKey(Key key);
}
Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-05-31
12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-05-31
13:12:13 UTC (rev 20155)
@@ -6,6 +6,7 @@
import java.util.HashSet;
import java.util.LinkedList;
+import freenet.client.async.ChosenRequest;
import freenet.keys.Key;
import freenet.support.Logger;
import freenet.support.OOMHandler;
@@ -20,7 +21,7 @@
* And you have to provide a RequestStarterClient. We do round robin between
* clients on the same priority level.
*/
-public class RequestStarter implements Runnable, KeysFetchingLocally,
RandomGrabArrayItemExclusionList {
+public class RequestStarter implements Runnable,
RandomGrabArrayItemExclusionList {
/*
* Priority classes
@@ -77,7 +78,6 @@
this.averageInputBytesPerRequest = averageInputBytesPerRequest;
this.isInsert = isInsert;
this.isSSK = isSSK;
- if(!isInsert) keysFetching = new HashSet();
}
void setScheduler(RequestScheduler sched) {
@@ -97,7 +97,7 @@
}
void realRun() {
- SendableRequest req = null;
+ ChosenRequest req = null;
sentRequestTime = System.currentTimeMillis();
// The last time at which we sent a request or decided not to
long cycleTime = sentRequestTime;
@@ -171,7 +171,7 @@
}
if(req == null) continue;
if(!startRequest(req, logMINOR)) {
- if(!req.isCancelled())
+ if(!req.request.isCancelled())
Logger.normal(this, "No requests to
start on "+req);
}
req = null;
@@ -189,16 +189,16 @@
* thread is probably doing other things so we have to wait for that to finish).
* @return
*/
- private SendableRequest getRequest() {
- SendableRequest req;
+ private ChosenRequest getRequest() {
+ ChosenRequest req;
while(true) {
synchronized(queue) {
- req = (SendableRequest) queue.removeFirst();
+ req = (ChosenRequest) queue.removeFirst();
}
- if(req.isCancelled()) continue;
+ if(req.request.isCancelled()) continue;
break;
}
- SendableRequest betterReq =
sched.getBetterNonPersistentRequest(req);
+ ChosenRequest betterReq =
sched.getBetterNonPersistentRequest(req);
if(req != null) {
if(betterReq != null) {
synchronized(queue) {
@@ -213,59 +213,10 @@
return req;
}
- /**
- * All Key's we are currently fetching.
- * Locally originated requests only, avoids some complications with HTL,
- * and also has the benefit that we can see stuff that's been scheduled on a SenderThread
- * but that thread hasn't started yet. FIXME: Both issues can be avoided: first we'd get
- * rid of the SenderThread and start the requests directly and asynchronously, secondly
- * we'd move this to node but only track keys we are fetching at max HTL.
- * LOCKING: Always lock this LAST.
- */
- private HashSet keysFetching;
-
- private boolean startRequest(SendableRequest req, boolean logMINOR) {
- // Create a thread to handle starting the request, and the
resulting feedback
- Object keyNum = null;
- Key key = null;
- while(true) {
- try {
- keyNum = req.chooseKey(isInsert ? null : this);
- if(keyNum == null) return false;
- if(!isInsert) {
- key =
((BaseSendableGet)req).getNodeKey(keyNum);
- if(key == null) return false;
- synchronized(keysFetching) {
- keysFetching.add(key);
- }
- }
- core.getExecutor().execute(new
SenderThread(req, keyNum, key), "RequestStarter$SenderThread for "+req);
- if(logMINOR) Logger.minor(this, "Started
"+req+" key "+keyNum);
- return true;
- } catch (OutOfMemoryError e) {
- OOMHandler.handleOOM(e);
- System.err.println("Will retry above failed
operation...");
- // Possibly out of threads
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- // Ignore
- }
- synchronized(keysFetching) {
- if(key != null)
keysFetching.remove(key);
- }
- } catch (Throwable t) {
- if(keyNum != null) {
- // Re-queue
- Logger.error(this, "Caught "+t+" while
trying to start request");
- req.internalError(keyNum, t, sched);
- return true; // Sort of ... maybe it
will clear
- }
- synchronized(keysFetching) {
- if(key != null)
keysFetching.remove(key);
- }
- }
- }
+ private boolean startRequest(ChosenRequest req, boolean logMINOR) {
+ if(sched.fetchingKeys().hasKey(req.key)) return false;
+ core.getExecutor().execute(new SenderThread(req.request,
req.token, req.key), "RequestStarter$SenderThread for "+req);
+ return true;
}
public void run() {
@@ -306,9 +257,7 @@
Logger.minor(this, "Finished "+req);
} finally {
if(!isInsert) {
- synchronized(keysFetching) {
- keysFetching.remove(key);
- }
+ sched.removeFetchingKey(key);
}
}
}
@@ -321,16 +270,10 @@
}
}
- public boolean hasKey(Key key) {
- synchronized(keysFetching) {
- return keysFetching.contains(key);
- }
- }
-
public boolean exclude(RandomGrabArrayItem item) {
if(isInsert) return false;
BaseSendableGet get = (BaseSendableGet) item;
- if(get.hasValidKeys(this))
+ if(get.hasValidKeys(sched.fetchingKeys()))
return false;
Logger.normal(this, "Excluding (no valid keys): "+get);
return true;