Author: toad
Date: 2008-05-22 22:10:15 +0000 (Thu, 22 May 2008)
New Revision: 20048
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
branches/db4o/freenet/src/freenet/node/SendableGet.java
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
Log:
Beginnings of single-database-thread support.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-05-22 22:10:15 UTC (rev 20048)
@@ -24,6 +24,7 @@
import freenet.node.SendableInsert;
import freenet.node.SendableRequest;
import freenet.support.Logger;
+import freenet.support.SerialExecutor;
import freenet.support.api.StringCallback;
/**
@@ -92,6 +93,7 @@
public final String name;
private final CooldownQueue transientCooldownQueue;
private final CooldownQueue persistentCooldownQueue;
+ private final SerialExecutor databaseExecutor;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
@@ -103,6 +105,7 @@
schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD);
schedTransient = new ClientRequestSchedulerNonPersistent(this);
persistentCooldownQueue = schedCore.persistentCooldownQueue;
+ this.databaseExecutor = core.clientDatabaseExecutor;
this.starter = starter;
this.random = random;
this.node = node;
@@ -138,7 +141,7 @@
choosenPriorityScheduler = val;
}
- public void register(SendableRequest req) {
+ public void register(final SendableRequest req) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Registering "+req, new
Exception("debug"));
if(isInsertScheduler != (req instanceof SendableInsert))
@@ -196,17 +199,35 @@
}
}
}
- if(req.persistent())
- schedCore.innerRegister(req, random);
- else
+ if(req.persistent()) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ schedCore.innerRegister(req,
random);
+ starter.wakeUp();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t,
t);
+ }
+ }
+ }, "Register request");
+ } else {
schedTransient.innerRegister(req, random);
- starter.wakeUp();
+ starter.wakeUp();
+ }
}
- void addPendingKey(ClientKey key, SendableGet getter) {
- if(getter.persistent())
- schedCore.addPendingKey(key, getter);
- else
+ void addPendingKey(final ClientKey key, final SendableGet getter) {
+ if(getter.persistent()) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ schedCore.addPendingKey(key,
getter);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t,
t);
+ }
+ }
+ }, "Add pending key");
+ } else
schedTransient.addPendingKey(key, getter);
}
@@ -220,18 +241,30 @@
return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient);
}
- public void removePendingKey(SendableGet getter, boolean complain, Key
key) {
- boolean dropped =
- schedCore.removePendingKey(getter, complain, key) |
- schedTransient.removePendingKey(getter, complain, key);
+ public void removePendingKey(final SendableGet getter, final boolean
complain, final Key key) {
+ boolean dropped = schedTransient.removePendingKey(getter,
complain, key);
if(dropped && offeredKeys != null && !node.peersWantKey(key)) {
for(int i=0;i<offeredKeys.length;i++)
offeredKeys[i].remove(key);
}
if(transientCooldownQueue != null)
transientCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key), null);
- if(persistentCooldownQueue != null)
- persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key), selectorContainer);
+
+ // Now the persistent clients...
+
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ schedCore.removePendingKey(getter,
complain, key);
+ if(persistentCooldownQueue != null)
+
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key), selectorContainer);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ }
+ }
+
+ }, "removePendingKey");
+
}
/**
@@ -241,6 +274,7 @@
* @param complain
*/
public void removePendingKeys(SendableGet getter, boolean complain) {
+ // FIXME should this be a single databaseExecutor thread??
Object[] keyTokens = getter.allKeys();
for(int i=0;i<keyTokens.length;i++) {
Object tok = keyTokens[i];
@@ -254,47 +288,58 @@
}
}
- public void reregisterAll(ClientRequester request) {
- if(request.persistent())
- schedCore.reregisterAll(request, random, this);
- else
+ public void reregisterAll(final ClientRequester request) {
+ if(request.persistent()) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+
schedCore.reregisterAll(request, random, ClientRequestScheduler.this);
+ starter.wakeUp();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t,
t);
+ }
+ }
+ }, "Reregister for "+request);
+ } else {
schedTransient.reregisterAll(request, random, this);
- starter.wakeUp();
+ starter.wakeUp();
+ }
}
public String getChoosenPriorityScheduler() {
return choosenPriorityScheduler;
}
- public synchronized void succeeded(BaseSendableGet succeeded) {
- if(succeeded.persistent())
- schedCore.succeeded(succeeded);
- else
+ public synchronized void succeeded(final BaseSendableGet succeeded) {
+ if(succeeded.persistent()) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ schedCore.succeeded(succeeded);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t,
t);
+ }
+ }
+ }, "Mark success for "+succeeded);
+ } else
schedTransient.succeeded(succeeded);
}
public void tripPendingKey(final KeyBlock block) {
if(logMINOR) Logger.minor(this,
"tripPendingKey("+block.getKey()+")");
+
+ // First the transient stuff
+
if(offeredKeys != null) {
for(int i=0;i<offeredKeys.length;i++) {
offeredKeys[i].remove(block.getKey());
}
}
final Key key = block.getKey();
- final SendableGet[] gets = schedCore.removePendingKey(key);
final SendableGet[] transientGets =
schedTransient.removePendingKey(key);
- if(gets == null) return;
- if(transientCooldownQueue != null) {
- for(int i=0;i<gets.length;i++)
- transientCooldownQueue.removeKey(key,
transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
- }
- if(persistentCooldownQueue != null) {
- for(int i=0;i<gets.length;i++)
- persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key), selectorContainer);
- }
- Runnable r = new Runnable() {
+ node.executor.execute(new Runnable() {
public void run() {
- if(logMINOR) Logger.minor(this, "Running
"+gets.length+" callbacks off-thread for "+block.getKey());
+ if(logMINOR) Logger.minor(this, "Running
"+transientGets.length+" callbacks off-thread for "+block.getKey());
for(int i=0;i<transientGets.length;i++) {
try {
if(logMINOR) Logger.minor(this,
"Calling callback for "+transientGets[i]+" for "+key);
@@ -303,6 +348,27 @@
Logger.error(this, "Caught
"+t+" running callback "+transientGets[i]+" for "+key);
}
}
+ }
+ }, "Running off-thread callbacks for "+block.getKey());
+ if(transientCooldownQueue != null) {
+ for(int i=0;i<transientGets.length;i++)
+ transientCooldownQueue.removeKey(key,
transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
+ }
+
+ // Now the persistent stuff
+
+ databaseExecutor.execute(new Runnable() {
+
+ public void run() {
+ final SendableGet[] gets =
schedCore.removePendingKey(key);
+ if(gets == null) return;
+ if(persistentCooldownQueue != null) {
+ for(int i=0;i<gets.length;i++)
+
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key), selectorContainer);
+ }
+ // Call the callbacks on the database executor
thread, because the first thing
+ // they will need to do is access the database
to decide whether they need to
+ // decode, and if so to find the key to decode
with.
for(int i=0;i<gets.length;i++) {
try {
if(logMINOR) Logger.minor(this,
"Calling callback for "+gets[i]+" for "+key);
@@ -313,16 +379,13 @@
}
if(logMINOR) Logger.minor(this, "Finished
running callbacks");
}
- };
- node.getTicker().queueTimedJob(r, 0); // FIXME ideally these
would be completed on a single thread; when we have 1.5, use a dedicated
non-parallel Executor
+
+ }, "tripPendingKey for "+block.getKey());
+
}
- public boolean anyWantKey(Key key) {
- return schedTransient.anyWantKey(key) ||
schedCore.anyWantKey(key);
- }
-
/** If we want the offered key, or if force is enabled, queue it */
- public void maybeQueueOfferedKey(Key key, boolean force) {
+ public void maybeQueueOfferedKey(final Key key, boolean force) {
if(logMINOR)
Logger.minor(this,
"maybeQueueOfferedKey("+key+","+force);
short priority = Short.MAX_VALUE;
@@ -331,12 +394,21 @@
priority =
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS;
}
priority = schedCore.getKeyPrio(key, priority);
- priority = schedTransient.getKeyPrio(key, priority);
- if(priority == Short.MAX_VALUE) return;
- if(logMINOR)
- Logger.minor(this, "Priority: "+priority);
- offeredKeys[priority].queueKey(key);
- starter.wakeUp();
+ if(priority < Short.MAX_VALUE) {
+ offeredKeys[priority].queueKey(key);
+ starter.wakeUp();
+ }
+
+ final short oldPrio = priority;
+
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ short priority = schedTransient.getKeyPrio(key,
oldPrio);
+ if(priority >= oldPrio) return; // already on
list at >= priority
+ offeredKeys[priority].queueKey(key);
+ starter.wakeUp();
+ }
+ }, "maybeQueueOfferedKey");
}
public void dequeueOfferedKey(Key key) {
@@ -345,8 +417,11 @@
}
}
+ /**
+ * MUST be called from database thread!
+ */
public long queueCooldown(ClientKey key, SendableGet getter) {
- if(getter.persistent())
+ if(!getter.persistent())
return persistentCooldownQueue.add(key.getNodeKey(),
getter, selectorContainer);
else
return transientCooldownQueue.add(key.getNodeKey(),
getter, null);
@@ -354,7 +429,15 @@
public void moveKeysFromCooldownQueue() {
moveKeysFromCooldownQueue(transientCooldownQueue, null);
- moveKeysFromCooldownQueue(persistentCooldownQueue,
selectorContainer);
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+
moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer);
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ }
+ }
+ }, "moveKeysFromCooldownQueue");
}
private void moveKeysFromCooldownQueue(CooldownQueue queue,
ObjectContainer container) {
@@ -386,7 +469,7 @@
}
}
- public long countQueuedRequests() {
+ public long countTransientQueuedRequests() {
// Approximately... there might be some overlap in the two
pendingKeys's...
return schedCore.countQueuedRequests() +
schedTransient.countQueuedRequests();
}
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-05-22
19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-05-22
22:10:15 UTC (rev 20048)
@@ -105,6 +105,15 @@
private boolean lazyResume;
protected final Persister persister;
private final SerialExecutor clientSlowSerialExecutor[];
+ /** All client-layer database access occurs on a SerialExecutor, so
that we don't need
+ * to have multiple parallel transactions. Advantages:
+ * - We never have two copies of the same object in RAM, and more
broadly, we don't
+ * need to worry about interactions between objects from different
transactions.
+ * - Only one weak-reference cache for the database.
+ * - No need to refresh live objects.
+ * - Deactivation is simpler.
+ */
+ public final SerialExecutor clientDatabaseExecutor;
public static int maxBackgroundUSKFetchers;
@@ -130,6 +139,7 @@
else prio = NativeThread.MIN_PRIORITY;
clientSlowSerialExecutor[i] = new SerialExecutor(prio);
}
+ clientDatabaseExecutor = new
SerialExecutor(NativeThread.NORM_PRIORITY);
byte[] pwdBuf = new byte[16];
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
@@ -1106,18 +1116,6 @@
return tempDir;
}
- /**
- * Has any client registered an interest in this particular key?
- */
- public boolean clientWantKey(Key key) {
- if(key instanceof NodeCHK)
- return
requestStarters.chkFetchScheduler.anyWantKey(key);
- else if(key instanceof NodeSSK)
- return
requestStarters.sskFetchScheduler.anyWantKey(key);
- else
- throw new IllegalArgumentException("Not a CHK and not
an SSK!");
- }
-
public boolean hasLoadedQueue() {
return fcpServer.hasFinishedStart();
}
@@ -1144,7 +1142,7 @@
return toadletContainer.getBookmarkURIs();
}
- public long countQueuedRequests() {
- return requestStarters.countQueuedRequests();
+ public long countTransientQueuedRequests() {
+ return requestStarters.countTransientQueuedRequests();
}
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-05-22 22:10:15 UTC (rev 20048)
@@ -4,7 +4,6 @@
package freenet.node;
import freenet.keys.ClientKey;
-import freenet.support.RandomGrabArray;
public interface RequestScheduler {
@@ -25,7 +24,7 @@
* @param key The key to be added.
* @return The time at which the key will leave the cooldown queue.
*/
- public long queueCooldown(ClientKey key, SendableGet getter);
+ long queueCooldown(ClientKey key, SendableGet getter);
/**
* Remove keys from the cooldown queue who have now served their time
and can be requested
@@ -40,6 +39,6 @@
* Note: If you don't want your requests to be subject to cooldown
(e.g. in fproxy), make
* your max retry count less than this (and more than -1). */
public static final int COOLDOWN_RETRIES = 3;
- public long countQueuedRequests();
+ public long countTransientQueuedRequests();
}
Modified: branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
2008-05-22 22:10:15 UTC (rev 20048)
@@ -224,11 +224,11 @@
return throttleWindow.realCurrentValue();
}
- public long countQueuedRequests() {
- return chkFetchScheduler.countQueuedRequests() +
- sskFetchScheduler.countQueuedRequests() +
- chkPutScheduler.countQueuedRequests() +
- sskPutScheduler.countQueuedRequests();
+ public long countTransientQueuedRequests() {
+ return chkFetchScheduler.countTransientQueuedRequests() +
+ sskFetchScheduler.countTransientQueuedRequests() +
+ chkPutScheduler.countTransientQueuedRequests() +
+ sskPutScheduler.countTransientQueuedRequests();
}
}
Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-05-22
19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-05-22
22:10:15 UTC (rev 20048)
@@ -110,6 +110,12 @@
return parent.chkScheduler;
}
+ /**
+ * Callback for when a block is found. Will be called on the database
executor thread.
+ * @param key
+ * @param block
+ * @param sched
+ */
public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler
sched);
/**
Modified:
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
===================================================================
---
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
2008-05-22 19:52:56 UTC (rev 20047)
+++
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
2008-05-22 22:10:15 UTC (rev 20048)
@@ -143,7 +143,7 @@
}
long totalRunningRequests = 0;
for(int j=0;j<nodes.length;j++) {
- totalRunningRequests +=
nodes[j].clientCore.countQueuedRequests();
+ totalRunningRequests +=
nodes[j].clientCore.countTransientQueuedRequests();
}
System.err.println("Running requests: "+totalRunningRequests);
}
@@ -153,7 +153,7 @@
while(true) {
long totalRunningRequests = 0;
for(int i=0;i<nodes.length;i++) {
- totalRunningRequests +=
nodes[i].clientCore.countQueuedRequests();
+ totalRunningRequests +=
nodes[i].clientCore.countTransientQueuedRequests();
}
System.err.println("Running requests: "+totalRunningRequests);
if(totalRunningRequests == 0) return;