Author: toad
Date: 2008-05-29 23:10:06 +0000 (Thu, 29 May 2008)
New Revision: 20138
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
Persistent request registration queue:
Instead of scheduling a persistent request to be registered on the database
thread, add it to the registration queue. Do this on the database thread, but
do it immediately if we are already on the database thread.
On startup, and when adding a new request, this causes a job to be queued on
the database thread which actually calls innerRegister().
Thus, once register() has returned, the request WILL BE REGISTERED.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-29 20:14:00 UTC (rev 20137)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-29 23:10:06 UTC (rev 20138)
@@ -27,6 +27,7 @@
import freenet.support.PrioritizedSerialExecutor;
import freenet.support.SerialExecutor;
import freenet.support.api.StringCallback;
+import freenet.support.io.NativeThread;
/**
* Every X seconds, the RequestSender calls the ClientRequestScheduler to
@@ -94,7 +95,7 @@
public final String name;
private final CooldownQueue transientCooldownQueue;
private final CooldownQueue persistentCooldownQueue;
- private final PrioritizedSerialExecutor databaseExecutor;
+ final PrioritizedSerialExecutor databaseExecutor;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
@@ -103,7 +104,7 @@
public ClientRequestScheduler(boolean forInserts, boolean forSSKs,
RandomSource random, RequestStarter starter, Node node, NodeClientCore core,
SubConfig sc, String name) {
this.selectorContainer = node.db;
- schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD);
+ schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor);
schedTransient = new ClientRequestSchedulerNonPersistent(this);
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
@@ -143,6 +144,10 @@
}
public void register(final SendableRequest req) {
+ register(req, databaseExecutor.onThread());
+ }
+
+ public void register(final SendableRequest req, boolean
onDatabaseThread) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Registering "+req, new
Exception("debug"));
if(isInsertScheduler != (req instanceof SendableInsert))
@@ -201,17 +206,21 @@
}
}
if(req.persistent()) {
- databaseExecutor.execute(new Runnable() {
- public void run() {
- try {
- schedCore.innerRegister(req,
random);
- starter.wakeUp();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t,
t);
+ // Add to the persistent registration queue
+ if(onDatabaseThread) {
+ if(!databaseExecutor.onThread()) {
+ throw new IllegalStateException("Not on
database thread!");
+ }
+ schedCore.queueRegister(req, databaseExecutor);
+ } else {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ schedCore.queueRegister(req,
databaseExecutor);
}
- }
- }, "Register request");
+ }, NativeThread.NORM_PRIORITY, "Add persistent
job to queue");
+ }
} else {
+ // Register immediately.
schedTransient.innerRegister(req, random);
starter.wakeUp();
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-05-29 20:14:00 UTC (rev 20137)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-05-29 23:10:06 UTC (rev 20138)
@@ -8,6 +8,7 @@
import com.db4o.ObjectContainer;
import com.db4o.ObjectSet;
import com.db4o.query.Predicate;
+import com.db4o.query.Query;
import com.db4o.types.Db4oList;
import com.db4o.types.Db4oMap;
@@ -15,12 +16,15 @@
import freenet.node.BaseSendableGet;
import freenet.node.Node;
import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
import freenet.node.SendableRequest;
import freenet.support.Logger;
+import freenet.support.PrioritizedSerialExecutor;
import freenet.support.RandomGrabArray;
import freenet.support.SectoredRandomGrabArrayWithInt;
import freenet.support.SectoredRandomGrabArrayWithObject;
import freenet.support.SortedVectorByNumber;
+import freenet.support.io.NativeThread;
/**
* @author toad
@@ -35,6 +39,8 @@
/** Identifier in the database for the node we are attached to */
private final long nodeDBHandle;
final PersistentCooldownQueue persistentCooldownQueue;
+ private transient RandomSource random;
+ private transient PrioritizedSerialExecutor databaseExecutor;
/**
* Fetch a ClientRequestSchedulerCore from the database, or create a
new one.
@@ -42,9 +48,10 @@
* @param forInserts
* @param forSSKs
* @param selectorContainer
+ * @param executor
* @return
*/
- public static ClientRequestSchedulerCore create(Node node, final
boolean forInserts, final boolean forSSKs, ObjectContainer selectorContainer,
long cooldownTime) {
+ public static ClientRequestSchedulerCore create(Node node, final
boolean forInserts, final boolean forSSKs, ObjectContainer selectorContainer,
long cooldownTime, PrioritizedSerialExecutor databaseExecutor) {
final long nodeDBHandle = node.nodeDBHandle;
ObjectSet results = selectorContainer.query(new Predicate() {
public boolean match(ClientRequestSchedulerCore core) {
@@ -61,7 +68,7 @@
core = new ClientRequestSchedulerCore(node, forInserts,
forSSKs, selectorContainer, cooldownTime);
}
logMINOR = Logger.shouldLog(Logger.MINOR,
ClientRequestSchedulerCore.class);
- core.onStarted(selectorContainer, cooldownTime);
+ core.onStarted(selectorContainer, cooldownTime, node.random,
databaseExecutor);
return core;
}
@@ -76,7 +83,7 @@
}
}
- private void onStarted(ObjectContainer container, long cooldownTime) {
+ private void onStarted(ObjectContainer container, long cooldownTime,
RandomSource random, PrioritizedSerialExecutor databaseExecutor) {
((Db4oMap)pendingKeys).activationDepth(1);
((Db4oMap)allRequestsByClientRequest).activationDepth(1);
((Db4oList)recentSuccesses).activationDepth(1);
@@ -84,6 +91,9 @@
if(!isInsertScheduler) {
persistentCooldownQueue.setCooldownTime(cooldownTime);
}
+ this.random = random;
+ this.databaseExecutor = databaseExecutor;
+ databaseExecutor.execute(registerMeRunner,
NativeThread.NORM_PRIORITY, "Register request");
}
// We pass in the schedTransient to the next two methods so that we can
select between either of them.
@@ -316,5 +326,53 @@
return container;
}
+ private final RegisterMeRunner registerMeRunner = new
RegisterMeRunner();
+ class RegisterMeRunner implements Runnable {
+
+ public void run() {
+ Query query = container.query();
+ query.constrain(RegisterMe.class);
+
query.descend("core").constrain(ClientRequestSchedulerCore.this);
+ query.descend("priority").orderAscending();
+ query.descend("addedTime").orderAscending();
+ ObjectSet result = query.execute();
+ if(result.hasNext()) {
+ RegisterMe reg = (RegisterMe) result.next();
+ if(result.hasNext()) {
+
databaseExecutor.execute(registerMeRunner, NativeThread.NORM_PRIORITY,
"Register request");
+ }
+ container.delete(reg);
+ // Don't need to activate, fields should exist?
FIXME
+ innerRegister(reg.getter, random);
+ }
+ }
+
+ }
+ public void queueRegister(SendableRequest req,
PrioritizedSerialExecutor databaseExecutor) {
+ if(!databaseExecutor.onThread()) {
+ throw new IllegalStateException("Not on database
thread!");
+ }
+ RegisterMe reg = new RegisterMe(req, this);
+ container.set(reg);
+ databaseExecutor.execute(registerMeRunner,
NativeThread.NORM_PRIORITY, "Register request");
+ }
+
+
+
}
+
+class RegisterMe {
+ final SendableRequest getter;
+ final ClientRequestSchedulerCore core;
+ final short priority;
+ final long addedTime;
+
+ RegisterMe(SendableRequest getter, ClientRequestSchedulerCore core) {
+ this.getter = getter;
+ this.core = core;
+ this.addedTime = System.currentTimeMillis();
+ this.priority = getter.getPriorityClass();
+ }
+}
+
Modified:
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
2008-05-29 20:14:00 UTC (rev 20137)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
2008-05-29 23:10:06 UTC (rev 20138)
@@ -17,14 +17,19 @@
private boolean running;
private static final int NEWJOB_TIMEOUT = 5*60*1000;
+
+ private final Runner runner = new Runner();
- private final Runnable runner = new PrioRunnable() {
+ class Runner implements PrioRunnable {
+ Thread current;
+
public int getPriority() {
return priority;
}
public void run() {
+ current = Thread.currentThread();
while(true) {
Runnable job = null;
synchronized(jobs) {
@@ -136,4 +141,12 @@
return retval;
}
+ public boolean onThread() {
+ Thread running = Thread.currentThread();
+ synchronized(jobs) {
+ if(runner != null) return false;
+ return runner.current == running;
+ }
+ }
+
}