Author: toad
Date: 2008-07-10 19:28:28 +0000 (Thu, 10 Jul 2008)
New Revision: 21032
Added:
branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/client/async/USKChecker.java
branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
branches/db4o/freenet/src/freenet/client/async/USKInserter.java
branches/db4o/freenet/src/freenet/client/async/USKManager.java
branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
branches/db4o/freenet/src/freenet/node/SendableGet.java
branches/db4o/freenet/src/freenet/node/SendableRequest.java
branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
Major refactoring.
Separate onGotKey() etc out from SendableGet into new interface GotKeyListener.
Saves a lot of effort unregistering and reregistering
SplitFileFetcherSubSegment's on pendingKeys, since we now just register the
SplitFileFetcherSegment once and don't need to worry about it on a typical
nonfatal failure retry.
Also simplifies some code.
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -17,7 +17,7 @@
import freenet.node.SendableGet;
import freenet.support.Logger;
-public abstract class BaseSingleFileFetcher extends SendableGet {
+public abstract class BaseSingleFileFetcher extends SendableGet implements
GotKeyListener {
final ClientKey key;
protected boolean cancelled;
@@ -102,7 +102,7 @@
}
return true; // We will retry, just not yet.
See requeueAfterCooldown(Key).
} else {
- schedule(container, context, false, true);
+ schedule(container, context, false);
}
return true;
}
@@ -131,10 +131,22 @@
synchronized(this) {
cancelled = true;
}
- if(persistent)
+ if(persistent) {
container.set(this);
- super.unregister(false, container);
+ container.activate(key, 5);
+ }
+
+ unregisterAll(container, context);
}
+
+ /**
+ * Remove the pendingKeys item and then remove from the queue as well.
+ * Call unregister(container) if you only want to remove from the queue.
+ */
+ public void unregisterAll(ObjectContainer container, ClientContext
context) {
+ getScheduler(context).removePendingKey(this, false,
key.getNodeKey(), container);
+ super.unregister(container);
+ }
public synchronized boolean isCancelled(ObjectContainer container) {
return cancelled;
@@ -148,6 +160,10 @@
return parent.getClient();
}
+ public boolean dontCache(ObjectContainer container) {
+ return !ctx.cacheLocalRequests;
+ }
+
public boolean dontCache() {
return !ctx.cacheLocalRequests;
}
@@ -207,7 +223,25 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+"
for "+this);
- schedule(container, context, false, true);
+ schedule(container, context, false);
}
+
+ public void schedule(ObjectContainer container, ClientContext context,
boolean delayed) {
+ getScheduler(context).register(this, new SendableGet[] { this
}, delayed, persistent, true, ctx.blocks);
+ }
+ public SendableGet getRequest(Key key, ObjectContainer container) {
+ return this;
+ }
+
+ public Key[] listKeys(ObjectContainer container) {
+ if(cancelled || finished)
+ return new Key[0];
+ else {
+ if(persistent)
+ container.activate(key, 5);
+ return new Key[] { key.getNodeKey() };
+ }
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -11,7 +11,7 @@
*/
public interface ClientGetState {
- public void schedule(ObjectContainer container, ClientContext context,
boolean delayedRegister, boolean probablyNotInStore);
+ public void schedule(ObjectContainer container, ClientContext context,
boolean delayedRegister);
public void cancel(ObjectContainer container, ClientContext context);
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -113,7 +113,7 @@
return false;
}
}
- currentState.schedule(container, context,
false, false);
+ currentState.schedule(container, context,
false);
}
if(cancelled) cancel();
} catch (MalformedURLException e) {
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -155,232 +155,157 @@
choosenPriorityScheduler = val;
}
- public void register(final SendableRequest req, boolean regmeOnly,
boolean probablyNotInStore) {
- register(req, databaseExecutor.onThread(), regmeOnly, null,
probablyNotInStore);
+ public void registerInsert(final SendableRequest req, boolean
persistent) {
+ registerInsert(req, persistent, databaseExecutor.onThread());
}
+ public void registerInsert(final SendableRequest req, boolean
persistent, boolean onDatabaseThread) {
+ if(persistent) {
+ if(onDatabaseThread) {
+ schedCore.innerRegister(req, random,
selectorContainer);
+ } else {
+ jobRunner.queue(new DBJob() {
+
+ public void run(ObjectContainer
container, ClientContext context) {
+ schedCore.innerRegister(req,
random, selectorContainer);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
+ }
+ } else {
+ schedTransient.innerRegister(req, random, null);
+ }
+ }
+
/**
- * Register and then delete the RegisterMe which is passed in to avoid
querying.
+ * Register a group of requests (not inserts): a GotKeyListener and/or
one
+ * or more SendableGet's.
+ * @param listener Listeners for specific keys. Can be null if the
listener
+ * is already registered e.g. most of the time with SplitFileFetcher*.
+ * @param getters The actual requests to register to the request sender
queue.
+ * @param registerOffThread If true, create and store a RegisterMe to
ensure
+ * that the request is registered, but then schedule a job to complete
it
+ * after this job completes. Reduces the latency impact of scheduling a
big
+ * splitfile dramatically.
+ * @param persistent True if the request is persistent.
+ * @param onDatabaseThread True if we are running on the database
thread.
+ * NOTE: delayedStoreCheck/probablyNotInStore is unnecessary because we
only
+ * register the listener once.
*/
- public void register(final SendableRequest req, boolean
onDatabaseThread, final boolean regmeOnly, RegisterMe reg, final boolean
probablyNotInStore) {
+ public void register(final GotKeyListener listener, final SendableGet[]
getters, boolean registerOffThread, final boolean persistent, boolean
onDatabaseThread, final BlockSet blocks) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(logMINOR) Logger.minor(this, "Registering "+req, new
Exception("debug"));
- final boolean persistent = req.persistent();
- if(isInsertScheduler != (req instanceof SendableInsert))
- throw new IllegalArgumentException("Expected a
SendableInsert: "+req);
- if(req instanceof SendableGet) {
- final SendableGet getter = (SendableGet)req;
-
- if(persistent && onDatabaseThread) {
- if(req.isEmpty(selectorContainer) ||
req.isCancelled(selectorContainer)) {
- Logger.error(this, "In register():
Request is empty/cancelled: "+req, new Exception("debug"));
- }
- if(regmeOnly) {
- assert(reg == null);
- reg = schedCore.queueRegister(getter,
databaseExecutor, selectorContainer);
- final RegisterMe regme = reg;
- clientContext.jobRunner.queue(new
DBJob() {
+ if(logMINOR)
+ Logger.minor(this,
"register("+persistent+","+listener+","+getters+","+registerOffThread);
+ if(persistent) {
+ if(onDatabaseThread) {
+ innerRegister(listener, getters,
registerOffThread, persistent, blocks);
+ } else {
+ jobRunner.queue(new DBJob() {
- public void run(ObjectContainer
container, ClientContext context) {
- register(req, true,
false, regme, probablyNotInStore);
- }
- // NORM_PRIORITY so the completion
(finishRegister()) runs before the next block does addPendingKeys().
- }, NativeThread.NORM_PRIORITY, false);
- return;
- }
- schedCore.addPendingKeys(getter,
selectorContainer);
- final Object[] keyTokens =
getter.sendableKeys(selectorContainer);
- final ClientKey[] keys = new
ClientKey[keyTokens.length];
-
- if(probablyNotInStore) {
- // Complete the registration *before*
checking the store.
- // Check the store anyway though!
- finishRegister(req, persistent, true,
true, reg);
- // RegisterMe has been deleted or was
null in the first place.
- reg = null;
- } else {
- if(reg == null)
- reg =
schedCore.queueRegister(getter, databaseExecutor, selectorContainer);
- }
- final RegisterMe regme = reg;
-
- for(int i=0;i<keyTokens.length;i++) {
- keys[i] = getter.getKey(keyTokens[i],
selectorContainer);
- selectorContainer.activate(keys[i], 5);
- }
- final BlockSet blocks =
getter.getContext().blocks;
- final boolean dontCache = getter.dontCache();
+ public void run(ObjectContainer
container, ClientContext context) {
+ // registerOffThread would be
pointless because this is a separate job.
+ innerRegister(listener,
getters, false, persistent, blocks);
+ }
+
+ }, NativeThread.NORM_PRIORITY, false);
+ }
+ } else {
+ if(listener != null) {
+ schedTransient.addPendingKeys(listener, null);
+ short prio =
listener.getPriorityClass(selectorContainer);
+ final Key[] keys =
listener.listKeys(selectorContainer);
+ final boolean dontCache =
listener.dontCache(null);
datastoreCheckerExecutor.execute(new Runnable()
{
public void run() {
- registerCheckStore(getter,
true, keyTokens, keys, regme, blocks, dontCache);
+ // Check the store, then queue
the requests to the main queue.
+ registerCheckStore(getters,
false, keys, null, blocks, dontCache);
}
- }, getter.getPriorityClass(selectorContainer),
"Checking datastore");
- } else if(persistent) {
- final RegisterMe regme = reg;
+ }, prio, "Checking datastore");
+ } else {
+ this.finishRegister(getters, persistent, false,
true, null);
+ }
+ }
+ }
+
+
+ private void innerRegister(final GotKeyListener listener, final
SendableGet[] getters, boolean registerOffThread, boolean persistent, final
BlockSet blocks) {
+ if(listener != null) {
+ if(registerOffThread) {
+ short prio =
listener.getPriorityClass(selectorContainer);
+ RegisterMe regme = new RegisterMe(listener,
getters, prio, schedCore, blocks);
+ selectorContainer.set(regme);
+ if(logMINOR) Logger.minor(this, "Added regme:
"+regme);
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
- container.activate(getter, 1);
-
schedCore.addPendingKeys(getter, container);
- RegisterMe reg = regme;
- if(probablyNotInStore) {
- // Complete the
registration *before* checking the store.
- // Check the store
anyway though!
- finishRegister(req,
persistent, true, true, reg);
- // RegisterMe has been
deleted or was null in the first place.
- reg = null;
- } else {
- if(reg == null)
- reg =
schedCore.queueRegister(getter, databaseExecutor, container);
- }
- final RegisterMe regInner = reg;
-
- final Object[] keyTokens =
getter.sendableKeys(container);
- final ClientKey[] keys = new
ClientKey[keyTokens.length];
- for(int
i=0;i<keyTokens.length;i++) {
- keys[i] =
getter.getKey(keyTokens[i], selectorContainer);
-
container.activate(keys[i], 5);
- }
- final BlockSet blocks =
getter.getContext().blocks;
- final boolean dontCache =
getter.dontCache();
-
datastoreCheckerExecutor.execute(new Runnable() {
-
- public void run() {
-
registerCheckStore(getter, true, keyTokens, keys, regInner, blocks, dontCache);
- }
-
- },
getter.getPriorityClass(container), "Checking datastore");
+ register(listener, getters,
false, true, true, blocks);
}
}, NativeThread.NORM_PRIORITY, false);
+ return;
} else {
- // Not persistent
- schedTransient.addPendingKeys(getter, null);
- // Check the store off-thread anyway.
- final Object[] keyTokens =
getter.sendableKeys(null);
- final ClientKey[] keys = new
ClientKey[keyTokens.length];
- for(int i=0;i<keyTokens.length;i++)
- keys[i] = getter.getKey(keyTokens[i],
null);
+ short prio =
listener.getPriorityClass(selectorContainer);
+ schedCore.addPendingKeys(listener,
selectorContainer);
+ final RegisterMe regme;
+ if(getters != null) {
+ regme = new RegisterMe(null, getters,
prio, schedCore, blocks);
+ selectorContainer.set(regme);
+ if(logMINOR) Logger.minor(this, "Added
regme: "+regme);
+ } else regme = null; // Nothing to finish
registering.
+ // Check the datastore before proceding.
+ final Key[] keys =
listener.listKeys(selectorContainer);
+ final boolean dontCache =
listener.dontCache(selectorContainer);
datastoreCheckerExecutor.execute(new Runnable()
{
public void run() {
- registerCheckStore(getter,
false, keyTokens, keys, null, getter.getContext().blocks, getter.dontCache());
+ // Check the store, then queue
the requests to the main queue.
+ registerCheckStore(getters,
true, keys, regme, blocks, dontCache);
}
- }, getter.getPriorityClass(null), "Checking
datastore");
+ }, prio, "Checking datastore");
+
}
} else {
- if(persistent) {
- if(onDatabaseThread) {
- schedCore.queueRegister(req,
databaseExecutor, selectorContainer);
- // Pretend to not be on the database
thread.
- // In some places (e.g.
SplitFileInserter.start(), we call register() *many* times within a single
transaction.
- // We can greatly improve
responsiveness at the cost of some throughput and RAM by only adding the tags
at this point.
- finishRegister(req, persistent, false,
true, reg);
- } else {
- final RegisterMe regme = reg;
- jobRunner.queue(new DBJob() {
-
- public void run(ObjectContainer
container, ClientContext context) {
- container.activate(req,
1);
- RegisterMe reg = regme;
- if(reg == null)
- reg =
schedCore.queueRegister(req, databaseExecutor, selectorContainer);
- // Self-contained job,
will complete quickly enough.
- finishRegister(req,
persistent, true, true, reg);
- }
-
- }, NativeThread.NORM_PRIORITY, false);
- }
- } else {
- finishRegister(req, persistent,
onDatabaseThread, true, reg);
+ // The listener is already registered.
+ // Ignore registerOffThread for now.
+ short prio = RequestStarter.MINIMUM_PRIORITY_CLASS;
+ for(int i=0;i<getters.length;i++) {
+ short p =
getters[i].getPriorityClass(selectorContainer);
+ if(p < prio) prio = p;
}
+ this.finishRegister(getters, persistent, true, true,
null);
}
}
- /**
- * Check the store for all the keys on the SendableGet. By now the
pendingKeys will have
- * been set up, and this is run on the datastore checker thread. Once
completed, this should
- * (for a persistent request) queue a job on the databaseExecutor and
(for a transient
- * request) finish registering the request immediately.
- * @param getter The SendableGet. NOTE: If persistent, DO NOT USE THIS
INLINE, because it won't
- * be activated. This is why we pass in extraBlocks and dontCache.
- * @param reg
- */
- protected void registerCheckStore(SendableGet getter, boolean
persistent, Object[] keyTokens, ClientKey[] keys, RegisterMe reg, BlockSet
extraBlocks, boolean dontCache) {
+ protected void registerCheckStore(SendableGet[] getters, boolean
persistent,
+ Key[] keys, RegisterMe regme, BlockSet extraBlocks,
boolean dontCache) {
boolean anyValid = false;
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKeyBlock block = null;
- try {
- ClientKey key = keys[i];
- if(key == null) {
+ for(int i=0;i<keys.length;i++) {
+ Key key = keys[i];
+ KeyBlock block = null;
+ if(key == null) {
+ if(logMINOR) Logger.minor(this, "No key at "+i);
+ continue;
+ } else {
+ if(extraBlocks != null)
+ block = extraBlocks.get(key);
+ if(block == null)
+ block = node.fetch(key, dontCache);
+ if(block != null) {
if(logMINOR)
- Logger.minor(this, "No key for
"+tok+" for "+getter+" - already finished?");
- continue;
- } else {
- if(extraBlocks != null)
- block = extraBlocks.get(key);
- if(block == null)
- block = node.fetchKey(key,
dontCache);
- if(block == null) {
- if(!persistent) {
-
schedTransient.addPendingKey(key.getNodeKey(), getter, null);
- } // If persistent, when it is
registered (in a later job) the keys will be added first.
- } else {
- if(logMINOR)
- Logger.minor(this, "Got
"+block);
- }
+ Logger.minor(this, "Got
"+block);
}
- } catch (KeyVerifyException e) {
- // Verify exception, probably bogus at source;
- // verifies at low-level, but not at decode.
- if(logMINOR)
- Logger.minor(this, "Decode failed: "+e,
e);
- if(!persistent)
- getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, null,
clientContext);
- else {
- final SendableGet g = getter;
- final Object token = tok;
- jobRunner.queue(new DBJob() {
-
- public void run(ObjectContainer
container, ClientContext context) {
- container.activate(g,
1);
- g.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, container,
context);
- }
- // NORM_PRIORITY+1 as must run
before finishRegister()
- }, NativeThread.NORM_PRIORITY+1, false);
- }
- continue; // other keys might be valid
}
if(block != null) {
- if(logMINOR) Logger.minor(this, "Can fulfill
"+getter+" ("+tok+") immediately from store");
- if(!persistent)
- getter.onSuccess(block, true, tok,
null, clientContext);
- else {
- final ClientKeyBlock b = block;
- final Object t = tok;
- final SendableGet g = getter;
- if(persistent) {
- jobRunner.queue(new DBJob() {
-
- public void
run(ObjectContainer container, ClientContext context) {
-
container.activate(g, 1);
- g.onSuccess(b,
true, t, container, context);
- }
- // NORM_PRIORITY+1 as
must run before finishRegister()
- },
NativeThread.NORM_PRIORITY+1, false);
- } else {
- g.onSuccess(b, true, t, null,
clientContext);
- }
- }
+ if(logMINOR) Logger.minor(this, "Found key");
+ tripPendingKey(block);
} else {
anyValid = true;
}
}
- finishRegister(getter, persistent, false, anyValid, reg);
+ finishRegister(getters, persistent, false, anyValid, regme);
}
/** If enabled, if the queue is less than 25% full, attempt to add
newly
@@ -388,7 +313,7 @@
* bypassing registration on the queue. Risky optimisation. */
static final boolean TRY_DIRECT = true;
- private void finishRegister(final SendableRequest req, boolean
persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe
reg) {
+ private void finishRegister(final SendableGet[] getters, boolean
persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe
reg) {
if(persistent) {
// Add to the persistent registration queue
if(onDatabaseThread) {
@@ -396,39 +321,49 @@
throw new IllegalStateException("Not on
database thread!");
}
if(persistent)
- selectorContainer.activate(req, 1);
+ selectorContainer.activate(getters, 1);
boolean tryDirect = false;
if(anyValid && TRY_DIRECT) {
synchronized(starterQueue) {
tryDirect = starterQueue.size()
< MAX_STARTER_QUEUE_SIZE * 1 / 4;
}
if(tryDirect) {
- while(true) {
- PersistentChosenRequest
cr = (PersistentChosenRequest) schedCore.maybeMakeChosenRequest(req,
selectorContainer, clientContext);
- if(cr == null) {
-
if(!(req.isEmpty(selectorContainer) || req.isCancelled(selectorContainer)))
- //
Still needs to be registered
-
tryDirect = false;
- break;
- }
-
synchronized(starterQueue) {
-
if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) {
+ for(int
i=0;i<getters.length;i++) {
+ SendableGet getter =
getters[i];
+ while(true) {
+
PersistentChosenRequest cr = (PersistentChosenRequest)
schedCore.maybeMakeChosenRequest(getter, selectorContainer, clientContext);
+ if(cr == null) {
+
if(!(getter.isEmpty(selectorContainer) ||
getter.isCancelled(selectorContainer)))
+
// Still needs to be registered
+
tryDirect = false;
break;
}
-
starterQueue.add(cr);
+
synchronized(starterQueue) {
+
if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) {
+
tryDirect = false;
+
break;
+ }
+
starterQueue.add(cr);
+ }
}
}
}
}
if(logMINOR)
- Logger.minor(this, "finishRegister()
for "+req);
+ Logger.minor(this, "finishRegister()
for "+getters);
if(anyValid) {
if(!tryDirect) {
-
if(req.isCancelled(selectorContainer) || req.isEmpty(selectorContainer)) {
- Logger.error(this,
"Request is empty/cancelled: "+req);
- } else {
-
schedCore.innerRegister(req, random, selectorContainer);
+ boolean wereAnyValid = false;
+ for(int
i=0;i<getters.length;i++) {
+ SendableGet getter =
getters[i];
+
if(!(getter.isCancelled(selectorContainer) ||
getter.isEmpty(selectorContainer))) {
+ wereAnyValid =
true;
+
schedCore.innerRegister(getter, random, selectorContainer);
+ }
}
+ if(!wereAnyValid) {
+ Logger.error(this, "No
requests valid: "+getters);
+ }
}
}
if(reg != null)
@@ -439,15 +374,20 @@
jobRunner.queue(new DBJob() {
public void run(ObjectContainer
container, ClientContext context) {
- container.activate(req, 1);
+ container.activate(getters, 1);
if(logMINOR)
- Logger.minor(this,
"finishRegister() for "+req);
- if(anyValid) {
-
if(req.isCancelled(container) || req.isEmpty(container)) {
-
Logger.error(this, "Request is empty/cancelled: "+req);
- } else
-
schedCore.innerRegister(req, random, container);
+ Logger.minor(this,
"finishRegister() for "+getters);
+ boolean wereAnyValid = false;
+ for(int
i=0;i<getters.length;i++) {
+ SendableGet getter =
getters[i];
+
if(!(getter.isCancelled(selectorContainer) ||
getter.isEmpty(selectorContainer))) {
+ wereAnyValid =
true;
+
schedCore.innerRegister(getter, random, selectorContainer);
+ }
}
+ if(!wereAnyValid) {
+ Logger.error(this, "No
requests valid: "+getters);
+ }
if(reg != null)
container.delete(reg);
maybeFillStarterQueue(container, context);
@@ -458,7 +398,8 @@
}
} else {
// Register immediately.
- schedTransient.innerRegister(req, random, null);
+ for(int i=0;i<getters.length;i++)
+ schedTransient.innerRegister(getters[i],
random, null);
starter.wakeUp();
}
}
@@ -471,7 +412,7 @@
requestStarterQueueFiller.run(container, context);
}
- void addPendingKey(final ClientKey key, final SendableGet getter) {
+ void addPendingKey(final ClientKey key, final GotKeyListener getter) {
if(getter.persistent()) {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on
database thread!");
@@ -606,28 +547,35 @@
}
};
- public void removePendingKey(final SendableGet getter, final boolean
complain, final Key key, ObjectContainer container) {
+ public void removePendingKey(final GotKeyListener getter, final boolean
complain, final Key key, ObjectContainer container) {
if(!getter.persistent()) {
boolean dropped =
schedTransient.removePendingKey(getter, complain, key, container);
if(dropped && offeredKeys != null &&
!node.peersWantKey(key)) {
for(int i=0;i<offeredKeys.length;i++)
offeredKeys[i].remove(key);
}
- if(transientCooldownQueue != null)
- transientCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, null), null);
+ if(transientCooldownQueue != null) {
+ SendableGet cooldownGetter =
getter.getRequest(key, container);
+ if(cooldownGetter != null)
+ transientCooldownQueue.removeKey(key,
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, null), null);
+ }
} else if(container != null) {
// We are on the database thread already.
schedCore.removePendingKey(getter, complain, key,
container);
- if(persistentCooldownQueue != null)
- persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, container), container);
+ if(persistentCooldownQueue != null) {
+ SendableGet cooldownGetter =
getter.getRequest(key, container);
+ persistentCooldownQueue.removeKey(key,
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, container),
container);
+ }
} else {
jobRunner.queue(new DBJob() {
public void run(ObjectContainer container,
ClientContext context) {
container.activate(getter, 1);
schedCore.removePendingKey(getter,
complain, key, container);
- if(persistentCooldownQueue != null)
-
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, container), container);
+ if(persistentCooldownQueue != null) {
+ SendableGet cooldownGetter =
getter.getRequest(key, container);
+
persistentCooldownQueue.removeKey(key, cooldownGetter,
cooldownGetter.getCooldownWakeupByKey(key, container), container);
+ }
}
}, NativeThread.NORM_PRIORITY, false);
@@ -640,7 +588,7 @@
* @param getter
* @param complain
*/
- public void removePendingKeys(SendableGet getter, boolean complain) {
+ public void removePendingKeys(GotKeyListener getter, boolean complain) {
ObjectContainer container;
if(getter.persistent()) {
container = selectorContainer;
@@ -650,16 +598,9 @@
} else {
container = null;
}
- Object[] keyTokens = getter.allKeys(container);
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKey ckey = getter.getKey(tok, container);
- if(ckey == null) {
- if(complain)
- Logger.error(this, "Key "+tok+" is null
for "+getter, new Exception("debug"));
- continue;
- }
- removePendingKey(getter, complain, ckey.getNodeKey(),
container);
+ Key[] keys = getter.listKeys(container);
+ for(int i=0;i<keys.length;i++) {
+ removePendingKey(getter, complain, keys[i], container);
}
}
@@ -707,7 +648,7 @@
}
}
final Key key = block.getKey();
- final SendableGet[] transientGets =
schedTransient.removePendingKey(key, null);
+ final GotKeyListener[] transientGets =
schedTransient.removePendingKey(key, null);
if(transientGets != null && transientGets.length > 0) {
node.executor.execute(new Runnable() {
public void run() {
@@ -723,8 +664,12 @@
}
}, "Running off-thread callbacks for "+block.getKey());
if(transientCooldownQueue != null) {
- for(int i=0;i<transientGets.length;i++)
- transientCooldownQueue.removeKey(key,
transientGets[i], transientGets[i].getCooldownWakeupByKey(key, null), null);
+ for(int i=0;i<transientGets.length;i++) {
+ GotKeyListener got = transientGets[i];
+ SendableGet req = got.getRequest(key,
null);
+ if(req == null) continue;
+ transientCooldownQueue.removeKey(key,
req, req.getCooldownWakeupByKey(key, null), null);
+ }
}
}
@@ -734,12 +679,15 @@
public void run(ObjectContainer container,
ClientContext context) {
container.activate(key, 1);
- final SendableGet[] gets =
schedCore.removePendingKey(key, container);
+ final GotKeyListener[] gets =
schedCore.removePendingKey(key, container);
if(gets == null) return;
if(persistentCooldownQueue != null) {
for(int i=0;i<gets.length;i++) {
- container.activate(gets[i], 1);
-
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key, container), container);
+ GotKeyListener got = gets[i];
+ container.activate(got, 1);
+ SendableGet req =
got.getRequest(key, null);
+ if(req == null) continue;
+
persistentCooldownQueue.removeKey(key, req, req.getCooldownWakeupByKey(key,
container), container);
}
}
// Call the callbacks on the database executor
thread, because the first thing
@@ -839,8 +787,8 @@
if(persistent)
container.activate(key, 5);
if(logMINOR) Logger.minor(this, "Restoring key: "+key);
- SendableGet[] gets =
schedCore.getClientsForPendingKey(key, container);
- SendableGet[] transientGets =
schedTransient.getClientsForPendingKey(key, null);
+ GotKeyListener[] gets =
schedCore.getClientsForPendingKey(key, container);
+ GotKeyListener[] transientGets =
schedTransient.getClientsForPendingKey(key, null);
if(gets == null && transientGets == null) {
// Not an error as this can happen due to race
conditions etc.
if(logMINOR) Logger.minor(this, "Restoring key
but no keys queued?? for "+key);
@@ -850,11 +798,22 @@
for(int i=0;i<gets.length;i++) {
if(persistent)
container.activate(gets[i], 1);
- gets[i].requeueAfterCooldown(key, now,
container, clientContext);
+ GotKeyListener got = gets[i];
+ SendableGet req = got.getRequest(key,
container);
+ if(req == null) {
+ Logger.error(this, "No request
for listener "+got+" while requeueing "+key);
+ }
+ req.requeueAfterCooldown(key, now,
container, clientContext);
}
if(transientGets != null)
- for(int i=0;i<transientGets.length;i++)
-
transientGets[i].requeueAfterCooldown(key, now, container, clientContext);
+ for(int i=0;i<transientGets.length;i++) {
+ GotKeyListener got = transientGets[i];
+ SendableGet req = got.getRequest(key,
null);
+ if(req == null) {
+ Logger.error(this, "No request
for listener "+got+" while requeueing "+key);
+ }
+ req.requeueAfterCooldown(key, now,
container, clientContext);
+ }
}
}
return found;
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-07-10 15:37:53 UTC (rev 21031)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -10,11 +10,9 @@
import com.db4o.ObjectContainer;
import freenet.crypt.RandomSource;
-import freenet.keys.ClientKey;
import freenet.keys.Key;
import freenet.node.BaseSendableGet;
import freenet.node.RequestStarter;
-import freenet.node.SendableGet;
import freenet.node.SendableRequest;
import freenet.support.Logger;
import freenet.support.SectoredRandomGrabArrayWithInt;
@@ -142,7 +140,7 @@
for(int i=0;i<reqs.length;i++) {
SendableRequest req = reqs[i];
// Unregister from the RGA's, but keep the pendingKeys
and cooldown queue data.
- req.unregister(true, container);
+ req.unregister(container);
// Then can do innerRegister() (not register()).
innerRegister(req, random, container);
}
@@ -175,31 +173,25 @@
}
}
- public void addPendingKeys(SendableGet getter, ObjectContainer
container) {
- Object[] keyTokens = getter.sendableKeys(container);
- Object prevTok = null;
+ public void addPendingKeys(GotKeyListener getter, ObjectContainer
container) {
+ if(persistent())
+ container.activate(getter, 1);
+ Key[] keyTokens = getter.listKeys(container);
+ Key prevTok = null;
for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- if(i != 0 && prevTok == tok || (prevTok != null && tok
!= null && prevTok.equals(tok))) {
+ Key key = keyTokens[i];
+ if(i != 0 && prevTok == key || (prevTok != null && key
!= null && prevTok.equals(key))) {
Logger.error(this, "Ignoring duplicate token");
continue;
}
- prevTok = tok;
- ClientKey key = getter.getKey(tok, container);
if(getter.persistent())
container.activate(key, 5);
- if(key == null) {
- if(logMINOR)
- Logger.minor(this, "No key for "+tok+"
for "+getter+" - already finished?");
- continue;
- } else {
- addPendingKey(key.getNodeKey(), getter,
container);
- }
+ addPendingKey(key, getter, container);
}
}
public short getKeyPrio(Key key, short priority, ObjectContainer
container) {
- SendableGet[] getters = getClientsForPendingKey(key, container);
+ GotKeyListener[] getters = getClientsForPendingKey(key,
container);
if(getters == null) return priority;
for(int i=0;i<getters.length;i++) {
if(persistent())
@@ -214,17 +206,17 @@
public abstract long countQueuedRequests(ObjectContainer container);
- protected abstract boolean inPendingKeys(SendableGet req, Key key,
ObjectContainer container);
+ protected abstract boolean inPendingKeys(GotKeyListener req, Key key,
ObjectContainer container);
- public abstract SendableGet[] getClientsForPendingKey(Key key,
ObjectContainer container);
+ public abstract GotKeyListener[] getClientsForPendingKey(Key key,
ObjectContainer container);
public abstract boolean anyWantKey(Key key, ObjectContainer container);
- public abstract SendableGet[] removePendingKey(Key key, ObjectContainer
container);
+ public abstract GotKeyListener[] removePendingKey(Key key,
ObjectContainer container);
- public abstract boolean removePendingKey(SendableGet getter, boolean
complain, Key key, ObjectContainer container);
+ public abstract boolean removePendingKey(GotKeyListener getter, boolean
complain, Key key, ObjectContainer container);
- abstract void addPendingKey(Key key, SendableGet getter,
ObjectContainer container);
+ abstract void addPendingKey(Key key, GotKeyListener getter,
ObjectContainer container);
protected abstract Set
makeSetForAllRequestsByClientRequest(ObjectContainer container);
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-07-10 15:37:53 UTC (rev 21031)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -273,12 +273,6 @@
container.set(ret);
if(logMINOR)
Logger.minor(this, "Storing "+ret+" for
"+req);
- if((ctr++ & 15) == 0) {
- // This check is quite expensive, don't
do it all the time.
- if((req instanceof SendableGet) &&
!inPendingKeys((SendableGet)req, key, container)) {
- Logger.error(this, "Selected
key not in pendingKeys: key "+key+" for "+req);
- }
- }
} else {
ret = new ChosenRequest(req, token, key, ckey,
req.getPriorityClass(container));
}
@@ -542,17 +536,31 @@
if(logMINOR)
Logger.minor(this, "RegisterMe: next()
took "+(endNext-startNext));
container.delete(reg);
- container.activate(reg.getter, 2);
- if(reg.getter.isCancelled(container)) continue;
+ if(reg.getters != null) {
+ boolean allKilled = true;
+ for(int j=0;j<reg.getters.length;j++) {
+
container.activate(reg.getters[j], 2);
+
if(!reg.getters[i].isCancelled(container))
+ allKilled = false;
+ }
+ if(allKilled) {
+ if(logMINOR)
+ Logger.minor(this, "Not
registering as all SendableGet's already cancelled");
+ }
+ }
+
if(logMINOR)
- Logger.minor(this, "Running RegisterMe
for "+reg.getter+" : "+reg.key.addedTime+" : "+reg.key.priority);
+ Logger.minor(this, "Running RegisterMe
for "+reg.listener+" and "+reg.getters+" : "+reg.key.addedTime+" :
"+reg.key.priority);
// Don't need to activate, fields should exist?
FIXME
try {
- sched.register(reg.getter, true, false,
reg, false);
+ sched.register(reg.listener,
reg.getters, false, true, true, reg.blocks);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+"
running RegisterMeRunner", t);
// Cancel the request, and commit so it
isn't tried again.
- reg.getter.internalError(null, t,
sched, container, context, true);
+ if(reg.getters != null) {
+ for(int
k=0;k<reg.getters.length;k++)
+
reg.getters[k].internalError(null, t, sched, container, context, true);
+ }
}
if(System.currentTimeMillis() > deadline) break;
}
@@ -565,17 +573,6 @@
}
}
- public RegisterMe queueRegister(SendableRequest req,
PrioritizedSerialExecutor databaseExecutor, ObjectContainer container) {
- if(!databaseExecutor.onThread()) {
- throw new IllegalStateException("Not on database
thread!");
- }
- RegisterMe reg = new RegisterMe(req,
req.getPriorityClass(container), this);
- container.set(reg);
- if(logMINOR)
- Logger.minor(this, "Queued RegisterMe for "+req+" :
"+reg);
- return reg;
- }
-
/**
* @return True unless the key was already present.
*/
@@ -620,7 +617,7 @@
return pending.size();
}
- protected boolean inPendingKeys(SendableGet req, final Key key,
ObjectContainer container) {
+ protected boolean inPendingKeys(GotKeyListener req, final Key key,
ObjectContainer container) {
final String pks = HexUtil.bytesToHex(key.getFullKey());
ObjectSet pending = container.query(new Predicate() {
public boolean match(PendingKeyItem item) {
@@ -656,7 +653,7 @@
return false;
}
- public SendableGet[] getClientsForPendingKey(final Key key,
ObjectContainer container) {
+ public GotKeyListener[] getClientsForPendingKey(final Key key,
ObjectContainer container) {
final String pks = HexUtil.bytesToHex(key.getFullKey());
ObjectSet pending = container.query(new Predicate() {
public boolean match(PendingKeyItem item) {
@@ -686,7 +683,7 @@
return pending.hasNext();
}
- public SendableGet[] removePendingKey(final Key key, ObjectContainer
container) {
+ public GotKeyListener[] removePendingKey(final Key key, ObjectContainer
container) {
final String pks = HexUtil.bytesToHex(key.getFullKey());
ObjectSet pending = container.query(new Predicate() {
public boolean match(PendingKeyItem item) {
@@ -698,14 +695,14 @@
});
if(pending.hasNext()) {
PendingKeyItem item = (PendingKeyItem) pending.next();
- SendableGet[] getters = item.getters();
+ GotKeyListener[] getters = item.getters();
container.delete(item);
return getters;
}
return null;
}
- public boolean removePendingKey(SendableGet getter, boolean complain,
final Key key, ObjectContainer container) {
+ public boolean removePendingKey(GotKeyListener getter, boolean
complain, final Key key, ObjectContainer container) {
final String pks = HexUtil.bytesToHex(key.getFullKey());
ObjectSet pending = container.query(new Predicate() {
public boolean match(PendingKeyItem item) {
@@ -728,7 +725,7 @@
return false;
}
- protected void addPendingKey(final Key key, SendableGet getter,
ObjectContainer container) {
+ protected void addPendingKey(final Key key, GotKeyListener getter,
ObjectContainer container) {
if(logMINOR)
Logger.minor(this, "Adding pending key for "+key+" for
"+getter);
long startTime = System.currentTimeMillis();
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
2008-07-10 15:37:53 UTC (rev 21031)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -63,7 +63,7 @@
* Register a pending key to an already-registered request. This is
necessary if we've
* already registered a SendableGet, but we later add some more keys to
it.
*/
- void addPendingKey(Key nodeKey, SendableGet getter, ObjectContainer
container) {
+ void addPendingKey(Key nodeKey, GotKeyListener getter, ObjectContainer
container) {
logMINOR = Logger.shouldLog(Logger.MINOR,
ClientRequestSchedulerBase.class);
if(logMINOR)
Logger.minor(this, "Adding pending key "+nodeKey+" for
"+getter);
@@ -71,13 +71,13 @@
Object o = pendingKeys.get(nodeKey);
if(o == null) {
pendingKeys.put(nodeKey, getter);
- } else if(o instanceof SendableGet) {
- SendableGet oldGet = (SendableGet) o;
+ } else if(o instanceof GotKeyListener) {
+ GotKeyListener oldGet = (GotKeyListener) o;
if(oldGet != getter) {
- pendingKeys.put(nodeKey, new
SendableGet[] { oldGet, getter });
+ pendingKeys.put(nodeKey, new
GotKeyListener[] { oldGet, getter });
}
} else {
- SendableGet[] gets = (SendableGet[]) o;
+ GotKeyListener[] gets = (GotKeyListener[]) o;
boolean found = false;
for(int j=0;j<gets.length;j++) {
if(gets[j] == getter) {
@@ -86,7 +86,7 @@
}
}
if(!found) {
- SendableGet[] newGets = new
SendableGet[gets.length+1];
+ GotKeyListener[] newGets = new
GotKeyListener[gets.length+1];
System.arraycopy(gets, 0, newGets, 0,
gets.length);
newGets[gets.length] = getter;
pendingKeys.put(nodeKey, newGets);
@@ -95,7 +95,7 @@
}
}
- public boolean removePendingKey(SendableGet getter, boolean complain,
Key key, ObjectContainer container) {
+ public boolean removePendingKey(GotKeyListener getter, boolean
complain, Key key, ObjectContainer container) {
if(logMINOR)
Logger.minor(this, "Removing pending key: "+getter+"
for "+key);
boolean dropped = false;
@@ -110,8 +110,8 @@
if(o == null) {
if(complain)
Logger.normal(this, "Not found:
"+getter+" for "+key+" removing (no such key)");
- } else if(o instanceof SendableGet) {
- SendableGet oldGet = (SendableGet) o;
+ } else if(o instanceof GotKeyListener) {
+ GotKeyListener oldGet = (GotKeyListener) o;
if(oldGet != getter) {
if(complain)
Logger.normal(this, "Not found:
"+getter+" for "+key+" removing (1 getter)");
@@ -122,9 +122,9 @@
Logger.minor(this, "Removed
only getter (1) for "+key, new Exception("debug"));
}
} else {
- SendableGet[] gets = (SendableGet[]) o;
+ GotKeyListener[] gets = (GotKeyListener[]) o;
final int getsLength = gets.length;
- SendableGet[] newGets = new
SendableGet[getsLength > 1 ? getsLength-1 : 0];
+ GotKeyListener[] newGets = new
GotKeyListener[getsLength > 1 ? getsLength-1 : 0];
boolean found = false;
int x = 0;
for(int j=0;j<getsLength;j++) {
@@ -152,7 +152,7 @@
pendingKeys.put(key, newGets[0]);
} else {
if(x != getsLength-1) {
- SendableGet[] newNewGets = new
SendableGet[x];
+ GotKeyListener[] newNewGets =
new GotKeyListener[x];
System.arraycopy(newGets, 0,
newNewGets, 0, x);
newGets = newNewGets;
}
@@ -163,19 +163,19 @@
return dropped;
}
- public SendableGet[] removePendingKey(Key key, ObjectContainer
container) {
+ public GotKeyListener[] removePendingKey(Key key, ObjectContainer
container) {
Object o;
- final SendableGet[] gets;
+ final GotKeyListener[] gets;
synchronized(pendingKeys) {
o = pendingKeys.remove(key);
}
if(o == null) return null;
- if(o instanceof SendableGet) {
- gets = new SendableGet[] { (SendableGet) o };
+ if(o instanceof GotKeyListener) {
+ gets = new GotKeyListener[] { (GotKeyListener) o };
if(logMINOR)
Logger.minor(this, "Removing all pending keys
for "+key+" (1)", new Exception("debug"));
} else {
- gets = (SendableGet[]) o;
+ gets = (GotKeyListener[]) o;
if(logMINOR)
Logger.minor(this, "Removing all pending keys
for "+key+" ("+gets.length+")", new Exception("debug"));
}
@@ -193,11 +193,11 @@
Object o = pendingKeys.get(key);
if(o == null) {
// Blah
- } else if(o instanceof SendableGet) {
- short p =
((SendableGet)o).getPriorityClass(container);
+ } else if(o instanceof GotKeyListener) {
+ short p =
((GotKeyListener)o).getPriorityClass(container);
if(p < priority) priority = p;
} else { // if(o instanceof SendableGet[]) {
- SendableGet[] gets = (SendableGet[]) o;
+ GotKeyListener[] gets = (GotKeyListener[]) o;
for(int i=0;i<gets.length;i++) {
short p =
gets[i].getPriorityClass(container);
if(p < priority) priority = p;
@@ -207,32 +207,32 @@
return priority;
}
- public SendableGet[] getClientsForPendingKey(Key key, ObjectContainer
container) {
+ public GotKeyListener[] getClientsForPendingKey(Key key,
ObjectContainer container) {
Object o;
synchronized(pendingKeys) {
o = pendingKeys.get(key);
}
if(o == null) {
return null;
- } else if(o instanceof SendableGet) {
- SendableGet get = (SendableGet) o;
- return new SendableGet[] { get };
+ } else if(o instanceof GotKeyListener) {
+ GotKeyListener get = (GotKeyListener) o;
+ return new GotKeyListener[] { get };
} else {
- return (SendableGet[]) o;
+ return (GotKeyListener[]) o;
}
}
- protected boolean inPendingKeys(SendableGet req, Key key,
ObjectContainer container) {
+ protected boolean inPendingKeys(GotKeyListener req, Key key,
ObjectContainer container) {
Object o;
synchronized(pendingKeys) {
o = pendingKeys.get(key);
}
if(o == null) {
return false;
- } else if(o instanceof SendableGet) {
+ } else if(o instanceof GotKeyListener) {
return o == req;
} else {
- SendableGet[] gets = (SendableGet[]) o;
+ GotKeyListener[] gets = (GotKeyListener[]) o;
for(int i=0;i<gets.length;i++)
if(gets[i] == req) return true;
}
Added: branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -0,0 +1,58 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
+public interface GotKeyListener {
+
+ /**
+ * Callback for when a block is found. Will be called on the database
executor thread.
+ * @param key
+ * @param block
+ * @param sched
+ */
+ public abstract void onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context);
+
+ /**
+ * What keys are we interested in?
+ * @param container Database handle.
+ */
+ Key[] listKeys(ObjectContainer container);
+
+ /**
+ * Is this related to a persistent request?
+ */
+ boolean persistent();
+
+ /**
+ * Priority of the associated request.
+ * @param container Database handle.
+ */
+ short getPriorityClass(ObjectContainer container);
+
+ /**
+ * Is the request cancelled/finished/invalid?
+ * @param container Database handle.
+ */
+ boolean isCancelled(ObjectContainer container);
+
+ /**
+ * Get the SendableGet for a specific key, if any.
+ * Used in requeueing requests after a cooldown has expired.
+ * @param key The key.
+ * @param container The database handle.
+ * @return Null if we don't want to register a request for the key,
+ * otherwise the SendableGet.
+ */
+ public abstract SendableGet getRequest(Key key, ObjectContainer
container);
+
+ /**
+ * @return True if when checking the datastore on initial registration,
we
+ * should not promote any blocks found.
+ */
+ public abstract boolean dontCache(ObjectContainer container);
+
+}
Modified: branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -3,8 +3,6 @@
import com.db4o.ObjectContainer;
import freenet.keys.Key;
-import freenet.node.SendableGet;
-import freenet.node.SendableRequest;
import freenet.support.HexUtil;
public class PendingKeyItem {
@@ -20,20 +18,20 @@
* it... whereas doing so results in a fast index lookup.
*/
final String fullKeyAsBytes;
- private SendableGet[] getters;
+ private GotKeyListener[] getters;
- PendingKeyItem(Key key, SendableGet getter, long nodeDBHandle) {
+ PendingKeyItem(Key key, GotKeyListener getter, long nodeDBHandle) {
this.key = key;
- this.getters = new SendableGet[] { getter };
+ this.getters = new GotKeyListener[] { getter };
this.nodeDBHandle = nodeDBHandle;
this.fullKeyAsBytes = HexUtil.bytesToHex(key.getFullKey());
}
- public void addGetter(SendableGet getter) {
+ public void addGetter(GotKeyListener getter) {
for(int i=0;i<getters.length;i++) {
if(getters[i] == getter) return;
}
- SendableGet[] newGetters = new SendableGet[getters.length+1];
+ GotKeyListener[] newGetters = new
GotKeyListener[getters.length+1];
System.arraycopy(getters, 0, newGetters, 0, getters.length);
newGetters[getters.length] = getter;
getters = newGetters;
@@ -43,16 +41,16 @@
* @param getter
* @return True if the getter was removed. Caller should check
isEmpty() afterwards.
*/
- public boolean removeGetter(SendableGet getter) {
+ public boolean removeGetter(GotKeyListener getter) {
int found = 0;
for(int i=0;i<getters.length;i++) {
if(getters[i] == getter) found++;
}
if(found == 0) return false;
if(found == getters.length)
- getters = new SendableGet[0];
+ getters = new GotKeyListener[0];
else {
- SendableGet[] newGetters = new
SendableGet[getters.length - found];
+ GotKeyListener[] newGetters = new
GotKeyListener[getters.length - found];
int x = 0;
for(int i=0;i<getters.length;i++) {
if(getters[i] == getter) continue;
@@ -67,13 +65,13 @@
return getters.length == 0;
}
- public boolean hasGetter(SendableRequest req) {
+ public boolean hasGetter(GotKeyListener req) {
for(int i=0;i<getters.length;i++)
if(getters[i] == req) return true;
return false;
}
- public SendableGet[] getters() {
+ public GotKeyListener[] getters() {
return getters;
}
Modified: branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -2,7 +2,7 @@
import com.db4o.ObjectContainer;
-import freenet.node.SendableRequest;
+import freenet.node.SendableGet;
/**
* These must be deleted once the request has been registered.
@@ -10,16 +10,28 @@
* @author toad
*/
public class RegisterMe {
- final SendableRequest getter;
+ final GotKeyListener listener;
+ final SendableGet[] getters;
final ClientRequestSchedulerCore core;
final RegisterMeSortKey key;
private final int hashCode;
+ public final BlockSet blocks;
- RegisterMe(SendableRequest getter, short prio,
ClientRequestSchedulerCore core) {
- hashCode = (getter.hashCode() * prio) ^ core.hashCode();
- this.getter = getter;
+ RegisterMe(GotKeyListener listener, SendableGet[] getters, short prio,
ClientRequestSchedulerCore core, BlockSet blocks) {
+ this.listener = listener;
+ this.getters = getters;
this.core = core;
this.key = new RegisterMeSortKey(prio);
+ this.blocks = blocks;
+ int hash = core.hashCode();
+ if(listener != null)
+ hash ^= listener.hashCode();
+ if(getters != null) {
+ for(int i=0;i<getters.length;i++)
+ hash ^= getters[i].hashCode();
+ }
+ hash *= prio;
+ hashCode = hash;
}
public void objectOnActivate(ObjectContainer container) {
Modified:
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -101,7 +101,7 @@
}
}
// :(
- unregister(false, container);
+ unregisterAll(container, context);
if(e.isFatal() || forceFatal)
parent.fatallyFailedBlock(container, context);
else
@@ -115,7 +115,7 @@
container.activate(parent, 1);
container.activate(rcb, 1);
}
- unregister(false, container);
+ unregister(container); // pending key has already been removed
if(parent.isCancelled()) {
data.asBucket().free();
onFailure(new FetchException(FetchException.CANCELLED),
false, container, context);
Modified:
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -199,7 +199,7 @@
}
if(persistent)
container.set(this);
- getScheduler(context).register(this, false, false);
+ getScheduler(context).registerInsert(this, persistent, true);
}
private void fail(InsertException e, ObjectContainer container,
ClientContext context) {
@@ -263,7 +263,7 @@
if(persistent)
container.set(this);
} else {
- getScheduler(context).register(this, false, false);
+ getScheduler(context).registerInsert(this, persistent,
true);
}
}
@@ -329,7 +329,7 @@
container.set(this);
container.activate(cb, 1);
}
- super.unregister(false, container);
+ super.unregister(container);
cb.onFailure(new InsertException(InsertException.CANCELLED),
this, container, context);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -6,7 +6,6 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import com.db4o.ObjectContainer;
@@ -182,7 +181,7 @@
}
protected void onSuccess(FetchResult result, ObjectContainer container,
ClientContext context) {
- unregister(false, container);
+ unregister(container); // Key has already been removed from
pendingKeys
if(persistent) {
container.activate(decompressors, 1);
container.activate(parent, 1);
@@ -522,7 +521,7 @@
f.addDecompressor(codec);
}
parent.onTransition(this, f, container);
- f.schedule(container, context, false, false);
+ f.schedule(container, context, false);
if(persistent) {
container.set(metaStrings);
container.set(this);
@@ -599,7 +598,7 @@
SplitFileFetcher sf = new
SplitFileFetcher(metadata, rcb, parent, ctx,
decompressors, clientMetadata,
actx, recursionLevel, returnBucket, token, container);
parent.onTransition(this, sf, container);
- sf.schedule(container, context, false, false);
+ sf.schedule(container, context, false);
rcb.onBlockSetFinished(this, container,
context);
// Clear our own metadata, we won't need it any
more.
// For multi-level metadata etc see above.
@@ -925,7 +924,7 @@
if(l == usk.suggestedEdition) {
SingleFileFetcher sf = new
SingleFileFetcher(parent, cb, clientMetadata, key, metaStrings,
key.getURI().addMetaStrings(metaStrings),
0, ctx, actx, null,
null, maxRetries, recursionLevel+1, dontTellClientGet, token, false,
returnBucket, true, container, context);
- sf.schedule(container, context, false,
false);
+ sf.schedule(container, context, false);
} else {
cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
newUSK.getURI().addMetaStrings(metaStrings)), null, container, context);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -336,14 +336,14 @@
}
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly, boolean probablyNotInStore) {
+ public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
if(persistent)
container.activate(this, 1);
if(segments.length > 1)
regmeOnly = true;
if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this,
"Scheduling "+this);
for(int i=0;i<segments.length;i++) {
- segments[i].schedule(container, context, regmeOnly,
probablyNotInStore);
+ segments[i].schedule(container, context, regmeOnly);
}
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -22,13 +22,18 @@
import freenet.client.SplitfileBlock;
import freenet.keys.CHKBlock;
import freenet.keys.CHKEncodeException;
+import freenet.keys.CHKVerifyException;
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.KeyDecodeException;
import freenet.keys.NodeCHK;
+import freenet.keys.TooBigException;
import freenet.node.RequestScheduler;
+import freenet.node.SendableGet;
import freenet.support.Logger;
import freenet.support.RandomGrabArray;
import freenet.support.api.Bucket;
@@ -38,7 +43,7 @@
* A single segment within a SplitFileFetcher.
* This in turn controls a large number of SplitFileFetcherSubSegment's, which
are registered on the ClientRequestScheduler.
*/
-public class SplitFileFetcherSegment implements FECCallback {
+public class SplitFileFetcherSegment implements FECCallback, GotKeyListener {
private static volatile boolean logMINOR;
final short splitfileType;
@@ -179,13 +184,13 @@
return fatallyFailedBlocks;
}
- public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer
container, ClientContext context) {
+ public void onSuccess(Bucket data, int blockNo, ClientKeyBlock block,
ObjectContainer container, ClientContext context) {
if(persistent)
container.activate(this, 1);
if(data == null) throw new NullPointerException();
boolean decodeNow = false;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on
"+seg);
+ if(logMINOR) Logger.minor(this, "Fetched block "+blockNo);
if(parentFetcher.parent instanceof ClientGetter)
((ClientGetter)parentFetcher.parent).addKeyToBinaryBlob(block, container,
context);
// No need to unregister key, because it will be cleared in
tripPendingKey().
@@ -249,8 +254,8 @@
container.activate(parentFetcher.parent, 1);
}
parentFetcher.parent.completedBlock(dontNotify, container,
context);
- seg.possiblyRemoveFromParent(container);
if(decodeNow) {
+ context.getChkFetchScheduler().removePendingKeys(this,
true);
removeSubSegments(container);
decode(container, context);
}
@@ -465,7 +470,9 @@
boolean allFailed;
// Since we can't keep the key, we need to unregister for it at
this point to avoid a memory leak
NodeCHK key = getBlockNodeKey(blockNo, container);
- if(key != null) seg.unregisterKey(key, context, container);
+ if(key != null)
+ // don't complain as may already have been removed e.g.
if we have a decode error in onGotKey; don't NPE for same reason
+ context.getChkFetchScheduler().removePendingKey(this,
false, key, container);
synchronized(this) {
if(isFinishing(container)) return; // this failure is
now irrelevant, and cleanup will occur on the decoder thread
if(blockNo < dataKeys.length) {
@@ -497,7 +504,7 @@
container.set(this);
if(allFailed)
fail(new FetchException(FetchException.SPLITFILE_ERROR,
errors), container, context);
- else
+ else if(seg != null)
seg.possiblyRemoveFromParent(container);
}
@@ -524,7 +531,7 @@
tries = ++dataRetries[blockNo];
if(tries > maxTries && maxTries >= 0) failed =
true;
else {
- sub = getSubSegment(tries, container);
+ sub = getSubSegment(tries, container,
false);
if(tries %
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
long now =
System.currentTimeMillis();
if(dataCooldownTimes[blockNo] >
now)
@@ -542,7 +549,7 @@
tries = ++checkRetries[checkNo];
if(tries > maxTries && maxTries >= 0) failed =
true;
else {
- sub = getSubSegment(tries, container);
+ sub = getSubSegment(tries, container,
false);
if(tries %
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
long now =
System.currentTimeMillis();
if(checkCooldownTimes[checkNo]
> now)
@@ -567,20 +574,16 @@
}
if(cooldown) {
// Register to the next sub-segment before removing
from the old one.
- sub.getScheduler(context).addPendingKey(key, sub);
- seg.unregisterKey(key.getNodeKey(), context, container);
if(logMINOR)
Logger.minor(this, "Adding to cooldown queue:
"+key+" for "+this+" was on segment "+seg+" now registered to "+sub);
} else {
// If we are here we are going to retry
if(logMINOR)
Logger.minor(this, "Retrying block "+blockNo+"
on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
- sub.add(blockNo, false, container, context, false);
- seg.unregisterKey(key.getNodeKey(), context, container);
}
}
- private SplitFileFetcherSubSegment getSubSegment(int retryCount,
ObjectContainer container) {
+ private SplitFileFetcherSubSegment getSubSegment(int retryCount,
ObjectContainer container, boolean noCreate) {
SplitFileFetcherSubSegment sub;
if(persistent)
container.activate(subSegments, 1);
@@ -589,6 +592,7 @@
sub = (SplitFileFetcherSubSegment)
subSegments.get(i);
if(sub.retryCount == retryCount) return sub;
}
+ if(noCreate) return null;
sub = new SplitFileFetcherSubSegment(this, retryCount);
subSegments.add(sub);
}
@@ -627,26 +631,27 @@
checkBuckets[i] = null;
}
}
+ context.getChkFetchScheduler().removePendingKeys(this, true);
removeSubSegments(container);
if(persistent)
container.set(this);
parentFetcher.segmentFinished(this, container, context);
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly, boolean probablyNotInStore) {
+ public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
if(persistent) {
container.activate(this, 1);
container.activate(parentFetcher, 1);
container.activate(parentFetcher.parent, 1);
}
try {
- SplitFileFetcherSubSegment seg = getSubSegment(0,
container);
+ SplitFileFetcherSubSegment seg = getSubSegment(0,
container, false);
if(persistent)
container.activate(seg, 1);
for(int
i=0;i<dataRetries.length+checkRetries.length;i++)
seg.add(i, true, container, context, false);
- seg.schedule(container, context, regmeOnly,
probablyNotInStore);
+ seg.schedule(container, context, true, regmeOnly);
synchronized(this) {
scheduled = true;
}
@@ -760,9 +765,9 @@
}
/**
- * @return True if the key was wanted and the scheduled segment was the
one that called, false otherwise.
+ * @return True if the key was wanted, false otherwise.
*/
- public boolean requeueAfterCooldown(Key key, long time, ObjectContainer
container, ClientContext context, SplitFileFetcherSubSegment segment) {
+ public boolean requeueAfterCooldown(Key key, long time, ObjectContainer
container, ClientContext context) {
if(persistent)
container.activate(this, 1);
Vector v = null;
@@ -782,7 +787,7 @@
return false;
}
int tries = dataRetries[i];
- SplitFileFetcherSubSegment sub =
getSubSegment(tries, container);
+ SplitFileFetcherSubSegment sub =
getSubSegment(tries, container, false);
if(logMINOR)
Logger.minor(this, "Retrying after
cooldown on "+this+": data block "+i+" on "+this+" :
tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
@@ -803,7 +808,7 @@
return false;
}
int tries = checkRetries[i];
- SplitFileFetcherSubSegment sub =
getSubSegment(tries, container);
+ SplitFileFetcherSubSegment sub =
getSubSegment(tries, container, false);
if(logMINOR)
Logger.minor(this, "Retrying after
cooldown on "+this+": check block "+i+" on "+this+" :
tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
@@ -816,27 +821,25 @@
if(notFound) {
Logger.error(this, "requeueAfterCooldown: Key not
found!: "+key+" on "+this);
}
- boolean foundCaller = false;
if(v != null) {
for(int i=0;i<v.size();i++) {
- if(v.get(i) == segment) foundCaller = true;
SplitFileFetcherSubSegment sub =
(SplitFileFetcherSubSegment) v.get(i);
RandomGrabArray rga = sub.getParentGrabArray();
if(sub.getParentGrabArray() == null) {
- sub.schedule(container, context, false,
true);
+ sub.schedule(container, context, false,
false);
} else {
// if(logMINOR) {
container.activate(rga, 1);
if(!rga.contains(sub,
container)) {
Logger.error(this,
"Sub-segment has RGA but isn't registered to it!!: "+sub+" for "+rga);
- sub.schedule(container,
context, false, true);
+ sub.schedule(container,
context, false, false);
}
container.deactivate(rga, 1);
// }
}
}
}
- return foundCaller;
+ return true;
}
public synchronized long getCooldownWakeupByKey(Key key,
ObjectContainer container) {
@@ -929,4 +932,107 @@
return checkBuckets[blockNo].hasData();
}
}
+
+ public boolean dontCache(ObjectContainer container) {
+ return !blockFetchContext.cacheLocalRequests;
+ }
+
+ public short getPriorityClass(ObjectContainer container) {
+ container.activate(parent, 1);
+ return parent.priorityClass;
+ }
+
+ public SendableGet getRequest(Key key, ObjectContainer container) {
+ int blockNum = this.getBlockNumber(key, container);
+ if(blockNum < 0) return null;
+ int retryCount = getBlockRetryCount(blockNum);
+ return getSubSegment(retryCount, container, false);
+ }
+
+ public boolean isCancelled(ObjectContainer container) {
+ return isFinishing(container);
+ }
+
+ public Key[] listKeys(ObjectContainer container) {
+ Vector v = new Vector();
+ synchronized(this) {
+ for(int i=0;i<dataKeys.length;i++) {
+ if(dataKeys[i] != null) {
+ if(persistent)
+ container.activate(dataKeys[i],
5);
+ v.add(dataKeys[i].getNodeKey());
+ }
+ }
+ for(int i=0;i<checkKeys.length;i++) {
+ if(checkKeys[i] != null) {
+ if(persistent)
+
container.activate(checkKeys[i], 5);
+ v.add(checkKeys[i].getNodeKey());
+ }
+ }
+ }
+ return (Key[]) v.toArray(new Key[v.size()]);
+ }
+
+ public void onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context) {
+ int blockNum = this.getBlockNumber(key, container);
+ if(blockNum < 0) return;
+ ClientCHK ckey = this.getBlockKey(blockNum, container);
+ ClientCHKBlock cb;
+ int retryCount = getBlockRetryCount(blockNum);
+ SplitFileFetcherSubSegment seg = this.getSubSegment(retryCount,
container, true);
+ seg.removeBlockNum(blockNum);
+ seg.possiblyRemoveFromParent(container);
+ try {
+ cb = new ClientCHKBlock((CHKBlock)block, ckey);
+ } catch (CHKVerifyException e) {
+ this.onFatalFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e), blockNum, null,
container, context);
+ return;
+ }
+ Bucket data = extract(cb, blockNum, container, context);
+ if(data == null) return;
+
+ if(!cb.isMetadata()) {
+ this.onSuccess(data, blockNum, cb, container, context);
+ } else {
+ this.onFatalFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), blockNum, null, container, context);
+ }
+ }
+
+ private int getBlockRetryCount(int blockNum) {
+ if(blockNum < dataRetries.length)
+ return dataRetries[blockNum];
+ blockNum -= dataRetries.length;
+ return checkRetries[blockNum];
+ }
+
+ /** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it
via onFailure
+ * and return null.
+ */
+ protected Bucket extract(ClientKeyBlock block, int blockNum,
ObjectContainer container, ClientContext context) {
+ Bucket data;
+ try {
+ data =
block.decode(context.getBucketFactory(persistent),
(int)(Math.min(this.blockFetchContext.maxOutputLength, Integer.MAX_VALUE)),
false);
+ } catch (KeyDecodeException e1) {
+ if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, "Decode failure: "+e1, e1);
+ this.onFatalFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), blockNum,
null, container, context);
+ return null;
+ } catch (TooBigException e) {
+ this.onFatalFailure(new
FetchException(FetchException.TOO_BIG, e.getMessage()), blockNum, null,
container, context);
+ return null;
+ } catch (IOException e) {
+ Logger.error(this, "Could not capture data - disk
full?: "+e, e);
+ this.onFatalFailure(new
FetchException(FetchException.BUCKET_ERROR, e), blockNum, null, container,
context);
+ return null;
+ }
+ if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, data == null ? "Could not decode:
null" : ("Decoded "+data.size()+" bytes"));
+ return data;
+ }
+
+
+ public boolean persistent() {
+ return persistent;
+ }
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-07-10 15:37:53 UTC (rev 21031)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -329,7 +329,7 @@
onFailure(new FetchException(FetchException.CANCELLED),
token, container, context);
return;
}
- segment.onSuccess(data, blockNo, this, block, container,
context);
+ segment.onSuccess(data, blockNo, block, container, context);
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it
via onFailure
@@ -456,10 +456,8 @@
}
if(persistent)
container.set(blockNums);
- if(schedule) schedule(container, context, false, true); //
Retrying so not in store
- else if(!dontSchedule)
- // Already scheduled, however this key may not be
registered.
-
getScheduler(context).addPendingKey(segment.getBlockKey(blockNo, container),
this);
+ if(schedule)
+ context.getChkFetchScheduler().register(null, new
SendableGet[] { this }, false, persistent, true, null);
}
public String toString() {
@@ -480,7 +478,7 @@
Logger.minor(this, "Definitely removing from
parent: "+this);
if(!segment.maybeRemoveSeg(this, container)) return;
}
- unregister(false, container);
+ unregister(container);
}
public void onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context) {
@@ -542,7 +540,7 @@
if(logMINOR)
Logger.minor(this, "Killing "+this);
// Do unregister() first so can get and unregister each key and
avoid a memory leak
- unregister(false, container);
+ unregister(container);
synchronized(segment) {
blockNums.clear();
cancelled = true;
@@ -564,9 +562,8 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+"
for "+this);
- if(!segment.requeueAfterCooldown(key, time, container, context,
this)) {
- Logger.error(this, "Removing key "+key+" for "+this+"
in requeueAfterCooldown as is now registered to a different subsegment");
- unregisterKey(key, context, container);
+ if(!segment.requeueAfterCooldown(key, time, container,
context)) {
+ Logger.error(this, "Key was not wanted after cooldown:
"+key+" for "+this+" in requeueAfterCooldown");
}
}
@@ -588,4 +585,21 @@
}
}
+ public void schedule(ObjectContainer container, ClientContext context,
boolean firstTime, boolean regmeOnly) {
+ getScheduler(context).register(firstTime ? segment : null, new
SendableGet[] { this }, regmeOnly, persistent, true,
segment.blockFetchContext.blocks);
+ }
+
+ public void removeBlockNum(int blockNum) {
+ synchronized(segment) {
+ for(int i=0;i<blockNums.size();i++) {
+ Integer token = (Integer) blockNums.get(i);
+ int num = ((Integer)token).intValue();
+ if(num == blockNum) {
+ blockNums.remove(i);
+ break;
+ }
+ }
+ }
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -32,7 +32,7 @@
container.activate(this, 1);
container.activate(cb, 1);
}
- unregister(false, container);
+ unregister(container); // Key has already been removed from
pendingKeys
cb.onSuccess((ClientSSKBlock)block, context);
}
@@ -73,7 +73,7 @@
if(canRetry && retry(container, context)) return;
// Ran out of retries.
- unregister(false, container);
+ unregisterAll(container, context);
if(e.code == LowLevelGetException.CANCELLED){
cb.onCancelled(context);
return;
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -159,7 +159,7 @@
if(logMINOR)
Logger.minor(this, "Checker == null in
schedule() for "+this, new Exception("debug"));
} else
- checker.schedule(container, context, false,
false);
+ checker.schedule(container, context, false);
}
public String toString() {
@@ -469,17 +469,17 @@
public void schedule(long delay, ObjectContainer container, final
ClientContext context) {
assert(container == null);
if (delay<=0) {
- schedule(container, context, false, false);
+ schedule(container, context, false);
} else {
uskManager.ticker.queueTimedJob(new Runnable() {
public void run() {
- USKFetcher.this.schedule(null, context,
false, false);
+ USKFetcher.this.schedule(null, context,
false);
}
}, delay);
}
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly, boolean probablyNotInStore) {
+ public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
USKAttempt[] attempts;
long lookedUp = uskManager.lookup(origUSK);
synchronized(this) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -91,7 +91,7 @@
usk = usk.copy(edition);
fetcher = manager.getFetcher(usk, ctx, new
USKFetcherWrapper(usk, priority, client), keepLastData);
fetcher.addCallback(this);
- fetcher.schedule(null, context, false, false); // non-persistent
+ fetcher.schedule(null, context, false); // non-persistent
}
public void cancel(ObjectContainer container, ClientContext context) {
@@ -118,7 +118,7 @@
return token;
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly, boolean probablyNotInStore) {
+ public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly) {
start(context.uskManager, context);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -78,7 +78,7 @@
if(finished) return;
fetcher =
context.uskManager.getFetcherForInsertDontSchedule(pubUSK,
parent.priorityClass, this, parent.getClient(), container, context);
}
- fetcher.schedule(container, context, false, false);
+ fetcher.schedule(container, context, false);
}
public void onFoundEdition(long l, USK key, ObjectContainer container,
ClientContext context, boolean lastContentWasMetadata, short codec, byte[]
hisData) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKManager.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKManager.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -140,7 +140,7 @@
fetcher.cancel(null, context);
}
}
- if(sched != null) sched.schedule(null, context, false, false);
+ if(sched != null) sched.schedule(null, context, false);
}
void update(final USK origUSK, final long number, final ClientContext
context) {
@@ -216,7 +216,7 @@
if(fetcher != null) {
ticker.queueTimedJob(new Runnable() {
public void run() {
- fetcher.schedule(null, context, false,
false);
+ fetcher.schedule(null, context, false);
}
}, 0);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -48,7 +48,7 @@
SingleFileFetcher getter =
(SingleFileFetcher)
SingleFileFetcher.create(this, this, new ClientMetadata(), uri, ctx, new
ArchiveContext(ctx.maxArchiveLevels),
ctx.maxNonSplitfileRetries, 0,
true, l, true, null, false, null, context);
- getter.schedule(null, context, false, false);
+ getter.schedule(null, context, false);
} catch (MalformedURLException e) {
Logger.error(this, "Impossible: "+e, e);
} catch (FetchException e) {
Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-07-10
15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-07-10
19:28:28 UTC (rev 21032)
@@ -99,12 +99,6 @@
return true;
}
- public void schedule(ObjectContainer container, ClientContext context,
boolean regmeOnly, boolean assumeNotInStore) {
- if(Logger.shouldLog(Logger.MINOR, this))
- Logger.minor(this, "Scheduling "+this);
- getScheduler(context).register(this, regmeOnly,
assumeNotInStore);
- }
-
public ClientRequestScheduler getScheduler(ClientContext context) {
if(isSSK())
return context.getSskFetchScheduler();
@@ -113,14 +107,6 @@
}
/**
- * Callback for when a block is found. Will be called on the database
executor thread.
- * @param key
- * @param block
- * @param sched
- */
- public abstract void onGotKey(Key key, KeyBlock block, ObjectContainer
container, ClientContext context);
-
- /**
* Get the time at which the key specified by the given token will wake
up from the
* cooldown queue.
* @param token
@@ -133,16 +119,6 @@
/** Reset the cooldown times when the request is reregistered. */
public abstract void resetCooldownTimes(ObjectContainer container);
- public final void unregister(boolean staySubscribed, ObjectContainer
container, ClientContext context) {
- if(!staySubscribed)
- getScheduler(context).removePendingKeys(this, false);
- super.unregister(staySubscribed, container);
- }
-
- public final void unregisterKey(Key key, ClientContext context,
ObjectContainer container) {
- getScheduler(context).removePendingKey(this, false, key,
container);
- }
-
public void internalError(final Object keyNum, final Throwable t, final
RequestScheduler sched, ObjectContainer container, ClientContext context,
boolean persistent) {
sched.callFailure(this, new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t),
keyNum, NativeThread.MAX_PRIORITY, null, persistent);
}
Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-07-10
15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-07-10
19:28:28 UTC (rev 21032)
@@ -96,7 +96,7 @@
container.set(this);
}
- public void unregister(boolean staySubscribed, ObjectContainer
container) {
+ public void unregister(ObjectContainer container) {
RandomGrabArray arr = getParentGrabArray();
if(arr != null) {
if(persistent)
Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-07-10 19:28:28 UTC (rev 21032)
@@ -110,7 +110,7 @@
public void schedule() {
finished = false; // can reschedule
- scheduler.register(this, false, false);
+ scheduler.registerInsert(this, false, false);
}
public void cancel(ObjectContainer container, ClientContext context) {
@@ -118,7 +118,7 @@
if(finished) return;
finished = true;
}
- super.unregister(false, container);
+ super.unregister(container);
}
public boolean shouldCache() {