Author: toad
Date: 2008-08-28 17:38:32 +0000 (Thu, 28 Aug 2008)
New Revision: 22200
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/RequestStarter.java
branches/db4o/freenet/src/freenet/node/SendableRequest.java
branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
branches/db4o/freenet/src/freenet/support/RandomGrabArray.java
Log:
Partially fix request priorities bug in maybeAddToStarterQueue().
Remove ClientRequestScheduler.selectorContainer.
ALWAYS pass in the container from a dbjob.
CRSBase: persistent() not req.persistent().
Activation: remove pending keys *after* activating.
Deactivation: non-redundant splitfiles.
Activation: DatastoreChecker finishRegister() caller.
Deactivation: SplitFileFetcherSubSegment
Logging: detect when stuff is already activated when entering a db job in
various places. This indicates a serious problem, which turns out (with more
logging in SubSegment) to be the fact that db4o activates stuff on commit but
doesn't deactivate it.
More logging!
More deactivation.
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -105,7 +105,7 @@
if(persistent)
container.activate(key, 5);
RequestScheduler sched =
context.getFetchScheduler(key instanceof ClientSSK);
- cooldownWakeupTime =
sched.queueCooldown(key, this);
+ cooldownWakeupTime =
sched.queueCooldown(key, this, container);
if(persistent)
container.deactivate(key, 5);
}
@@ -261,7 +261,7 @@
container.activate(ctx.blocks, 5);
}
try {
- getScheduler(context).register(this, new SendableGet[]
{ this }, persistent, true, ctx.blocks, false);
+ getScheduler(context).register(this, new SendableGet[]
{ this }, persistent, true, container, ctx.blocks, false);
} catch (KeyListenerConstructionException e) {
Logger.error(this, "Impossible: "+e+" on "+this, e);
}
@@ -274,7 +274,7 @@
container.activate(ctx.blocks, 5);
}
try {
- getScheduler(context).register(null, new SendableGet[]
{ this }, persistent, true, ctx.blocks, true);
+ getScheduler(context).register(null, new SendableGet[]
{ this }, persistent, true, container, ctx.blocks, true);
} catch (KeyListenerConstructionException e) {
Logger.error(this, "Impossible: "+e+" on "+this, e);
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -231,6 +231,8 @@
if(persistent())
container.activate(s, 1);
s.cancel(container, context);
+ if(persistent())
+ container.deactivate(s, 1);
}
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -85,12 +85,6 @@
}
}
- /** Long-lived container for use by the selector thread.
- * We commit when we move a request to a lower retry level.
- * We need to refresh objects when we activate them.
- */
- final ObjectContainer selectorContainer;
-
/** This DOES NOT PERSIST */
private final OfferedKeysList[] offeredKeys;
// we have one for inserts and one for requests
@@ -115,8 +109,7 @@
public ClientRequestScheduler(boolean forInserts, boolean forSSKs,
RandomSource random, RequestStarter starter, Node node, NodeClientCore core,
SubConfig sc, String name, ClientContext context) {
this.isInsertScheduler = forInserts;
this.isSSKScheduler = forSSKs;
- this.selectorContainer = node.db;
- schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this,
context);
+ schedCore = ClientRequestSchedulerCore.create(node, forInserts,
forSSKs, node.db, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this, context);
schedTransient = new ClientRequestSchedulerNonPersistent(this,
forInserts, forSSKs);
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
@@ -161,13 +154,13 @@
choosenPriorityScheduler = val;
}
- public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly) {
- registerInsert(req, persistent, regmeOnly,
databaseExecutor.onThread());
+ public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly, ObjectContainer container) {
+ registerInsert(req, persistent, regmeOnly,
databaseExecutor.onThread(), container);
}
static final int QUEUE_THRESHOLD = 100;
- public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly, boolean onDatabaseThread) {
+ public void registerInsert(final SendableRequest req, boolean
persistent, boolean regmeOnly, boolean onDatabaseThread, ObjectContainer
container) {
if(!isInsertScheduler)
throw new IllegalArgumentException("Adding a
SendableInsert to a request scheduler!!");
if(persistent) {
@@ -177,8 +170,8 @@
boolean queueFull =
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) >= QUEUE_THRESHOLD;
if(!queueFull)
bootID = this.node.bootID;
- final RegisterMe regme = new
RegisterMe(req, req.getPriorityClass(selectorContainer), schedCore, null,
bootID);
- selectorContainer.set(regme);
+ final RegisterMe regme = new
RegisterMe(req, req.getPriorityClass(container), schedCore, null, bootID);
+ container.set(regme);
if(logMINOR)
Logger.minor(this, "Added
insert RegisterMe: "+regme);
if(!queueFull) {
@@ -186,8 +179,10 @@
public void run(ObjectContainer
container, ClientContext context) {
container.delete(regme);
+
if(container.ext().isActive(req))
+
Logger.error(this, "ALREADY ACTIVE: "+req+" in delayed insert register");
container.activate(req,
1);
- registerInsert(req,
true, false, true);
+ registerInsert(req,
true, false, true, container);
container.deactivate(req, 1);
}
@@ -195,16 +190,19 @@
} else {
schedCore.rerunRegisterMeRunner(jobRunner);
}
- selectorContainer.deactivate(req, 1);
+ container.deactivate(req, 1);
return;
}
- schedCore.innerRegister(req, random,
selectorContainer);
+ schedCore.innerRegister(req, random, container);
} else {
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(req))
+ Logger.error(this,
"ALREADY ACTIVE: "+req+" in off-thread insert register");
container.activate(req, 1);
- schedCore.innerRegister(req,
random, selectorContainer);
+ schedCore.innerRegister(req,
random, container);
+ container.deactivate(req, 1);
}
}, NativeThread.NORM_PRIORITY, false);
@@ -226,7 +224,7 @@
* register the listener once.
* @throws FetchException
*/
- public void register(final HasKeyListener hasListener, final
SendableGet[] getters, final boolean persistent, boolean onDatabaseThread,
final BlockSet blocks, final boolean noCheckStore) throws
KeyListenerConstructionException {
+ public void register(final HasKeyListener hasListener, final
SendableGet[] getters, final boolean persistent, boolean onDatabaseThread,
ObjectContainer container, final BlockSet blocks, final boolean noCheckStore)
throws KeyListenerConstructionException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR)
Logger.minor(this,
"register("+persistent+","+hasListener+","+getters);
@@ -236,20 +234,26 @@
}
if(persistent) {
if(onDatabaseThread) {
- innerRegister(hasListener, getters, blocks,
noCheckStore);
+ innerRegister(hasListener, getters, blocks,
noCheckStore, container);
} else {
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
// registerOffThread would be
pointless because this is a separate job.
- if(hasListener != null)
+ if(hasListener != null) {
+
if(container.ext().isActive(hasListener))
+
Logger.error(this, "ALREADY ACTIVE in delayed register: "+hasListener);
container.activate(hasListener, 1);
+ }
if(getters != null) {
- for(int
i=0;i<getters.length;i++)
+ for(int
i=0;i<getters.length;i++) {
+
if(container.ext().isActive(getters[i]))
+
Logger.error(this, "ALREADY ACTIVE in delayed register: "+getters[i]);
container.activate(getters[i], 1);
+ }
}
try {
-
innerRegister(hasListener, getters, blocks, noCheckStore);
+
innerRegister(hasListener, getters, blocks, noCheckStore, container);
} catch
(KeyListenerConstructionException e) {
Logger.error(this,
"Registration failed to create Bloom filters: "+e+" on "+hasListener, e);
}
@@ -266,7 +270,7 @@
} else {
final KeyListener listener;
if(hasListener != null) {
- listener =
hasListener.makeKeyListener(selectorContainer, clientContext);
+ listener =
hasListener.makeKeyListener(container, clientContext);
schedTransient.addPendingKeys(listener);
} else
listener = null;
@@ -279,26 +283,26 @@
if(!(getters[i].isCancelled(null) ||
getters[i].isEmpty(null)))
anyValid = true;
}
- finishRegister(getters, false,
onDatabaseThread, anyValid, null);
+ finishRegister(getters, false,
onDatabaseThread, container, anyValid, null);
}
}
}
- private void innerRegister(final HasKeyListener hasListener, final
SendableGet[] getters, final BlockSet blocks, boolean noCheckStore) throws
KeyListenerConstructionException {
+ private void innerRegister(final HasKeyListener hasListener, final
SendableGet[] getters, final BlockSet blocks, boolean noCheckStore,
ObjectContainer container) throws KeyListenerConstructionException {
final KeyListener listener;
if(hasListener != null) {
- listener =
hasListener.makeKeyListener(selectorContainer, clientContext);
+ listener = hasListener.makeKeyListener(container,
clientContext);
schedCore.addPendingKeys(listener);
- selectorContainer.set(hasListener);
+ container.set(hasListener);
} else
listener = null;
// Avoid NPEs due to deactivation.
if(getters != null) {
for(SendableGet getter : getters) {
- selectorContainer.activate(getter, 1);
- selectorContainer.set(getter);
+ container.activate(getter, 1);
+ container.set(getter);
}
}
@@ -309,33 +313,33 @@
if(!noCheckStore) {
// Check the datastore before proceding.
for(SendableGet getter : getters) {
- selectorContainer.activate(getter, 1);
- datastoreChecker.queuePersistentRequest(getter,
blocks, selectorContainer);
- selectorContainer.deactivate(getter, 1);
+ container.activate(getter, 1);
+ datastoreChecker.queuePersistentRequest(getter,
blocks, container);
+ container.deactivate(getter, 1);
}
- selectorContainer.deactivate(listener, 1);
+ container.deactivate(listener, 1);
} else {
// We have already checked the datastore, this is a
retry, the listener hasn't been unregistered.
short prio = RequestStarter.MINIMUM_PRIORITY_CLASS;
for(int i=0;i<getters.length;i++) {
- short p =
getters[i].getPriorityClass(selectorContainer);
+ short p =
getters[i].getPriorityClass(container);
if(p < prio) prio = p;
}
- this.finishRegister(getters, true, true, true, null);
+ this.finishRegister(getters, true, true, container,
true, null);
}
}
- void finishRegister(final SendableGet[] getters, boolean persistent,
boolean onDatabaseThread, final boolean anyValid, final DatastoreCheckerItem
reg) {
+ void finishRegister(final SendableGet[] getters, boolean persistent,
boolean onDatabaseThread, ObjectContainer container, final boolean anyValid,
final DatastoreCheckerItem reg) {
if(isInsertScheduler && getters != null) {
IllegalStateException e = new
IllegalStateException("finishRegister on an insert scheduler");
if(onDatabaseThread || !persistent) {
for(int i=0;i<getters.length;i++) {
if(persistent)
-
selectorContainer.activate(getters[i], 1);
- getters[i].internalError(e, this,
selectorContainer, clientContext, persistent);
+ container.activate(getters[i],
1);
+ getters[i].internalError(e, this,
container, clientContext, persistent);
if(persistent)
-
selectorContainer.deactivate(getters[i], 1);
+
container.deactivate(getters[i], 1);
}
}
throw e;
@@ -347,17 +351,17 @@
throw new IllegalStateException("Not on
database thread!");
}
if(persistent)
- selectorContainer.activate(getters, 1);
+ container.activate(getters, 1);
if(logMINOR)
Logger.minor(this, "finishRegister()
for "+getters);
if(anyValid) {
boolean wereAnyValid = false;
for(int i=0;i<getters.length;i++) {
SendableGet getter = getters[i];
-
selectorContainer.activate(getters[i], 1);
-
if(!(getter.isCancelled(selectorContainer) ||
getter.isEmpty(selectorContainer))) {
+ container.activate(getters[i],
1);
+
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid = true;
-
schedCore.innerRegister(getter, random, selectorContainer);
+
schedCore.innerRegister(getter, random, container);
}
}
if(!wereAnyValid) {
@@ -365,25 +369,25 @@
}
}
if(reg != null)
- selectorContainer.delete(reg);
- maybeFillStarterQueue(selectorContainer,
clientContext);
+ container.delete(reg);
+ maybeFillStarterQueue(container, clientContext);
starter.wakeUp();
} else {
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
- container.activate(getters, 1);
if(logMINOR)
Logger.minor(this,
"finishRegister() for "+getters);
boolean wereAnyValid = false;
- for(int
i=0;i<getters.length;i++) {
- SendableGet getter =
getters[i];
-
container.activate(getters[i], 1);
-
if(!(getter.isCancelled(selectorContainer) ||
getter.isEmpty(selectorContainer))) {
+ for(SendableGet getter :
getters) {
+
if(container.ext().isActive(getter))
+
Logger.error(this, "ALREADY ACTIVE in delayed finishRegister: "+getter);
+
container.activate(getter, 1);
+
if(!(getter.isCancelled(container) || getter.isEmpty(container))) {
wereAnyValid =
true;
-
schedCore.innerRegister(getter, random, selectorContainer);
+
schedCore.innerRegister(getter, random, container);
}
-
container.deactivate(getters[i], 1);
+
container.deactivate(getter, 1);
}
if(!wereAnyValid) {
Logger.normal(this, "No
requests valid: "+getters);
@@ -583,6 +587,10 @@
// Recompute starterQueueLength
int length = 0;
for(PersistentChosenRequest req : starterQueue)
{
+
if(container.ext().isActive(req.request))
+ Logger.error(this, "REQUEST
ALREADY ACTIVATED: "+req.request+" for "+req+" while checking request queue in
filling request queue");
+ else if(logMINOR)
+ Logger.minor(this, "Not already
activated for "+req+" in while checking request queue in filling request
queue");
req.pruneDuplicates(ClientRequestScheduler.this);
length += req.sizeNotStarted();
}
@@ -618,21 +626,25 @@
public void maybeAddToStarterQueue(SendableRequest req, ObjectContainer
container) {
short prio = req.getPriorityClass(container);
int retryCount = req.getRetryCount();
+ if(logMINOR)
+ Logger.minor(this, "Maybe adding to starter queue:
prio="+prio+" retry count="+retryCount);
synchronized(starterQueue) {
- boolean allBetter = true;
boolean betterThanSome = false;
int size = 0;
for(PersistentChosenRequest old : starterQueue) {
+ if(container.ext().isActive(old.request))
+ Logger.error(this, "REQUEST ALREADY
ACTIVATED: "+old.request+" for "+old+" while checking request queue in
maybeAddToStarterQueue");
+ else if(logMINOR)
+ Logger.minor(this, "Not already
activated for "+req+" in while checking request queue in filling request
queue");
size += old.sizeNotStarted();
- if(old.prio < prio)
- allBetter = false;
- else if(old.prio == prio && old.retryCount <=
retryCount)
- allBetter = false;
- if(old.prio > prio || old.prio == prio &&
old.prio > retryCount)
+ if(old.prio > prio || old.prio == prio &&
old.retryCount > retryCount)
betterThanSome = true;
}
- if(allBetter && !starterQueue.isEmpty()) return;
- if(size >= MAX_STARTER_QUEUE_SIZE && !betterThanSome)
return;
+ if(size >= MAX_STARTER_QUEUE_SIZE && !betterThanSome) {
+ if(logMINOR)
+ Logger.minor(this, "Not adding to
starter queue: over limit and req not better than any queued requests");
+ return;
+ }
}
addToStarterQueue(req, container);
trimStarterQueue(container);
@@ -737,6 +749,8 @@
jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
+ if(container.ext().isActive(succeeded))
+ Logger.error(this, "ALREADY
ACTIVE in succeeded(): "+succeeded);
container.activate(succeeded, 1);
schedCore.succeeded(succeeded,
container);
container.deactivate(succeeded, 1);
@@ -808,9 +822,9 @@
/**
* MUST be called from database thread!
*/
- public long queueCooldown(ClientKey key, SendableGet getter) {
+ public long queueCooldown(ClientKey key, SendableGet getter,
ObjectContainer container) {
if(getter.persistent())
- return persistentCooldownQueue.add(key.getNodeKey(),
getter, selectorContainer);
+ return persistentCooldownQueue.add(key.getNodeKey(),
getter, container);
else
return transientCooldownQueue.add(key.getNodeKey(),
getter, null);
}
@@ -818,8 +832,8 @@
private final DBJob moveFromCooldownJob = new DBJob() {
public void run(ObjectContainer container, ClientContext
context) {
- if(moveKeysFromCooldownQueue(persistentCooldownQueue,
true, selectorContainer) ||
-
moveKeysFromCooldownQueue(transientCooldownQueue, false, selectorContainer))
+ if(moveKeysFromCooldownQueue(persistentCooldownQueue,
true, container) ||
+
moveKeysFromCooldownQueue(transientCooldownQueue, false, container))
starter.wakeUp();
}
@@ -865,6 +879,8 @@
}
if(reqs != null) {
for(int i=0;i<reqs.length;i++) {
+ if(container.ext().isActive(reqs[i]))
+ Logger.error(this, "ALREADY
ACTIVE in moveKeysFromCooldownQueue: "+reqs[i]);
container.activate(reqs[i], 1);
reqs[i].requeueAfterCooldown(key, now,
container, clientContext);
container.deactivate(reqs[i], 1);
@@ -905,6 +921,8 @@
jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
+ if(container.ext().isActive(get))
+ Logger.error(this, "ALREADY
ACTIVE: "+get+" in callFailure(request)");
container.activate(get, 1);
get.onFailure(e, null, container,
clientContext);
container.deactivate(get, 1);
@@ -921,6 +939,8 @@
jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
+ if(container.ext().isActive(insert))
+ Logger.error(this, "ALREADY
ACTIVE: "+insert+" in callFailure(insert)");
container.activate(insert, 1);
insert.onFailure(e, null, container,
context);
container.deactivate(insert, 1);
@@ -961,9 +981,9 @@
return isInsertScheduler;
}
- public void removeFromAllRequestsByClientRequest(ClientRequester
clientRequest, SendableRequest get, boolean dontComplain) {
+ public void removeFromAllRequestsByClientRequest(ClientRequester
clientRequest, SendableRequest get, boolean dontComplain, ObjectContainer
container) {
if(get.persistent())
- schedCore.removeFromAllRequestsByClientRequest(get,
clientRequest, dontComplain, selectorContainer);
+ schedCore.removeFromAllRequestsByClientRequest(get,
clientRequest, dontComplain, container);
else
schedTransient.removeFromAllRequestsByClientRequest(get, clientRequest,
dontComplain, null);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-08-28 16:55:45 UTC (rev 22199)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -101,7 +101,7 @@
container.deactivate(v, 1);
addToGrabArray(prio, retryCount, fixRetryCount(retryCount),
req.getClient(), req.getClientRequest(), req, random, container);
if(logMINOR) Logger.minor(this, "Registered "+req+" on
prioclass="+prio+", retrycount="+retryCount+" v.size()="+vSize);
- if(req.persistent())
+ if(persistent())
sched.maybeAddToStarterQueue(req, container);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-08-28 16:55:45 UTC (rev 22199)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -208,6 +208,7 @@
ObjectSet<HasKeyListener> results =
container.query(HasKeyListener.class);
for(HasKeyListener l : results) {
+ container.activate(l, 1);
try {
if(l.isCancelled(container)) continue;
KeyListener listener =
l.makeKeyListener(container, context);
@@ -218,6 +219,7 @@
e.printStackTrace();
Logger.error(this, "FAILED TO LOAD REQUEST
BLOOM FILTERS: "+e, e);
}
+ container.deactivate(l, 1);
}
}
@@ -644,7 +646,7 @@
} else {
if(logMINOR)
Logger.minor(this,
"Registering RegisterMe for insert: "+reg.nonGetRequest);
-
sched.registerInsert(reg.nonGetRequest, true, false);
+
sched.registerInsert(reg.nonGetRequest, true, false, container);
}
container.delete(reg);
container.deactivate(reg.nonGetRequest,
1);
Modified: branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/DatastoreChecker.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -381,13 +381,18 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
- scheduler.finishRegister(new
SendableGet[] { get }, true, true, valid, it);
+ if(container.ext().isActive(get)) {
+ Logger.error(this, "ALREADY
ACTIVATED: "+get);
+ }
+ container.activate(get, 1);
+ scheduler.finishRegister(new
SendableGet[] { get }, true, true, container, valid, it);
+ container.deactivate(get, 1);
loader.run(container, context);
}
}, NativeThread.NORM_PRIORITY, false);
} else {
- sched.finishRegister(new SendableGet[] { getter },
false, false, anyValid, item);
+ sched.finishRegister(new SendableGet[] { getter },
false, false, null, anyValid, item);
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/InsertCompressor.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -105,6 +105,8 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(inserter))
+
Logger.error(this, "ALREADY ACTIVE in start compression callback: "+inserter);
container.activate(inserter, 1);
inserter.onStartCompression(phase, container, context);
container.deactivate(inserter, 1);
@@ -148,6 +150,8 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(inserter))
+ Logger.error(this,
"ALREADY ACTIVE in compressed callback: "+inserter);
container.activate(inserter, 1);
inserter.onCompressed(output,
container, context);
container.deactivate(inserter,
1);
@@ -175,6 +179,8 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(inserter))
+ Logger.error(this,
"ALREADY ACTIVE in compress failure callback: "+inserter);
container.activate(inserter, 1);
container.activate(inserter.cb,
1);
inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container,
context);
@@ -193,6 +199,8 @@
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(inserter))
+ Logger.error(this,
"ALREADY ACTIVE in compress size callback: "+inserter);
container.activate(inserter, 1);
container.activate(inserter.cb,
1);
inserter.cb.onFailure(new
InsertException(InsertException.BUCKET_ERROR, e, null), inserter, container,
context);
Modified:
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -141,6 +141,8 @@
}
private void finish(ObjectContainer container, ClientContext context,
boolean dumping) {
+ if(container.ext().isActive(request))
+ Logger.error(this, "ALREADY ACTIVATED: "+request);
container.activate(request, 1);
Logger.normal(this, "Finishing "+this+" for "+request);
// Call all the callbacks.
@@ -149,6 +151,7 @@
if(finished) {
if(blocksFinished.isEmpty()) {
// Okay...
+ container.deactivate(request, 1);
return;
} else {
Logger.error(this, "Finished but
blocksFinished not empty on "+this, new Exception("debug"));
@@ -163,6 +166,7 @@
Logger.error(this, "No finished blocks in
finish() on "+this);
else if(logMINOR)
Logger.minor(this, "No finished blocks in
finish() on "+this);
+ container.deactivate(request, 1);
return;
}
if(request instanceof SendableGet) {
@@ -200,6 +204,7 @@
}
}
scheduler.removeRunningRequest(request);
+ container.deactivate(request, 1);
}
public synchronized ChosenBlock grabNotStarted(Random random,
RequestScheduler sched) {
Modified:
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -204,7 +204,7 @@
}
if(persistent)
container.set(this);
- getScheduler(context).registerInsert(this, persistent, false,
true);
+ getScheduler(context).registerInsert(this, persistent, false,
true, container);
}
private void fail(InsertException e, ObjectContainer container,
ClientContext context) {
@@ -272,7 +272,7 @@
if(persistent)
container.set(this);
} else {
- getScheduler(context).registerInsert(this, persistent,
true, true);
+ getScheduler(context).registerInsert(this, persistent,
true, true, container);
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -612,6 +612,7 @@
sf.schedule(container, context);
} catch (KeyListenerConstructionException e) {
onFailure(e.getFetchException(), false,
container, context);
+ if(persistent) container.deactivate(sf,
1);
return;
}
if(persistent) container.deactivate(sf, 1);
@@ -671,6 +672,8 @@
if(!context.jobRunner.onDatabaseThread())
context.jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
+
if(container.ext().isActive(SingleFileFetcher.this))
+ Logger.error(this,
"ALREADY ACTIVE in SFF callback: "+SingleFileFetcher.this);
container.activate(SingleFileFetcher.this, 1);
innerWrapHandleMetadata(notFinalizedSize, container, context);
container.deactivate(SingleFileFetcher.this, 1);
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -491,7 +491,7 @@
container.deactivate(segments[i], 1);
}
BlockSet blocks = fetchContext.blocks;
- context.getChkFetchScheduler().register(this, getters,
persistent, true, blocks, false);
+ context.getChkFetchScheduler().register(this, getters,
persistent, true, container, blocks, false);
}
public void cancel(ObjectContainer container, ClientContext context) {
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
2008-08-28 16:55:45 UTC (rev 22199)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -158,13 +158,19 @@
byte[] salted = localSaltKey(key);
for(int i=0;i<segmentFilters.length;i++) {
if(segmentFilters[i].checkFilter(salted)) {
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(fetcher))
+ Logger.error(this, "ALREADY
ACTIVE in definitelyWantKey(): "+fetcher);
container.activate(fetcher, 1);
+ }
SplitFileFetcherSegment segment =
fetcher.getSegment(i);
if(persistent)
container.deactivate(fetcher, 1);
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(segment))
+ Logger.error(this, "ALREADY
ACTIVE in definitelyWantKey(): "+segment);
container.activate(segment, 1);
+ }
boolean found = segment.getBlockNumber(key,
container) >= 0;
if(!found)
Logger.error(this, "Found block in
primary and segment bloom filters but segment doesn't want it: "+segment+" on
"+this);
@@ -190,11 +196,17 @@
match = segmentFilters[i].checkFilter(salted);
}
if(match) {
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(fetcher))
+ Logger.error(this, "ALREADY
ACTIVATED: "+fetcher);
container.activate(fetcher, 1);
+ }
SplitFileFetcherSegment segment =
fetcher.getSegment(i);
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(segment))
+ Logger.error(this, "ALREADY
ACTIVATED: "+segment);
container.activate(segment, 1);
+ }
if(logMINOR)
Logger.minor(this, "Key may be in
segment "+segment);
if(segment.onGotKey(key, block, container,
context)) {
@@ -234,13 +246,19 @@
byte[] salted = localSaltKey(key);
for(int i=0;i<segmentFilters.length;i++) {
if(segmentFilters[i].checkFilter(salted)) {
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(fetcher))
+ Logger.error(this, "ALREADY
ACTIVATED: "+fetcher);
container.activate(fetcher, 1);
+ }
SplitFileFetcherSegment segment =
fetcher.getSegment(i);
if(persistent)
container.deactivate(fetcher, 1);
- if(persistent)
+ if(persistent) {
+ if(container.ext().isActive(segment))
+ Logger.error(this, "ALREADY
ACTIVATED: "+segment);
container.activate(segment, 1);
+ }
int blockNum = segment.getBlockNumber(key,
container);
if(blockNum >= 0) {
ret.add(segment.getSubSegmentFor(blockNum, container));
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -387,8 +387,14 @@
return;
}
- if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
+ if(persistent) {
+ container.deactivate(parentFetcher, 1);
+ container.deactivate(parent, 1);
+ container.deactivate(context, 1);
+ }
return;
+ }
// Now heal
@@ -624,7 +630,7 @@
if(dataCooldownTimes[blockNo] >
now)
Logger.error(this,
"Already on the cooldown queue! for "+this+" data block no "+blockNo, new
Exception("error"));
else
-
dataCooldownTimes[blockNo] = sched.queueCooldown(key, sub);
+
dataCooldownTimes[blockNo] = sched.queueCooldown(key, sub, container);
cooldown = true;
}
}
@@ -642,7 +648,7 @@
if(checkCooldownTimes[checkNo]
> now)
Logger.error(this,
"Already on the cooldown queue! for "+this+" check block no "+blockNo, new
Exception("error"));
else
-
checkCooldownTimes[checkNo] = sched.queueCooldown(key, sub);
+
checkCooldownTimes[checkNo] = sched.queueCooldown(key, sub, container);
cooldown = true;
}
}
@@ -741,12 +747,12 @@
checkBuckets[i] = null;
}
}
- parentFetcher.removeMyPendingKeys(this, container, context);
removeSubSegments(container, context);
if(persistent) {
container.set(this);
container.activate(parentFetcher, 1);
}
+ parentFetcher.removeMyPendingKeys(this, container, context);
parentFetcher.segmentFinished(this, container, context);
if(persistent && !dontDeactivateParent)
container.deactivate(parentFetcher, 1);
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-08-28 16:55:45 UTC (rev 22199)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -194,11 +194,14 @@
container.activate(segment, 1);
}
boolean hasSet = false;
+ boolean retval = false;
synchronized(segment) {
for(int i=0;i<10;i++) {
Object ret;
int x;
- if(blockNums.isEmpty()) return false;
+ if(blockNums.isEmpty()) {
+ break;
+ }
x = context.random.nextInt(blockNums.size());
ret = (Integer) blockNums.get(x);
Key key =
segment.getBlockNodeKey(((Integer)ret).intValue(), container);
@@ -218,10 +221,15 @@
if(keys.hasKey(key)) {
continue;
}
- return true;
+ retval = true;
+ break;
}
- return false;
}
+ if(persistent) {
+ container.deactivate(blockNums, 5);
+ container.deactivate(segment, 1);
+ }
+ return retval;
}
public boolean ignoreStore() {
@@ -539,7 +547,7 @@
if(schedule) {
// Only need to register once for all the blocks.
try {
- context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, null, true);
+ context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, container, null, true);
} catch (KeyListenerConstructionException e) {
Logger.error(this, "Impossible: "+e+" on
"+this, e);
}
@@ -595,7 +603,7 @@
if(schedule) {
if(dontSchedule) return true;
try {
- context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, null, true);
+ context.getChkFetchScheduler().register(null,
new SendableGet[] { this }, persistent, true, container, null, true);
} catch (KeyListenerConstructionException e) {
Logger.error(this, "Impossible: "+e+" on
"+this, e);
}
@@ -604,7 +612,7 @@
}
public String toString() {
- return
super.toString()+":"+retryCount+"/"+segment+'('+(blockNums == null ? "null" :
String.valueOf(blockNums.size()))+')';
+ return
super.toString()+":"+retryCount+"/"+segment+'('+(blockNums == null ? "null" :
String.valueOf(blockNums.size()))+"),tempid="+objectHash();
}
public void possiblyRemoveFromParent(ObjectContainer container,
ClientContext context) {
@@ -751,7 +759,7 @@
public void reschedule(ObjectContainer container, ClientContext
context) {
try {
- getScheduler(context).register(null, new SendableGet[]
{ this }, persistent, true, segment.blockFetchContext.blocks, true);
+ getScheduler(context).register(null, new SendableGet[]
{ this }, persistent, true, container, segment.blockFetchContext.blocks, true);
} catch (KeyListenerConstructionException e) {
Logger.error(this, "Impossible: "+e+" on "+this, e);
}
@@ -847,4 +855,11 @@
return keys;
}
+ public int objectHash() {
+ return super.hashCode();
+ }
+
+ public void objectOnActivate(ObjectContainer container) {
+ Logger.minor(this, "ACTIVATING: "+this, new Exception("debug"));
+ }
}
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -5,6 +5,8 @@
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
+
import freenet.client.FECQueue;
import freenet.client.async.ChosenBlock;
import freenet.client.async.ClientContext;
@@ -30,7 +32,7 @@
* @param key The key to be added.
* @return The time at which the key will leave the cooldown queue.
*/
- long queueCooldown(ClientKey key, SendableGet getter);
+ long queueCooldown(ClientKey key, SendableGet getter, ObjectContainer
container);
/**
* Remove keys from the cooldown queue who have now served their time
and can be requested
Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-28
16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java 2008-08-28
17:38:32 UTC (rev 22200)
@@ -242,7 +242,7 @@
public boolean exclude(RandomGrabArrayItem item, ObjectContainer
container, ClientContext context) {
if(sched.isRunningRequest((SendableRequest)item)) {
- Logger.normal(this, "Excluding already-running request:
"+item);
+ Logger.normal(this, "Excluding already-running request:
"+item, new Exception("debug"));
return true;
}
if(isInsert) return false;
Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-28
16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-28
17:38:32 UTC (rev 22200)
@@ -111,7 +111,7 @@
ClientRequester cr = getClientRequest();
if(persistent)
container.activate(cr, 1);
- getScheduler(context).removeFromAllRequestsByClientRequest(cr,
this, true);
+ getScheduler(context).removeFromAllRequestsByClientRequest(cr,
this, true, container);
// FIXME should we deactivate??
//if(persistent) container.deactivate(cr, 1);
}
Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -122,7 +122,7 @@
public void schedule() {
finished = false; // can reschedule
- scheduler.registerInsert(this, false, false);
+ scheduler.registerInsert(this, false, false, null);
}
public void cancel(ObjectContainer container, ClientContext context) {
Modified: branches/db4o/freenet/src/freenet/support/RandomGrabArray.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/RandomGrabArray.java
2008-08-28 16:55:45 UTC (rev 22199)
+++ branches/db4o/freenet/src/freenet/support/RandomGrabArray.java
2008-08-28 17:38:32 UTC (rev 22200)
@@ -115,8 +115,15 @@
}
valid++;
}
- if(persistent && item
!= chosenItem && item != validItem)
+ if(persistent && item
!= chosenItem && item != validItem) {
+ if(logMINOR)
+
Logger.minor(this, "Deactivating "+item);
container.deactivate(item, 1);
+
if(container.ext().isActive(item))
+
Logger.error(this, "Still active after deactivation: "+item);
+ else
if(logMINOR)
+
Logger.minor(this, "Deactivated: "+item);
+ }
}
if(index != target) {
changedMe = true;