Author: toad
Date: 2008-06-24 15:43:05 +0000 (Tue, 24 Jun 2008)
New Revision: 20648
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
Log:
Remove ObjectContainer from CRSCore.
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-24 15:34:20 UTC (rev 20647)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-24 15:43:05 UTC (rev 20648)
@@ -115,7 +115,7 @@
this.selectorContainer = node.db;
schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this,
context);
schedTransient = new ClientRequestSchedulerNonPersistent(this);
- schedCore.fillStarterQueue();
+ schedCore.fillStarterQueue(selectorContainer);
schedCore.start(core);
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
@@ -165,7 +165,7 @@
public void register(final SendableRequest req, boolean
onDatabaseThread, final RegisterMe reg) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Registering "+req, new
Exception("debug"));
- boolean persistent = req.persistent();
+ final boolean persistent = req.persistent();
if(isInsertScheduler != (req instanceof SendableInsert))
throw new IllegalArgumentException("Expected a
SendableInsert: "+req);
if(req instanceof SendableGet) {
@@ -173,7 +173,7 @@
if(persistent && onDatabaseThread) {
schedCore.addPendingKeys(getter,
selectorContainer);
- schedCore.queueRegister(getter,
databaseExecutor);
+ schedCore.queueRegister(getter,
databaseExecutor, selectorContainer);
final Object[] keyTokens =
getter.sendableKeys(selectorContainer);
final ClientKey[] keys = new
ClientKey[keyTokens.length];
for(int i=0;i<keyTokens.length;i++)
@@ -190,7 +190,7 @@
public void run(ObjectContainer
container, ClientContext context) {
schedCore.addPendingKeys(getter, container);
- schedCore.queueRegister(getter,
databaseExecutor);
+ schedCore.queueRegister(getter,
databaseExecutor, container);
final Object[] keyTokens =
getter.sendableKeys(container);
final ClientKey[] keys = new
ClientKey[keyTokens.length];
for(int
i=0;i<keyTokens.length;i++)
@@ -222,12 +222,26 @@
}, getter.getPriorityClass(), "Checking
datastore");
}
} else {
- if(persistent)
- schedCore.queueRegister(req, databaseExecutor);
- // Pretend to not be on the database thread.
- // In some places (e.g. SplitFileInserter.start(), we
call register() *many* times within a single transaction.
- // We can greatly improve responsiveness at the cost of
some throughput and RAM by only adding the tags at this point.
- finishRegister(req, persistent, false, true, reg);
+ if(persistent) {
+ if(onDatabaseThread) {
+ schedCore.queueRegister(req,
databaseExecutor, selectorContainer);
+ finishRegister(req, persistent, false,
true, reg);
+ } else {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
+
schedCore.queueRegister(req, databaseExecutor, selectorContainer);
+ // Pretend to not be on
the database thread.
+ // In some places (e.g.
SplitFileInserter.start(), we call register() *many* times within a single
transaction.
+ // We can greatly
improve responsiveness at the cost of some throughput and RAM by only adding
the tags at this point.
+ finishRegister(req,
persistent, false, true, reg);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
+ }
+ } else {
+ finishRegister(req, persistent, false, true,
reg);
+ }
}
}
@@ -366,7 +380,7 @@
schedTransient.addPendingKey(key, getter);
}
- private synchronized ChosenRequest removeFirst() {
+ private synchronized ChosenRequest removeFirst(ObjectContainer
container) {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on database
thread!");
}
@@ -376,7 +390,7 @@
else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
fuzz = 0;
// schedCore juggles both
- return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, false, Short.MAX_VALUE, Short.MAX_VALUE,
clientContext);
+ return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, false, Short.MAX_VALUE, Short.MAX_VALUE,
clientContext, container);
}
public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) {
@@ -386,10 +400,10 @@
else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
fuzz = 0;
if(req == null)
- return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, Short.MAX_VALUE, Integer.MAX_VALUE,
clientContext);
+ return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, Short.MAX_VALUE, Integer.MAX_VALUE,
clientContext, null);
short prio = req.request.getPriorityClass();
int retryCount = req.request.getRetryCount();
- return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, prio, retryCount, clientContext);
+ return schedCore.removeFirst(fuzz, random, offeredKeys,
starter, schedTransient, true, prio, retryCount, clientContext, null);
}
private static final int MAX_STARTER_QUEUE_SIZE = 100;
@@ -424,7 +438,7 @@
}
}
while(true) {
- req = removeFirst();
+ req = removeFirst(container);
if(req == null) return;
synchronized(starterQueue) {
if(req != null) {
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-06-24 15:34:20 UTC (rev 20647)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-06-24 15:43:05 UTC (rev 20648)
@@ -61,9 +61,6 @@
abstract boolean persistent();
- /** @return The container if this is persistent, otherwise null */
- abstract ObjectContainer container();
-
protected ClientRequestSchedulerBase(boolean forInserts, boolean
forSSKs, Map pendingKeys, Map allRequestsByClientRequest, List recentSuccesses)
{
this.isInsertScheduler = forInserts;
this.isSSKScheduler = forSSKs;
@@ -244,14 +241,14 @@
addToGrabArray(req.getPriorityClass(), retryCount,
fixRetryCount(retryCount), req.getClient(), req.getClientRequest(), req,
random, container);
Set v = (Set)
allRequestsByClientRequest.get(req.getClientRequest());
if(v == null) {
- v = makeSetForAllRequestsByClientRequest();
+ v = makeSetForAllRequestsByClientRequest(container);
allRequestsByClientRequest.put(req.getClientRequest(),
v);
}
v.add(req);
if(logMINOR) Logger.minor(this, "Registered "+req+" on
prioclass="+req.getPriorityClass()+", retrycount="+req.getRetryCount()+"
v.size()="+v.size());
}
- protected abstract Set makeSetForAllRequestsByClientRequest();
+ protected abstract Set
makeSetForAllRequestsByClientRequest(ObjectContainer container);
void addToGrabArray(short priorityClass, int retryCount, int rc, Object
client, ClientRequester cr, SendableRequest req, RandomSource random,
ObjectContainer container) {
if((priorityClass > RequestStarter.MINIMUM_PRIORITY_CLASS) ||
(priorityClass < RequestStarter.MAXIMUM_PRIORITY_CLASS))
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-06-24 15:34:20 UTC (rev 20647)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-06-24 15:43:05 UTC (rev 20648)
@@ -40,7 +40,6 @@
*/
class ClientRequestSchedulerCore extends ClientRequestSchedulerBase implements
KeysFetchingLocally {
- private ObjectContainer container;
private static boolean logMINOR;
/** Identifier in the database for the node we are attached to */
private final long nodeDBHandle;
@@ -92,7 +91,6 @@
ClientRequestSchedulerCore(Node node, boolean forInserts, boolean
forSSKs, ObjectContainer selectorContainer, long cooldownTime) {
super(forInserts, forSSKs, forInserts ? null :
selectorContainer.ext().collections().newHashMap(1024),
selectorContainer.ext().collections().newHashMap(32),
selectorContainer.ext().collections().newLinkedList());
this.nodeDBHandle = node.nodeDBHandle;
- this.container = selectorContainer;
if(!forInserts) {
this.persistentCooldownQueue = new
PersistentCooldownQueue();
} else {
@@ -105,7 +103,6 @@
((Db4oMap)pendingKeys).activationDepth(Integer.MAX_VALUE);
((Db4oMap)allRequestsByClientRequest).activationDepth(1);
((Db4oList)recentSuccesses).activationDepth(Integer.MAX_VALUE);
- this.container = container;
if(!isInsertScheduler) {
persistentCooldownQueue.setCooldownTime(cooldownTime);
}
@@ -147,7 +144,7 @@
runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY,
true);
}
- void fillStarterQueue() {
+ void fillStarterQueue(ObjectContainer container) {
ObjectSet results = container.query(new Predicate() {
public boolean match(PersistentChosenRequest req) {
if(req.core != ClientRequestSchedulerCore.this)
return false;
@@ -206,8 +203,8 @@
// We prevent a number of race conditions (e.g. adding a retry count
and then another
// thread removes it cos its empty) ... and in addToGrabArray etc we
already sync on this.
// The worry is ... is there any nested locking outside of the
hierarchy?
- ChosenRequest removeFirst(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount, ClientContext context) {
- SendableRequest req = removeFirstInner(fuzz, random,
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount,
context);
+ ChosenRequest removeFirst(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount, ClientContext context, ObjectContainer
container) {
+ SendableRequest req = removeFirstInner(fuzz, random,
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount,
context, container);
if(req == null) return null;
Object token = req.chooseKey(this, req.persistent() ? container
: null, context);
if(token == null) {
@@ -243,7 +240,7 @@
}
}
- SendableRequest removeFirstInner(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount, ClientContext context) {
+ SendableRequest removeFirstInner(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount, ClientContext context, ObjectContainer
container) {
// Priorities start at 0
if(logMINOR) Logger.minor(this, "removeFirst()");
boolean tryOfferedKeys = offeredKeys != null &&
random.nextBoolean();
@@ -453,10 +450,6 @@
return true;
}
- ObjectContainer container() {
- return container;
- }
-
private transient ObjectSet registerMeSet;
private transient RegisterMeRunner registerMeRunner;
@@ -513,7 +506,7 @@
}
}
- public void queueRegister(SendableRequest req,
PrioritizedSerialExecutor databaseExecutor) {
+ public void queueRegister(SendableRequest req,
PrioritizedSerialExecutor databaseExecutor, ObjectContainer container) {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on database
thread!");
}
@@ -531,8 +524,8 @@
synchronized(keysFetching) {
keysFetching.remove(key);
}
- sched.databaseExecutor.execute(new Runnable() {
- public void run() {
+ sched.clientContext.jobRunner.queue(new DBJob() {
+ public void run(ObjectContainer container,
ClientContext context) {
ObjectSet results = container.query(new
Predicate() {
public boolean
match(PersistentChosenRequest req) {
if(req.core !=
ClientRequestSchedulerCore.this) return false;
@@ -545,10 +538,10 @@
container.commit();
}
}
- }, NativeThread.NORM_PRIORITY, "Remove fetching key");
+ }, NativeThread.NORM_PRIORITY, false);
}
- protected Set makeSetForAllRequestsByClientRequest() {
+ protected Set makeSetForAllRequestsByClientRequest(ObjectContainer
container) {
return new Db4oSet(container, 1);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
2008-06-24 15:34:20 UTC (rev 20647)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
2008-06-24 15:43:05 UTC (rev 20648)
@@ -39,7 +39,7 @@
return null;
}
- protected Set makeSetForAllRequestsByClientRequest() {
+ protected Set makeSetForAllRequestsByClientRequest(ObjectContainer
ignored) {
return new HashSet();
}