Author: toad
Date: 2008-07-19 16:48:15 +0000 (Sat, 19 Jul 2008)
New Revision: 21250
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientContext.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
branches/db4o/freenet/src/freenet/node/FailureTable.java
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
If the queue is full, write the RegisterMe and tell CRSCore to re-run the
RegisterMeRunner.
So we don't need to add an item to the queue, which saves memory etc.
Modified: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -42,8 +42,10 @@
public transient final HealingQueue healingQueue;
public transient final USKManager uskManager;
public transient final Random fastWeakRandom;
+ public transient final long bootID;
public ClientContext(NodeClientCore core) {
+ this.bootID = core.node.bootID;
this.fecQueue = core.fecQueue;
jobRunner = core;
this.mainExecutor = core.getExecutor();
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -156,17 +156,24 @@
public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly) {
registerInsert(req, persistent, regmeOnly,
databaseExecutor.onThread());
}
+
+ static final int QUEUE_THRESHOLD = 250;
public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly, boolean onDatabaseThread) {
if(persistent) {
if(onDatabaseThread) {
if(regmeOnly) {
- final RegisterMe regme = new
RegisterMe(null, null, req, req.getPriorityClass(selectorContainer), schedCore,
null);
+ long bootID = 0;
+ boolean queueFull =
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) <= QUEUE_THRESHOLD;
+ if(!queueFull)
+ bootID = this.node.bootID;
+ final RegisterMe regme = new
RegisterMe(null, null, req, req.getPriorityClass(selectorContainer), schedCore,
null, bootID);
selectorContainer.set(regme);
if(logMINOR)
Logger.minor(this, "Added
insert RegisterMe: "+regme);
+ if(!queueFull) {
jobRunner.queue(new DBJob() {
-
+
public void run(ObjectContainer
container, ClientContext context) {
container.delete(regme);
container.activate(req,
1);
@@ -174,6 +181,9 @@
}
}, NativeThread.NORM_PRIORITY, false);
+ } else {
+
schedCore.rerunRegisterMeRunner(jobRunner);
+ }
selectorContainer.deactivate(req, 1);
return;
}
@@ -258,12 +268,19 @@
if(listener != null) {
if(registerOffThread) {
short prio =
listener.getPriorityClass(selectorContainer);
+ boolean queueFull = false;
if(reg == null) {
- reg = new RegisterMe(listener, getters,
null, prio, schedCore, blocks);
+ long bootID = 0;
+ queueFull =
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) <= QUEUE_THRESHOLD;
+ if(!queueFull)
+ bootID = this.node.bootID;
+
+ reg = new RegisterMe(listener, getters,
null, prio, schedCore, blocks, bootID);
selectorContainer.set(reg);
}
final RegisterMe regme = reg;
if(logMINOR) Logger.minor(this, "Added regme:
"+regme);
+ if(!queueFull) {
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
@@ -277,12 +294,15 @@
}
}, NativeThread.NORM_PRIORITY, false);
+ } else {
+
schedCore.rerunRegisterMeRunner(jobRunner);
+ }
return;
} else {
short prio =
listener.getPriorityClass(selectorContainer);
schedCore.addPendingKeys(listener,
selectorContainer);
if(reg == null && getters != null) {
- reg = new RegisterMe(null, getters,
null, prio, schedCore, blocks);
+ reg = new RegisterMe(null, getters,
null, prio, schedCore, blocks, node.bootID);
selectorContainer.set(reg);
if(logMINOR) Logger.minor(this, "Added
regme: "+reg);
} else {
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-07-19 14:46:50 UTC (rev 21249)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -131,6 +131,9 @@
preRegisterMeRunner = new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
+ synchronized(ClientRequestSchedulerCore.this) {
+ if(registerMeSet != null) return;
+ }
long tStart = System.currentTimeMillis();
// FIXME REDFLAG EVIL DB4O BUG!!!
// FIXME verify and file a bug
@@ -182,7 +185,10 @@
// query.constrain(eval);
//
query.descend("key").descend("priority").orderAscending();
//
query.descend("key").descend("addedTime").orderAscending();
- registerMeSet = query.execute();
+ ObjectSet results = query.execute();
+ synchronized(ClientRequestSchedulerCore.this) {
+ registerMeSet = results;
+ }
long tEnd = System.currentTimeMillis();
if(logMINOR)
Logger.minor(this, "RegisterMe query took
"+(tEnd-tStart)+" hasNext="+registerMeSet.hasNext()+" for
insert="+isInsertScheduler+" ssk="+isSSKScheduler);
@@ -198,9 +204,13 @@
private transient DBJob preRegisterMeRunner;
void start(DBJobRunner runner) {
- runner.queue(preRegisterMeRunner,
NativeThread.NORM_PRIORITY, true);
+ startRegisterMeRunner(runner);
}
+ private final void startRegisterMeRunner(DBJobRunner runner) {
+ runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY,
true);
+ }
+
void fillStarterQueue(ObjectContainer container) {
ObjectSet results = container.query(new Predicate() {
public boolean match(PersistentChosenRequest req) {
@@ -560,6 +570,8 @@
private transient RegisterMeRunner registerMeRunner;
+ private transient boolean shouldReRunRegisterMeRunner;
+
class RegisterMeRunner implements DBJob {
public void run(ObjectContainer container, ClientContext
context) {
@@ -586,6 +598,10 @@
long startNext = System.currentTimeMillis();
RegisterMe reg = (RegisterMe)
registerMeSet.next();
container.activate(reg, 1);
+ if(reg.bootID == context.bootID) {
+ if(logMINOR) Logger.minor(this, "Not
registering block as was added to the queue");
+ continue;
+ }
// FIXME remove the leftover/old core handling
at some point, an NPE is acceptable long-term.
if(reg.core != ClientRequestSchedulerCore.this)
{
if(reg.core == null) {
@@ -652,7 +668,15 @@
context.jobRunner.queue(registerMeRunner,
NativeThread.NORM_PRIORITY-1, true);
else {
if(logMINOR) Logger.minor(this,
"RegisterMeRunner finished");
- registerMeSet = null;
+ boolean rerun;
+ synchronized(ClientRequestSchedulerCore.this) {
+ rerun = shouldReRunRegisterMeRunner;
+ shouldReRunRegisterMeRunner = false;
+ registerMeSet = null;
+ }
+ if(rerun) {
+ preRegisterMeRunner.run(container,
context);
+ }
}
}
@@ -859,5 +883,13 @@
}
}
+ public void rerunRegisterMeRunner(DBJobRunner runner) {
+ synchronized(this) {
+ shouldReRunRegisterMeRunner = true;
+ if(registerMeSet != null) return;
+ }
+ startRegisterMeRunner(runner);
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -13,4 +13,6 @@
public boolean onDatabaseThread();
+ public int getQueueSize(int priority);
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -16,11 +16,16 @@
final SendableRequest nonGetRequest;
final ClientRequestSchedulerCore core;
final RegisterMeSortKey key;
+ /**
+ * Only set if the key is on the queue.
+ */
+ final long bootID;
private final int hashCode;
public final BlockSet blocks;
- RegisterMe(GotKeyListener listener, SendableGet[] getters,
SendableRequest nonGetRequest, short prio, ClientRequestSchedulerCore core,
BlockSet blocks) {
+ RegisterMe(GotKeyListener listener, SendableGet[] getters,
SendableRequest nonGetRequest, short prio, ClientRequestSchedulerCore core,
BlockSet blocks, long bootID) {
this.listener = listener;
+ this.bootID = bootID;
this.getters = getters;
this.core = core;
this.nonGetRequest = nonGetRequest;
Modified: branches/db4o/freenet/src/freenet/node/FailureTable.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/FailureTable.java 2008-07-19
14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/node/FailureTable.java 2008-07-19
16:48:15 UTC (rev 21250)
@@ -49,9 +49,9 @@
private final Node node;
/** Maximum number of keys to track */
- static final int MAX_ENTRIES = 20*1000;
+ static final int MAX_ENTRIES = 2*1000;
/** Maximum number of offers to track */
- static final int MAX_OFFERS = 10*1000;
+ static final int MAX_OFFERS = 1*1000;
/** Terminate a request if there was a DNF on the same key less than 10
minutes ago */
static final int REJECT_TIME = 10*60*1000;
/** After 1 hour we forget about an entry completely */
Modified:
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
2008-07-19 16:48:15 UTC (rev 21250)
@@ -230,4 +230,10 @@
return retval;
}
+ public int getQueueSize(int priority) {
+ synchronized(jobs) {
+ return jobs[priority].size();
+ }
+ }
+
}