Author: toad
Date: 2008-06-20 20:48:24 +0000 (Fri, 20 Jun 2008)
New Revision: 20535
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
Log:
Move various executor jobs to dbjob's.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-20 18:37:18 UTC (rev 20534)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-20 20:48:24 UTC (rev 20535)
@@ -102,6 +102,7 @@
final PrioritizedSerialExecutor databaseExecutor;
final PrioritizedSerialExecutor datastoreCheckerExecutor;
public final ClientContext clientContext;
+ final DBJobRunner jobRunner;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
@@ -142,6 +143,7 @@
transientCooldownQueue = new
RequestCooldownQueue(COOLDOWN_PERIOD);
else
transientCooldownQueue = null;
+ jobRunner = clientContext.jobRunner;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
@@ -181,15 +183,15 @@
}, getter.getPriorityClass(), "Checking
datastore");
} else if(persistent) {
- databaseExecutor.execute(new Runnable() {
+ jobRunner.queue(new DBJob() {
- public void run() {
-
schedCore.addPendingKeys(getter, selectorContainer);
+ public void run(ObjectContainer
container, ClientContext context) {
+
schedCore.addPendingKeys(getter, container);
schedCore.queueRegister(getter,
databaseExecutor);
- final Object[] keyTokens =
getter.sendableKeys(selectorContainer);
+ final Object[] keyTokens =
getter.sendableKeys(container);
final ClientKey[] keys = new
ClientKey[keyTokens.length];
for(int
i=0;i<keyTokens.length;i++)
- keys[i] =
getter.getKey(keyTokens[i], selectorContainer);
+ keys[i] =
getter.getKey(keyTokens[i], container);
datastoreCheckerExecutor.execute(new Runnable() {
public void run() {
@@ -199,7 +201,7 @@
}, getter.getPriorityClass(),
"Checking datastore");
}
- }, NativeThread.NORM_PRIORITY, "Registering
request");
+ }, NativeThread.NORM_PRIORITY, false);
} else {
// Not persistent
schedTransient.addPendingKeys(getter, null);
@@ -263,12 +265,13 @@
else {
final SendableGet g = getter;
final Object token = tok;
- databaseExecutor.execute(new Runnable()
{
- public void run() {
- g.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token,
ClientRequestScheduler.this, selectorContainer, clientContext);
-
selectorContainer.commit();
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
+ g.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token,
ClientRequestScheduler.this, container, context);
}
- }, NativeThread.NORM_PRIORITY, "Block
decode failed");
+
+ }, NativeThread.NORM_PRIORITY, false);
}
continue; // other keys might be valid
}
@@ -280,11 +283,13 @@
final ClientKeyBlock b = block;
final Object t = tok;
final SendableGet g = getter;
- databaseExecutor.execute(new Runnable()
{
- public void run() {
- g.onSuccess(b, true, t,
ClientRequestScheduler.this, selectorContainer, clientContext);
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
+ g.onSuccess(b, true, t,
ClientRequestScheduler.this, container, context);
}
- }, NativeThread.NORM_PRIORITY, "Block
found on register");
+
+ }, NativeThread.NORM_PRIORITY, false);
}
// Even with working thread priorities, we
still get very high latency accessing
// the datastore when background threads are
doing it in parallel.
@@ -310,14 +315,15 @@
schedCore.deleteRegisterMe(req);
starter.wakeUp();
} else {
- databaseExecutor.execute(new Runnable() {
- public void run() {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
if(anyValid)
-
schedCore.innerRegister(req, random, selectorContainer);
+
schedCore.innerRegister(req, random, container);
schedCore.deleteRegisterMe(req);
- selectorContainer.commit();
}
- }, NativeThread.NORM_PRIORITY, "Add persistent
job to queue");
+
+ }, NativeThread.NORM_PRIORITY, false);
}
} else {
// Register immediately.
@@ -371,8 +377,7 @@
}
public void queueFillRequestStarterQueue() {
- databaseExecutor.executeNoDupes(requestStarterQueueFiller,
- NativeThread.MAX_PRIORITY, "Fill request
starter queue");
+ jobRunner.queue(requestStarterQueueFiller,
NativeThread.MAX_PRIORITY, true);
}
void addToStarterQueue(PersistentChosenRequest req) {
@@ -381,8 +386,8 @@
}
}
- private Runnable requestStarterQueueFiller = new Runnable() {
- public void run() {
+ private DBJob requestStarterQueueFiller = new DBJob() {
+ public void run(ObjectContainer container, ClientContext
context) {
if(isInsertScheduler && !isSSKScheduler) {
if(logMINOR) Logger.minor(this, "Scheduling
inserts...");
}
@@ -398,7 +403,6 @@
}
if(starterQueue.size() >=
MAX_STARTER_QUEUE_SIZE) finished = true;
}
- selectorContainer.commit();
if(req == null || finished) return;
}
}
@@ -414,19 +418,15 @@
if(transientCooldownQueue != null)
transientCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, null), null);
} else {
- databaseExecutor.execute(new Runnable() {
- public void run() {
- try {
-
schedCore.removePendingKey(getter, complain, key);
- if(persistentCooldownQueue !=
null)
-
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
- selectorContainer.commit();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t,
t);
- }
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
+ schedCore.removePendingKey(getter,
complain, key);
+ if(persistentCooldownQueue != null)
+
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, container), container);
}
- }, "removePendingKey");
+ }, NativeThread.NORM_PRIORITY, false);
}
}
@@ -470,18 +470,15 @@
public synchronized void succeeded(final BaseSendableGet succeeded,
final ChosenRequest req) {
if(succeeded.persistent()) {
- databaseExecutor.execute(new Runnable() {
- public void run() {
- try {
- schedCore.succeeded(succeeded);
- if(succeeded.persistent())
-
selectorContainer.delete((PersistentChosenRequest)req);
- selectorContainer.commit();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t,
t);
- }
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
+ schedCore.succeeded(succeeded);
+ if(succeeded.persistent())
+
container.delete((PersistentChosenRequest)req);
}
- }, "Mark success for "+succeeded);
+
+ }, NativeThread.NORM_PRIORITY, false);
} else
schedTransient.succeeded(succeeded);
}
@@ -520,14 +517,14 @@
// Now the persistent stuff
- databaseExecutor.execute(new Runnable() {
+ jobRunner.queue(new DBJob() {
- public void run() {
+ public void run(ObjectContainer container,
ClientContext context) {
final SendableGet[] gets =
schedCore.removePendingKey(key);
if(gets == null) return;
if(persistentCooldownQueue != null) {
for(int i=0;i<gets.length;i++)
-
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
+
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key, container), container);
}
// Call the callbacks on the database executor
thread, because the first thing
// they will need to do is access the database
to decide whether they need to
@@ -535,16 +532,15 @@
for(int i=0;i<gets.length;i++) {
try {
if(logMINOR) Logger.minor(this,
"Calling callback for "+gets[i]+" for "+key);
- gets[i].onGotKey(key, block,
ClientRequestScheduler.this, selectorContainer, clientContext);
+ gets[i].onGotKey(key, block,
ClientRequestScheduler.this, container, context);
} catch (Throwable t) {
Logger.error(this, "Caught
"+t+" running callback "+gets[i]+" for "+key);
}
}
if(logMINOR) Logger.minor(this, "Finished
running callbacks");
- selectorContainer.commit();
}
- }, "tripPendingKey for "+block.getKey());
+ }, NativeThread.NORM_PRIORITY, false);
}
@@ -565,15 +561,16 @@
final short oldPrio = priority;
- databaseExecutor.execute(new Runnable() {
- public void run() {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
short priority = schedCore.getKeyPrio(key,
oldPrio);
if(priority >= oldPrio) return; // already on
list at >= priority
offeredKeys[priority].queueKey(key);
starter.wakeUp();
- // No need to commit
}
- }, "maybeQueueOfferedKey");
+
+ }, NativeThread.NORM_PRIORITY, false);
}
public void dequeueOfferedKey(Key key) {
@@ -594,17 +591,14 @@
public void moveKeysFromCooldownQueue() {
moveKeysFromCooldownQueue(transientCooldownQueue, null);
- databaseExecutor.execute(new Runnable() {
- public void run() {
- try {
-
if(moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer))
- starter.wakeUp();
- selectorContainer.commit();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- }
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
+
if(moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer))
+ starter.wakeUp();
}
- }, "moveKeysFromCooldownQueue");
+
+ }, NativeThread.NORM_PRIORITY, false);
}
private boolean moveKeysFromCooldownQueue(CooldownQueue queue,
ObjectContainer container) {
@@ -656,36 +650,39 @@
}
public void callFailure(final SendableGet get, final
LowLevelGetException e, final Object keyNum, int prio, String name, final
ChosenRequest req) {
- databaseExecutor.execute(new Runnable() {
- public void run() {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
get.onFailure(e, keyNum,
ClientRequestScheduler.this, selectorContainer, clientContext);
if(get.persistent())
selectorContainer.delete((PersistentChosenRequest)req);
- selectorContainer.commit();
}
- }, prio, name);
+
+ }, NativeThread.NORM_PRIORITY, false);
}
public void callFailure(final SendableInsert put, final
LowLevelPutException e, final Object keyNum, int prio, String name, final
ChosenRequest req) {
- databaseExecutor.execute(new Runnable() {
- public void run() {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
put.onFailure(e, keyNum, selectorContainer,
clientContext);
if(put.persistent())
selectorContainer.delete((PersistentChosenRequest)req);
- selectorContainer.commit();
}
- }, prio, name);
+
+ }, NativeThread.NORM_PRIORITY, false);
}
public void callSuccess(final SendableInsert put, final Object keyNum,
int prio, String name, final ChosenRequest req) {
- databaseExecutor.execute(new Runnable() {
- public void run() {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer container,
ClientContext context) {
put.onSuccess(keyNum, selectorContainer,
clientContext);
if(put.persistent())
selectorContainer.delete((PersistentChosenRequest)req);
- selectorContainer.commit();
}
- }, prio, name);
+
+ }, NativeThread.NORM_PRIORITY, false);
}
public FECQueue getFECQueue() {