Author: toad
Date: 2008-06-03 20:49:20 +0000 (Tue, 03 Jun 2008)
New Revision: 20189
Modified:
branches/db4o/freenet/src/freenet/client/FECCodec.java
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
branches/db4o/freenet/src/freenet/client/async/USKCallback.java
branches/db4o/freenet/src/freenet/client/async/USKChecker.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
branches/db4o/freenet/src/freenet/client/async/USKInserter.java
branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
branches/db4o/freenet/src/freenet/node/BaseSendableGet.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/SendableGet.java
branches/db4o/freenet/src/freenet/node/SendableInsert.java
branches/db4o/freenet/src/freenet/node/SendableRequest.java
branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
[21:22] * toad_ commits his bomb before it gets any bigger... doesn't compile?
who cares! :)
Pass in an ObjectContainer to almost everything doing anything in the client
layer. Obviously it will be null for nonpersistent requests.
Add callFailure and callSuccess to RequestScheduler for calling callbacks on
the database thread (from e.g. the request starter thread).
Add a dedicated datastore checker thread (also prioritised but serial).
Refactor register() for SendableGets significantly. 3-phase register: on the db
thread, add pending keys and add RegisterMe; on the store checker thread, check
everything; then on the db thread, add to queue and remove RegisterMe. Obviously
it is simpler for nonpersistent requests (the store check is still on the db
thread though).
- The code in ClientRequestSchedulerCore dealing with this (e.g. on startup) is
rewritten somewhat.
Get rid of SplitFileFetcher.scheduleOffThread. (Given the above, not needed;
the one-thread-per-priority scheduler threads will go away soon.)
Remove some cooldown error checking which hopefully isn't needed now. :( Can't
do that check on the request starter thread.
Move addPendingKeys() to ClientRequestSchedulerBase.
Check whether on the database thread in various places.
Get rid of SimpleSingleFileFetcher.onFailure(FetchException, RequestScheduler).
(trivial, use the other one)
Comments/javadocs.
This should probably have been several commits. Of course each one would have
been incomplete, and so is this one...
Modified: branches/db4o/freenet/src/freenet/client/FECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -8,6 +8,7 @@
import java.io.OutputStream;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
@@ -478,8 +479,8 @@
*/
public interface StandardOnionFECCodecEncoderCallback {
- public void onEncodedSegment();
+ public void onEncodedSegment(ObjectContainer container);
- public void onDecodedSegment();
+ public void onDecodedSegment(ObjectContainer container);
}
}
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.keys.ClientKey;
import freenet.keys.ClientSSK;
@@ -36,24 +38,24 @@
cooldownWakeupTime = -1;
}
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
return keys;
}
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
return keys;
}
- public Object chooseKey(KeysFetchingLocally fetching) {
+ public Object chooseKey(KeysFetchingLocally fetching, ObjectContainer
container) {
if(fetching.hasKey(key.getNodeKey())) return null;
return keys[0];
}
- public boolean hasValidKeys(KeysFetchingLocally fetching) {
+ public boolean hasValidKeys(KeysFetchingLocally fetching,
ObjectContainer container) {
return !fetching.hasKey(key.getNodeKey());
}
- public ClientKey getKey(Object token) {
+ public ClientKey getKey(Object token, ObjectContainer container) {
return key;
}
@@ -67,7 +69,7 @@
/** Try again - returns true if we can retry
* @param sched */
- protected boolean retry(RequestScheduler sched) {
+ protected boolean retry(RequestScheduler sched, ObjectContainer
container) {
retryCount++;
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Attempting to retry... (max
"+maxRetries+", current "+retryCount+ ')');
@@ -82,7 +84,7 @@
cooldownWakeupTime = sched.queueCooldown(key,
this);
return true; // We will retry, just not yet.
See requeueAfterCooldown(Key).
} else {
- schedule();
+ schedule(container);
}
return true;
}
@@ -105,7 +107,7 @@
return ctx.ignoreStore;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
cancelled = true;
}
@@ -133,7 +135,7 @@
return true;
}
- public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched,
ObjectContainer container) {
synchronized(this) {
if(isCancelled()) return;
if(!key.equals(this.key.getNodeKey())) {
@@ -142,7 +144,7 @@
}
}
try {
- onSuccess(Key.createKeyBlock(this.key, block), false,
null, sched);
+ onSuccess(Key.createKeyBlock(this.key, block), false,
null, sched, container);
} catch (KeyVerifyException e) {
Logger.error(this, "onGotKey("+key+","+block+") got
"+e+" for "+this, e);
// FIXME if we get rid of the direct route this must
call onFailure()
@@ -150,19 +152,19 @@
}
- public long getCooldownWakeup(Object token) {
+ public long getCooldownWakeup(Object token, ObjectContainer container) {
return cooldownWakeupTime;
}
- public long getCooldownWakeupByKey(Key key) {
+ public long getCooldownWakeupByKey(Key key, ObjectContainer container) {
return cooldownWakeupTime;
}
- public synchronized void resetCooldownTimes() {
+ public synchronized void resetCooldownTimes(ObjectContainer container) {
cooldownWakeupTime = -1;
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer
container) {
if(cooldownWakeupTime > time) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Not requeueing as deadline has not passed yet");
return;
@@ -173,7 +175,7 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+"
for "+this);
- schedule();
+ schedule(container);
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,15 +3,17 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
/**
* A ClientGetState.
* Represents a stage in the fetch process.
*/
public interface ClientGetState {
- public void schedule();
+ public void schedule(ObjectContainer container);
- public void cancel();
+ public void cancel(ObjectContainer container);
public long getToken();
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertException;
import freenet.support.SimpleFieldSet;
@@ -17,10 +19,10 @@
public abstract BaseClientPutter getParent();
/** Cancel the request. */
- public abstract void cancel();
+ public abstract void cancel(ObjectContainer container);
/** Schedule the request. */
- public abstract void schedule() throws InsertException;
+ public abstract void schedule(ObjectContainer container) throws
InsertException;
/**
* Get the token, an object which is passed around with the insert and
may be
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -19,6 +19,7 @@
import freenet.node.BaseSendableGet;
import freenet.node.KeysFetchingLocally;
import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
import freenet.node.Node;
import freenet.node.NodeClientCore;
import freenet.node.RequestScheduler;
@@ -98,6 +99,7 @@
private final CooldownQueue transientCooldownQueue;
private final CooldownQueue persistentCooldownQueue;
final PrioritizedSerialExecutor databaseExecutor;
+ final PrioritizedSerialExecutor datastoreCheckerExecutor;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
@@ -112,6 +114,7 @@
schedCore.start();
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
+ this.datastoreCheckerExecutor = core.datastoreCheckerExecutor;
this.starter = starter;
this.random = random;
this.node = node;
@@ -158,81 +161,159 @@
if(isInsertScheduler != (req instanceof SendableInsert))
throw new IllegalArgumentException("Expected a
SendableInsert: "+req);
if(req instanceof SendableGet) {
- SendableGet getter = (SendableGet)req;
- if(!getter.ignoreStore()) {
- boolean anyValid = false;
- Object[] keyTokens = getter.sendableKeys();
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKeyBlock block = null;
- try {
- ClientKey key =
getter.getKey(tok);
- if(key == null) {
- if(logMINOR)
-
Logger.minor(this, "No key for "+tok+" for "+getter+" - already finished?");
- continue;
- } else {
-
if(getter.getContext().blocks != null)
- block =
getter.getContext().blocks.get(key);
- if(block == null)
- block =
node.fetchKey(key, getter.dontCache());
- if(block == null) {
- if(!persistent)
{
-
schedTransient.addPendingKey(key, getter);
- } // If
persistent, when it is registered (in a later job) the keys will be added first.
- } else {
- if(logMINOR)
-
Logger.minor(this, "Got "+block);
+ final SendableGet getter = (SendableGet)req;
+
+ if(persistent && onDatabaseThread) {
+ schedCore.addPendingKeys(getter,
selectorContainer);
+ schedCore.queueRegister(getter,
databaseExecutor);
+ final Object[] keyTokens =
getter.sendableKeys(selectorContainer);
+ final ClientKey[] keys = new
ClientKey[keyTokens.length];
+ for(int i=0;i<keyTokens.length;i++)
+ keys[i] = getter.getKey(keyTokens[i],
selectorContainer);
+ datastoreCheckerExecutor.execute(new Runnable()
{
+
+ public void run() {
+ registerCheckStore(getter,
true, keyTokens, keys);
+ }
+
+ }, getter.getPriorityClass(), "Checking
datastore");
+ } else if(persistent) {
+ databaseExecutor.execute(new Runnable() {
+
+ public void run() {
+
schedCore.addPendingKeys(getter, selectorContainer);
+ schedCore.queueRegister(getter,
databaseExecutor);
+ final Object[] keyTokens =
getter.sendableKeys(selectorContainer);
+ final ClientKey[] keys = new
ClientKey[keyTokens.length];
+ for(int
i=0;i<keyTokens.length;i++)
+ keys[i] =
getter.getKey(keyTokens[i], selectorContainer);
+
datastoreCheckerExecutor.execute(new Runnable() {
+
+ public void run() {
+
registerCheckStore(getter, true, keyTokens, keys);
}
- }
- } catch (KeyVerifyException e) {
- // Verify exception, probably
bogus at source;
- // verifies at low-level, but
not at decode.
- if(logMINOR)
- Logger.minor(this,
"Decode failed: "+e, e);
- if(onDatabaseThread)
- getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, this);
- else {
- final SendableGet g =
getter;
- final Object token =
tok;
-
databaseExecutor.execute(new Runnable() {
- public void
run() {
-
g.onFailure(new LowLevelGetException(LowLevelGetException.DECODE_FAILED),
token, ClientRequestScheduler.this);
- }
- },
NativeThread.NORM_PRIORITY, "Block decode failed");
- }
- continue; // other keys might
be valid
+
+ }, getter.getPriorityClass(),
"Checking datastore");
}
- if(block != null) {
- if(logMINOR) Logger.minor(this,
"Can fulfill "+req+" ("+tok+") immediately from store");
- getter.onSuccess(block, true,
tok, this);
- // Even with working thread
priorities, we still get very high latency accessing
- // the datastore when
background threads are doing it in parallel.
- // So yield() here, unless
priority is very high.
- if(req.getPriorityClass() >
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS)
- Thread.yield();
+
+ }, NativeThread.NORM_PRIORITY, "Registering
request");
+ } else {
+ // Not persistent
+ schedTransient.addPendingKeys(getter, null);
+ // Check the store off-thread anyway.
+ final Object[] keyTokens =
getter.sendableKeys(null);
+ final ClientKey[] keys = new
ClientKey[keyTokens.length];
+ for(int i=0;i<keyTokens.length;i++)
+ keys[i] = getter.getKey(keyTokens[i],
null);
+ datastoreCheckerExecutor.execute(new Runnable()
{
+
+ public void run() {
+ registerCheckStore(getter,
false, keyTokens, keys);
+ }
+
+ }, getter.getPriorityClass(), "Checking
datastore");
+ }
+ } else {
+ finishRegister(req, persistent, onDatabaseThread);
+ }
+ }
+
+ /**
+ * Check the store for all the keys on the SendableGet. By now the
pendingKeys will have
+ * been set up, and this is run on the datastore checker thread. Once
completed, this should
+ * (for a persistent request) queue a job on the databaseExecutor and
(for a transient
+ * request) finish registering the request immediately.
+ * @param getter
+ */
+ protected void registerCheckStore(SendableGet getter, boolean
persistent, Object[] keyTokens, ClientKey[] keys) {
+ boolean anyValid = false;
+ for(int i=0;i<keyTokens.length;i++) {
+ Object tok = keyTokens[i];
+ ClientKeyBlock block = null;
+ try {
+ ClientKey key = keys[i];
+ if(key == null) {
+ if(logMINOR)
+ Logger.minor(this, "No key for
"+tok+" for "+getter+" - already finished?");
+ continue;
+ } else {
+ if(getter.getContext().blocks != null)
+ block =
getter.getContext().blocks.get(key);
+ if(block == null)
+ block = node.fetchKey(key,
getter.dontCache());
+ if(block == null) {
+ if(!persistent) {
+
schedTransient.addPendingKey(key, getter);
+ } // If persistent, when it is
registered (in a later job) the keys will be added first.
} else {
- anyValid = true;
+ if(logMINOR)
+ Logger.minor(this, "Got
"+block);
}
}
- if(!anyValid) {
- if(logMINOR)
- Logger.minor(this, "No valid
keys, returning without registering for "+req);
- return;
+ } catch (KeyVerifyException e) {
+ // Verify exception, probably bogus at source;
+ // verifies at low-level, but not at decode.
+ if(logMINOR)
+ Logger.minor(this, "Decode failed: "+e,
e);
+ if(!persistent)
+ getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, this, persistent
? selectorContainer : null);
+ else {
+ final SendableGet g = getter;
+ final Object token = tok;
+ databaseExecutor.execute(new Runnable()
{
+ public void run() {
+ g.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token,
ClientRequestScheduler.this, selectorContainer);
+
selectorContainer.commit();
+ }
+ }, NativeThread.NORM_PRIORITY, "Block
decode failed");
}
+ continue; // other keys might be valid
}
+ if(block != null) {
+ if(logMINOR) Logger.minor(this, "Can fulfill
"+getter+" ("+tok+") immediately from store");
+ if(!persistent)
+ getter.onSuccess(block, true, tok,
this, persistent ? selectorContainer : null);
+ else {
+ final ClientKeyBlock b = block;
+ final Object t = tok;
+ final SendableGet g = getter;
+ databaseExecutor.execute(new Runnable()
{
+ public void run() {
+ g.onSuccess(b, true, t,
ClientRequestScheduler.this, selectorContainer);
+ }
+ }, NativeThread.NORM_PRIORITY, "Block
found on register");
+ }
+ // Even with working thread priorities, we
still get very high latency accessing
+ // the datastore when background threads are
doing it in parallel.
+ // So yield() here, unless priority is very
high.
+ if(getter.getPriorityClass() >
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS)
+ Thread.yield();
+ } else {
+ anyValid = true;
+ }
}
+ if(!anyValid) {
+ if(logMINOR)
+ Logger.minor(this, "No valid keys, returning
without registering for "+getter);
+ return;
+ }
+ finishRegister(getter, persistent, false);
+ }
+
+ private void finishRegister(final SendableRequest req, boolean
persistent, boolean onDatabaseThread) {
if(persistent) {
// Add to the persistent registration queue
if(onDatabaseThread) {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on
database thread!");
}
- schedCore.queueRegister(req, databaseExecutor);
+ schedCore.innerRegister(req, random);
+ schedCore.deleteRegisterMe(req);
} else {
databaseExecutor.execute(new Runnable() {
public void run() {
- schedCore.queueRegister(req,
databaseExecutor);
+ schedCore.innerRegister(req,
random);
+ schedCore.deleteRegisterMe(req);
selectorContainer.commit();
}
}, NativeThread.NORM_PRIORITY, "Add persistent
job to queue");
@@ -326,14 +407,14 @@
offeredKeys[i].remove(key);
}
if(transientCooldownQueue != null)
- transientCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key), null);
+ transientCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, null), null);
} else {
databaseExecutor.execute(new Runnable() {
public void run() {
try {
schedCore.removePendingKey(getter, complain, key);
if(persistentCooldownQueue !=
null)
-
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key), selectorContainer);
+
persistentCooldownQueue.removeKey(key, getter,
getter.getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
selectorContainer.commit();
} catch (Throwable t) {
Logger.error(this, "Caught "+t,
t);
@@ -351,11 +432,19 @@
* @param complain
*/
public void removePendingKeys(SendableGet getter, boolean complain) {
- // FIXME should this be a single databaseExecutor thread??
- Object[] keyTokens = getter.allKeys();
+ ObjectContainer container;
+ if(getter.persistent()) {
+ container = selectorContainer;
+ if(!databaseExecutor.onThread()) {
+ throw new IllegalStateException("Not on
database thread!");
+ }
+ } else {
+ container = null;
+ }
+ Object[] keyTokens = getter.allKeys(container);
for(int i=0;i<keyTokens.length;i++) {
Object tok = keyTokens[i];
- ClientKey ckey = getter.getKey(tok);
+ ClientKey ckey = getter.getKey(tok, container);
if(ckey == null) {
if(complain)
Logger.error(this, "Key "+tok+" is null
for "+getter, new Exception("debug"));
@@ -422,7 +511,7 @@
for(int i=0;i<transientGets.length;i++) {
try {
if(logMINOR) Logger.minor(this,
"Calling callback for "+transientGets[i]+" for "+key);
- transientGets[i].onGotKey(key,
block, ClientRequestScheduler.this);
+ transientGets[i].onGotKey(key,
block, ClientRequestScheduler.this, null);
} catch (Throwable t) {
Logger.error(this, "Caught
"+t+" running callback "+transientGets[i]+" for "+key);
}
@@ -431,7 +520,7 @@
}, "Running off-thread callbacks for "+block.getKey());
if(transientCooldownQueue != null) {
for(int i=0;i<transientGets.length;i++)
- transientCooldownQueue.removeKey(key,
transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
+ transientCooldownQueue.removeKey(key,
transientGets[i], transientGets[i].getCooldownWakeupByKey(key, null), null);
}
// Now the persistent stuff
@@ -443,7 +532,7 @@
if(gets == null) return;
if(persistentCooldownQueue != null) {
for(int i=0;i<gets.length;i++)
-
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key), selectorContainer);
+
persistentCooldownQueue.removeKey(key, gets[i],
gets[i].getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
}
// Call the callbacks on the database executor
thread, because the first thing
// they will need to do is access the database
to decide whether they need to
@@ -451,7 +540,7 @@
for(int i=0;i<gets.length;i++) {
try {
if(logMINOR) Logger.minor(this,
"Calling callback for "+gets[i]+" for "+key);
- gets[i].onGotKey(key, block,
ClientRequestScheduler.this);
+ gets[i].onGotKey(key, block,
ClientRequestScheduler.this, selectorContainer);
} catch (Throwable t) {
Logger.error(this, "Caught
"+t+" running callback "+gets[i]+" for "+key);
}
@@ -544,10 +633,10 @@
} else {
if(gets != null)
for(int i=0;i<gets.length;i++)
- gets[i].requeueAfterCooldown(key, now);
+ gets[i].requeueAfterCooldown(key, now,
container);
if(transientGets != null)
for(int i=0;i<transientGets.length;i++)
-
transientGets[i].requeueAfterCooldown(key, now);
+
transientGets[i].requeueAfterCooldown(key, now, container);
}
}
if(keys.length < MAX_KEYS) return found;
@@ -574,10 +663,28 @@
public void callFailure(final SendableGet get, final
LowLevelGetException e, final Object keyNum, int prio, String name) {
databaseExecutor.execute(new Runnable() {
public void run() {
- get.onFailure(e, keyNum,
ClientRequestScheduler.this);
+ get.onFailure(e, keyNum,
ClientRequestScheduler.this, selectorContainer);
selectorContainer.commit();
}
}, prio, name);
}
+
+ public void callFailure(final SendableInsert put, final
LowLevelPutException e, final Object keyNum, int prio, String name) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ put.onFailure(e, keyNum, selectorContainer);
+ selectorContainer.commit();
+ }
+ }, prio, name);
+ }
+ public void callSuccess(final SendableInsert put, final Object keyNum,
int prio, String name) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ put.onSuccess(keyNum, selectorContainer);
+ selectorContainer.commit();
+ }
+ }, prio, name);
+ }
+
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-06-02 15:53:31 UTC (rev 20188)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -327,4 +327,19 @@
}
}
+ public void addPendingKeys(SendableGet getter, ObjectContainer
container) {
+ Object[] keyTokens = getter.sendableKeys(container);
+ for(int i=0;i<keyTokens.length;i++) {
+ Object tok = keyTokens[i];
+ ClientKey key = getter.getKey(tok, container);
+ if(key == null) {
+ if(logMINOR)
+ Logger.minor(this, "No key for "+tok+"
for "+getter+" - already finished?");
+ continue;
+ } else {
+ addPendingKey(key, getter);
+ }
+ }
+ }
+
}
Modified:
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-06-02 15:53:31 UTC (rev 20188)
+++
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -178,7 +178,7 @@
// The worry is ... is there any nested locking outside of the
hierarchy?
ChosenRequest removeFirst(int fuzz, RandomSource random,
OfferedKeysList[] offeredKeys, RequestStarter starter,
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly,
short maxPrio, int retryCount) {
SendableRequest req = removeFirstInner(fuzz, random,
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount);
- Object token = req.chooseKey(this);
+ Object token = req.chooseKey(this, req.persistent() ? container
: null);
if(token == null) {
return null;
} else {
@@ -186,7 +186,7 @@
if(isInsertScheduler)
key = null;
else
- key = ((BaseSendableGet)req).getNodeKey(token);
+ key = ((BaseSendableGet)req).getNodeKey(token,
persistent() ? container : null);
PersistentChosenRequest ret = new
PersistentChosenRequest(this, req, token, key);
if(req.persistent())
container.set(ret);
@@ -213,7 +213,7 @@
for(;choosenPriorityClass <=
RequestStarter.MINIMUM_PRIORITY_CLASS;choosenPriorityClass++) {
if(logMINOR) Logger.minor(this, "Using priority
"+choosenPriorityClass);
if(tryOfferedKeys) {
- if(offeredKeys[choosenPriorityClass].hasValidKeys(this))
+ if(offeredKeys[choosenPriorityClass].hasValidKeys(this,
null))
return offeredKeys[choosenPriorityClass];
}
SortedVectorByNumber perm = null;
@@ -414,21 +414,16 @@
query.descend("priority").orderAscending();
query.descend("addedTime").orderAscending();
ObjectSet result = query.execute();
- if(result.hasNext()) {
+ while(result.hasNext()) {
RegisterMe reg = (RegisterMe) result.next();
- if(result.hasNext()) {
-
databaseExecutor.execute(registerMeRunner, NativeThread.NORM_PRIORITY,
"Register request");
- }
container.delete(reg);
// Don't need to activate, fields should exist?
FIXME
try {
- if(reg.getter instanceof SendableGet)
- addPendingKeys((SendableGet)
reg.getter);
- innerRegister(reg.getter, random);
+ sched.register(reg.getter, true);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+"
running RegisterMeRunner", t);
// Cancel the request, and commit so it
isn't tried again.
- reg.getter.internalError(null, t,
sched);
+ reg.getter.internalError(null, t,
sched, container);
}
container.commit();
}
@@ -441,24 +436,22 @@
}
RegisterMe reg = new RegisterMe(req, this);
container.set(reg);
- databaseExecutor.execute(registerMeRunner,
NativeThread.NORM_PRIORITY, "Register request");
}
- public void addPendingKeys(SendableGet getter) {
- Object[] keyTokens = getter.sendableKeys();
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKey key = getter.getKey(tok);
- if(key == null) {
- if(logMINOR)
- Logger.minor(this, "No key for "+tok+"
for "+getter+" - already finished?");
- continue;
- } else {
- addPendingKey(key, getter);
+ public void deleteRegisterMe(final SendableRequest req) {
+ ObjectSet result = container.query(new Predicate() {
+ public boolean match(RegisterMe reg) {
+ if(reg.core != ClientRequestSchedulerCore.this)
return false;
+ if(reg.getter != req) return false;
+ return true;
}
+ });
+ while(result.hasNext()) {
+ RegisterMe me = (RegisterMe) result.next();
+ container.delete(me);
}
}
-
+
public boolean hasKey(Key key) {
synchronized(keysFetching) {
return keysFetching.contains(key);
@@ -485,7 +478,7 @@
}
}, NativeThread.NORM_PRIORITY, "Remove fetching key");
}
-
+
}
class RegisterMe {
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.keys.FreenetURI;
import freenet.node.RequestClient;
import freenet.support.Logger;
@@ -14,7 +16,7 @@
*/
public abstract class ClientRequester {
- public abstract void onTransition(ClientGetState oldState,
ClientGetState newState);
+ public abstract void onTransition(ClientGetState oldState,
ClientGetState newState, ObjectContainer container);
// FIXME move the priority classes from RequestStarter here
protected short priorityClass;
Modified:
branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchException;
import freenet.client.FetchResult;
@@ -12,21 +14,21 @@
*/
public interface GetCompletionCallback {
- public void onSuccess(FetchResult result, ClientGetState state);
+ public void onSuccess(FetchResult result, ClientGetState state,
ObjectContainer container);
- public void onFailure(FetchException e, ClientGetState state);
+ public void onFailure(FetchException e, ClientGetState state,
ObjectContainer container);
/** Called when the ClientGetState knows that it knows about
* all the blocks it will need to fetch.
*/
- public void onBlockSetFinished(ClientGetState state);
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer
container);
- public void onTransition(ClientGetState oldState, ClientGetState
newState);
+ public void onTransition(ClientGetState oldState, ClientGetState
newState, ObjectContainer container);
- public void onExpectedSize(long size);
+ public void onExpectedSize(long size, ObjectContainer container);
- public void onExpectedMIME(String mime);
+ public void onExpectedMIME(String mime, ObjectContainer container);
- public void onFinalizedMetadata();
+ public void onFinalizedMetadata(ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -6,6 +6,8 @@
import java.util.HashSet;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.crypt.RandomSource;
import freenet.keys.Key;
import freenet.node.BaseSendableGet;
@@ -63,17 +65,17 @@
return keys.isEmpty();
}
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
// Not supported.
throw new UnsupportedOperationException();
}
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
// Not supported.
throw new UnsupportedOperationException();
}
- public synchronized Object chooseKey(KeysFetchingLocally fetching) {
+ public synchronized Object chooseKey(KeysFetchingLocally fetching,
ObjectContainer container) {
assert(keysList.size() == keys.size());
if(keys.size() == 1) {
// Shortcut the common case
@@ -99,7 +101,7 @@
return null;
}
- public synchronized boolean hasValidKeys(KeysFetchingLocally fetching) {
+ public synchronized boolean hasValidKeys(KeysFetchingLocally fetching,
ObjectContainer container) {
assert(keysList.size() == keys.size());
if(keys.size() == 1) {
// Shortcut the common case
@@ -135,7 +137,7 @@
return 0; // All keys have equal chance even if they've been
tried before.
}
- public void internalError(Object keyNum, Throwable t, RequestScheduler
sched) {
+ public void internalError(Object keyNum, Throwable t, RequestScheduler
sched, ObjectContainer container) {
Logger.error(this, "Internal error: "+t, t);
}
@@ -172,7 +174,7 @@
assert(keysList.size() == keys.size());
}
- public Key getNodeKey(Object token) {
+ public Key getNodeKey(Object token, ObjectContainer container) {
return (Key) token;
}
Modified:
branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -1,5 +1,7 @@
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertException;
import freenet.client.Metadata;
import freenet.keys.BaseClientKey;
@@ -9,29 +11,29 @@
*/
public interface PutCompletionCallback {
- public void onSuccess(ClientPutState state);
+ public void onSuccess(ClientPutState state, ObjectContainer container);
- public void onFailure(InsertException e, ClientPutState state);
+ public void onFailure(InsertException e, ClientPutState state,
ObjectContainer container);
- public void onEncode(BaseClientKey usk, ClientPutState state);
+ public void onEncode(BaseClientKey usk, ClientPutState state,
ObjectContainer container);
- public void onTransition(ClientPutState oldState, ClientPutState
newState);
+ public void onTransition(ClientPutState oldState, ClientPutState
newState, ObjectContainer container);
/** Only called if explicitly asked for, in which case, generally
* the metadata won't be inserted. Won't be called if there isn't
* any!
*/
- public void onMetadata(Metadata m, ClientPutState state);
+ public void onMetadata(Metadata m, ClientPutState state,
ObjectContainer container);
/** Called when enough data has been inserted that the file can be
* retrieved, even if not all data has been inserted yet. Note that this
* is only supported for splitfiles; if you get onSuccess() first, assume
* that onFetchable() isn't coming. */
- public void onFetchable(ClientPutState state);
+ public void onFetchable(ClientPutState state, ObjectContainer
container);
/** Called when the ClientPutState knows that it knows about
* all the blocks it will need to put.
*/
- public void onBlockSetFinished(ClientPutState state);
+ public void onBlockSetFinished(ClientPutState state, ObjectContainer
container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -5,6 +5,8 @@
import java.util.HashMap;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
@@ -49,7 +51,7 @@
runningInserters.put(data, sbi);
}
try {
- sbi.schedule();
+ sbi.schedule(null);
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Started healing insert
"+ctr+" for "+data);
return true;
@@ -80,7 +82,7 @@
// Do nothing
}
- public void onSuccess(ClientPutState state) {
+ public void onSuccess(ClientPutState state, ObjectContainer container) {
SingleBlockInserter sbi = (SingleBlockInserter)state;
Bucket data = (Bucket) sbi.getToken();
synchronized(this) {
@@ -91,7 +93,7 @@
data.free();
}
- public void onFailure(InsertException e, ClientPutState state) {
+ public void onFailure(InsertException e, ClientPutState state,
ObjectContainer container) {
SingleBlockInserter sbi = (SingleBlockInserter)state;
Bucket data = (Bucket) sbi.getToken();
synchronized(this) {
@@ -102,29 +104,29 @@
data.free();
}
- public void onEncode(BaseClientKey usk, ClientPutState state) {
+ public void onEncode(BaseClientKey usk, ClientPutState state,
ObjectContainer container) {
// Ignore
}
- public void onTransition(ClientPutState oldState, ClientPutState
newState) {
+ public void onTransition(ClientPutState oldState, ClientPutState
newState, ObjectContainer container) {
// Should never happen
Logger.error(this, "impossible: onTransition on
SimpleHealingQueue from "+oldState+" to "+newState, new Exception("debug"));
}
- public void onMetadata(Metadata m, ClientPutState state) {
+ public void onMetadata(Metadata m, ClientPutState state,
ObjectContainer container) {
// Should never happen
Logger.error(this, "Got metadata on SimpleHealingQueue from
"+state+": "+m, new Exception("debug"));
}
- public void onBlockSetFinished(ClientPutState state) {
+ public void onBlockSetFinished(ClientPutState state, ObjectContainer
container) {
// Ignore
}
- public void onFetchable(ClientPutState state) {
+ public void onFetchable(ClientPutState state, ObjectContainer
container) {
// Ignore
}
- public void onTransition(ClientGetState oldState, ClientGetState
newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState
newState, ObjectContainer container) {
// Ignore
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -5,6 +5,8 @@
import java.io.IOException;
+import com.db4o.ObjectContainer;
+
import freenet.client.ClientMetadata;
import freenet.client.FetchContext;
import freenet.client.FetchException;
@@ -41,51 +43,47 @@
final long token;
// Translate it, then call the real onFailure
- public void onFailure(LowLevelGetException e, Object reqTokenIgnored,
RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object reqTokenIgnored,
RequestScheduler sched, ObjectContainer container) {
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), sched);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), sched);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new
FetchException(FetchException.RECENTLY_FAILED), sched);
+ onFailure(new
FetchException(FetchException.RECENTLY_FAILED), false, sched, container);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), false, sched, container);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), sched);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), false, sched, container);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), sched);
+ onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), false, sched, container);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), sched);
+ onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new
FetchException(FetchException.TRANSFER_FAILED), sched);
+ onFailure(new
FetchException(FetchException.TRANSFER_FAILED), false, sched, container);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), false, sched, container);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED),
sched);
+ onFailure(new FetchException(FetchException.CANCELLED),
false, sched, container);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code:
"+e.code);
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), sched);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), false, sched, container);
return;
}
}
- final void onFailure(FetchException e, RequestScheduler sched) {
- onFailure(e, false, sched);
- }
-
// Real onFailure
- protected void onFailure(FetchException e, boolean forceFatal,
RequestScheduler sched) {
+ protected void onFailure(FetchException e, boolean forceFatal,
RequestScheduler sched, ObjectContainer container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "onFailure( "+e+" ,
"+forceFatal+")", e);
if(parent.isCancelled() || cancelled) {
@@ -94,7 +92,7 @@
forceFatal = true;
}
if(!(e.isFatal() || forceFatal) ) {
- if(retry(sched)) {
+ if(retry(sched, container)) {
if(logMINOR) Logger.minor(this, "Retrying");
return;
}
@@ -105,50 +103,50 @@
parent.fatallyFailedBlock();
else
parent.failedBlock();
- rcb.onFailure(e, this);
+ rcb.onFailure(e, this, container);
}
/** Will be overridden by SingleFileFetcher */
- protected void onSuccess(FetchResult data, RequestScheduler sched) {
+ protected void onSuccess(FetchResult data, RequestScheduler sched,
ObjectContainer container) {
unregister(false);
if(parent.isCancelled()) {
data.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED),
sched);
+ onFailure(new FetchException(FetchException.CANCELLED),
false, sched, container);
return;
}
- rcb.onSuccess(data, this);
+ rcb.onSuccess(data, this, container);
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
reqTokenIgnored, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
reqTokenIgnored, RequestScheduler sched, ObjectContainer container) {
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
- Bucket data = extract(block, sched);
+ Bucket data = extract(block, sched, container);
if(data == null) return; // failed
if(!block.isMetadata()) {
- onSuccess(new FetchResult((ClientMetadata)null, data),
sched);
+ onSuccess(new FetchResult((ClientMetadata)null, data),
sched, container);
} else {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), sched);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), false, sched, container);
}
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block, RequestScheduler sched) {
+ protected Bucket extract(ClientKeyBlock block, RequestScheduler sched,
ObjectContainer container) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory,
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), false,
sched, container);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG,
e), sched);
+ onFailure(new FetchException(FetchException.TOO_BIG,
e), false, sched, container);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk
full?: "+e, e);
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return null;
}
return data;
Modified:
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.lang.ref.SoftReference;
import java.net.MalformedURLException;
+import com.db4o.ObjectContainer;
+
import freenet.client.FailureCodeTracker;
import freenet.client.InsertContext;
import freenet.client.InsertException;
@@ -25,6 +27,7 @@
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.api.Bucket;
+import freenet.support.io.NativeThread;
/**
* Insert *ONE KEY*.
@@ -107,7 +110,7 @@
}
}
- protected ClientKeyBlock encode() throws InsertException {
+ protected ClientKeyBlock encode(ObjectContainer container) throws
InsertException {
ClientKeyBlock block;
boolean shouldSend;
synchronized(this) {
@@ -122,7 +125,7 @@
resultingURI = block.getClientKey().getURI();
}
if(shouldSend && !dontSendEncoded)
- cb.onEncode(block.getClientKey(), this);
+ cb.onEncode(block.getClientKey(), this, container);
return block;
}
@@ -138,15 +141,15 @@
return retries;
}
- public void onFailure(LowLevelPutException e, Object keyNum) {
+ public void onFailure(LowLevelPutException e, Object keyNum,
ObjectContainer container) {
if(parent.isCancelled()) {
- fail(new InsertException(InsertException.CANCELLED));
+ fail(new InsertException(InsertException.CANCELLED),
container);
return;
}
switch(e.code) {
case LowLevelPutException.COLLISION:
- fail(new InsertException(InsertException.COLLISION));
+ fail(new InsertException(InsertException.COLLISION),
container);
break;
case LowLevelPutException.INTERNAL_ERROR:
errors.inc(InsertException.INTERNAL_ERROR);
@@ -169,7 +172,7 @@
if(logMINOR) Logger.minor(this, "Consecutive RNFs:
"+consecutiveRNFs+" / "+ctx.consecutiveRNFsCountAsSuccess);
if(consecutiveRNFs ==
ctx.consecutiveRNFsCountAsSuccess) {
if(logMINOR) Logger.minor(this, "Consecutive
RNFs: "+consecutiveRNFs+" - counting as success");
- onSuccess(keyNum);
+ onSuccess(keyNum, container);
return;
}
} else
@@ -177,17 +180,17 @@
if(logMINOR) Logger.minor(this, "Failed: "+e);
retries++;
if((retries > ctx.maxInsertRetries) && (ctx.maxInsertRetries !=
-1)) {
- fail(InsertException.construct(errors));
+ fail(InsertException.construct(errors), container);
return;
}
getScheduler().register(this);
}
- private void fail(InsertException e) {
- fail(e, false);
+ private void fail(InsertException e, ObjectContainer container) {
+ fail(e, false, container);
}
- private void fail(InsertException e, boolean forceFatal) {
+ private void fail(InsertException e, boolean forceFatal,
ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -196,7 +199,7 @@
parent.fatallyFailedBlock();
else
parent.failedBlock();
- cb.onFailure(e, this);
+ cb.onFailure(e, this, container);
}
public ClientKeyBlock getBlock() {
@@ -215,15 +218,15 @@
}
}
- public void schedule() throws InsertException {
+ public void schedule(ObjectContainer container) throws InsertException {
synchronized(this) {
if(finished) return;
}
if(getCHKOnly) {
- ClientKeyBlock block = encode();
- cb.onEncode(block.getClientKey(), this);
+ ClientKeyBlock block = encode(container);
+ cb.onEncode(block.getClientKey(), this, container);
parent.completedBlock(false);
- cb.onSuccess(this);
+ cb.onSuccess(this, container);
finished = true;
} else {
getScheduler().register(this);
@@ -255,30 +258,30 @@
return resultingURI;
}
- public void onSuccess(Object keyNum) {
+ public void onSuccess(Object keyNum, ObjectContainer container) {
if(logMINOR) Logger.minor(this, "Succeeded ("+this+"): "+token);
if(parent.isCancelled()) {
- fail(new InsertException(InsertException.CANCELLED));
+ fail(new InsertException(InsertException.CANCELLED),
container);
return;
}
synchronized(this) {
finished = true;
}
parent.completedBlock(false);
- cb.onSuccess(this);
+ cb.onSuccess(this, container);
}
public BaseClientPutter getParent() {
return parent;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
}
super.unregister(false);
- cb.onFailure(new InsertException(InsertException.CANCELLED),
this);
+ cb.onFailure(new InsertException(InsertException.CANCELLED),
this, container);
}
public synchronized boolean isEmpty() {
@@ -304,18 +307,18 @@
}
}
if(parent.isCancelled())
- fail(new
InsertException(InsertException.CANCELLED));
+ fail(new
InsertException(InsertException.CANCELLED), null);
else
- fail(new
InsertException(InsertException.BUCKET_ERROR, "Empty block", null));
+ fail(new
InsertException(InsertException.BUCKET_ERROR, "Empty block", null), null);
return false;
}
} catch (LowLevelPutException e) {
- onFailure(e, keyNum);
+ sched.callFailure((SendableInsert) this, e, keyNum,
NativeThread.NORM_PRIORITY, "Insert failed");
if(logMINOR) Logger.minor(this, "Request failed:
"+this+" for "+e);
return true;
}
if(logMINOR) Logger.minor(this, "Request succeeded: "+this);
- onSuccess(keyNum);
+ sched.callSuccess(this, keyNum, NativeThread.NORM_PRIORITY,
"Insert succeeded");
return true;
}
@@ -352,18 +355,18 @@
return true;
}
- public synchronized Object[] sendableKeys() {
+ public synchronized Object[] sendableKeys(ObjectContainer container) {
if(finished)
return new Object[] {};
else
return new Object[] { new Integer(0) };
}
- public synchronized Object[] allKeys() {
- return sendableKeys();
+ public synchronized Object[] allKeys(ObjectContainer container) {
+ return sendableKeys(container);
}
- public synchronized Object chooseKey(KeysFetchingLocally ignored) {
+ public synchronized Object chooseKey(KeysFetchingLocally ignored,
ObjectContainer container) {
if(finished) return null;
else return new Integer(0);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.net.MalformedURLException;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.ArchiveExtractCallback;
import freenet.client.ArchiveFailureException;
@@ -121,7 +123,7 @@
// Process the completed data. May result in us going to a
// splitfile, or another SingleFileFetcher, etc.
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched, ObjectContainer container) {
this.sched = sched;
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
@@ -132,7 +134,7 @@
Logger.error(this, "block is null!
fromStore="+fromStore+", token="+token, new Exception("error"));
return;
}
- Bucket data = extract(block, sched);
+ Bucket data = extract(block, sched, container);
if(data == null) {
if(logMINOR)
Logger.minor(this, "No data");
@@ -142,43 +144,43 @@
if(logMINOR)
Logger.minor(this, "Block "+(block.isMetadata() ? "is
metadata" : "is not metadata")+" on "+this);
if(!block.isMetadata()) {
- onSuccess(new FetchResult(clientMetadata, data), sched);
+ onSuccess(new FetchResult(clientMetadata, data), sched,
container);
} else {
if(!ctx.followRedirects) {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Told me not to follow
redirects (splitfile block??)"), sched);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Told me not to follow
redirects (splitfile block??)"), false, sched, container);
return;
}
if(parent.isCancelled()) {
- onFailure(new
FetchException(FetchException.CANCELLED), sched);
+ onFailure(new
FetchException(FetchException.CANCELLED), false, sched, container);
return;
}
if(data.size() > ctx.maxMetadataSize) {
- onFailure(new
FetchException(FetchException.TOO_BIG_METADATA), sched);
+ onFailure(new
FetchException(FetchException.TOO_BIG_METADATA), false, sched, container);
return;
}
// Parse metadata
try {
metadata = Metadata.construct(data);
} catch (MetadataParseException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched,
container);
return;
} catch (IOException e) {
// Bucket error?
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
- wrapHandleMetadata(false);
+ wrapHandleMetadata(false, container);
}
}
- protected void onSuccess(FetchResult result, RequestScheduler sched) {
+ protected void onSuccess(FetchResult result, RequestScheduler sched,
ObjectContainer container) {
this.sched = sched;
unregister(false);
if(parent.isCancelled()) {
if(logMINOR)
Logger.minor(this, "Parent is cancelled");
result.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED),
sched);
+ onFailure(new FetchException(FetchException.CANCELLED),
false, sched, container);
return;
}
if(!decompressors.isEmpty()) {
@@ -189,10 +191,10 @@
long maxLen =
Math.max(ctx.maxTempLength, ctx.maxOutputLength);
data = c.decompress(data,
ctx.bucketFactory, maxLen, maxLen * 4, decompressors.isEmpty() ? returnBucket :
null);
} catch (IOException e) {
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
} catch (CompressionOutputSizeException e) {
- onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent),
result.getMimeType()), sched);
+ onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent),
result.getMimeType()), false, sched, container);
return;
}
}
@@ -205,7 +207,7 @@
// It would be useful to be able to fetch the data ...
// On the other hand such inserts could cause unpredictable results?
// Would be useful to make a redirect to the key we actually fetched.
- rcb.onFailure(new
FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many
path components in redirects", thisKey), this);
+ rcb.onFailure(new
FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many
path components in redirects", thisKey), this, container);
} else {
// TOO_MANY_PATH_COMPONENTS
// report to user
@@ -214,15 +216,15 @@
}
FreenetURI tryURI = uri;
tryURI =
tryURI.dropLastMetaStrings(metaStrings.size());
- rcb.onFailure(new
FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, result.size(), (rcb ==
parent), result.getMimeType(), tryURI), this);
+ rcb.onFailure(new
FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, result.size(), (rcb ==
parent), result.getMimeType(), tryURI), this, container);
}
result.asBucket().free();
return;
} else if(result.size() > ctx.maxOutputLength) {
- rcb.onFailure(new
FetchException(FetchException.TOO_BIG, result.size(), (rcb == parent),
result.getMimeType()), this);
+ rcb.onFailure(new
FetchException(FetchException.TOO_BIG, result.size(), (rcb == parent),
result.getMimeType()), this, container);
result.asBucket().free();
} else {
- rcb.onSuccess(result, this);
+ rcb.onSuccess(result, this, container);
}
}
@@ -235,7 +237,7 @@
* @throws ArchiveFailureException
* @throws ArchiveRestartException
*/
- private synchronized void handleMetadata() throws FetchException,
MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ private synchronized void handleMetadata(final ObjectContainer
container) throws FetchException, MetadataParseException,
ArchiveFailureException, ArchiveRestartException {
while(true) {
if(metadata.isSimpleManifest()) {
if(logMINOR) Logger.minor(this, "Is simple
manifest");
@@ -288,19 +290,19 @@
metadata =
Metadata.construct(data);
} catch
(MetadataParseException e) {
// Invalid metadata
- onFailure(new
FetchException(FetchException.INVALID_METADATA, e), sched);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, e), false, sched, container);
return;
} catch (IOException e)
{
// Bucket error?
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
-
wrapHandleMetadata(true);
+
wrapHandleMetadata(true, container);
}
public void notInArchive() {
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot
happen as ArchiveManager should synthesise some!"), sched);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot
happen as ArchiveManager should synthesise some!"), false, sched, container);
}
- }); // will result in this function
being called again
+ }, container); // will result in this
function being called again
return;
}
continue;
@@ -308,7 +310,7 @@
if(logMINOR) Logger.minor(this, "Is
archive-internal redirect");
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata());
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime,
container);
if(metaStrings.isEmpty() && isFinal &&
clientMetadata.getMIMETypeNoParams() != null && ctx.allowedMIMETypes != null &&
!ctx.allowedMIMETypes.contains(clientMetadata.getMIMETypeNoParams())) {
throw new
FetchException(FetchException.WRONG_MIME_TYPE, -1, false,
clientMetadata.getMIMEType());
@@ -337,7 +339,7 @@
// Return the data
ctx.executor.execute(new Runnable() {
public void run() {
- onSuccess(new
FetchResult(clientMetadata, out), sched);
+ onSuccess(new
FetchResult(clientMetadata, out), sched, container);
}
}, "SingleFileFetcher onSuccess
callback for "+this);
@@ -361,16 +363,16 @@
out =
data;
}
} catch (IOException e)
{
- onFailure(new
FetchException(FetchException.BUCKET_ERROR), sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR), false, sched, container);
return;
}
// Return the data
- onSuccess(new
FetchResult(clientMetadata, out), sched);
+ onSuccess(new
FetchResult(clientMetadata, out), sched, container);
}
public void notInArchive() {
- onFailure(new
FetchException(FetchException.NOT_IN_ARCHIVE), sched);
+ onFailure(new
FetchException(FetchException.NOT_IN_ARCHIVE), false, sched, container);
}
- });
+ }, container);
// Will call back into this function when it has been fetched.
return;
}
@@ -383,7 +385,7 @@
this.metadata = null;
ctx.ticker.queueTimedJob(new Runnable() {
public void run() {
- f.wrapHandleMetadata(true);
+ f.wrapHandleMetadata(true,
container);
}
}, 0);
return;
@@ -391,7 +393,7 @@
if(logMINOR) Logger.minor(this, "Is single-file
redirect");
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even
splitfiles can have mime types!
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime,
container);
String mimeType =
clientMetadata.getMIMETypeNoParams();
if(mimeType != null &&
ArchiveManager.isUsableArchiveType(mimeType) && metaStrings.size() > 0) {
@@ -438,7 +440,7 @@
final SingleFileFetcher f = new
SingleFileFetcher(parent, rcb, clientMetadata, redirectedKey, metaStrings,
this.uri, addedMetaStrings, ctx, actx, ah, archiveMetadata, maxRetries,
recursionLevel, false, token, true, returnBucket, isFinal);
if((redirectedKey instanceof ClientCHK) &&
!((ClientCHK)redirectedKey).isMetadata())
- rcb.onBlockSetFinished(this);
+ rcb.onBlockSetFinished(this, container);
if(metadata.isCompressed()) {
Compressor codec =
Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
f.addDecompressor(codec);
@@ -446,7 +448,7 @@
parent.onTransition(this, f);
ctx.slowSerialExecutor[parent.priorityClass].execute(new Runnable() {
public void run() {
- f.schedule();
+ f.schedule(container);
}
}, "Schedule "+this);
// All done! No longer our problem!
@@ -456,7 +458,7 @@
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even
splitfiles can have mime types!
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime,
container);
String mimeType =
clientMetadata.getMIMETypeNoParams();
if(mimeType != null &&
ArchiveManager.isUsableArchiveType(mimeType) && metaStrings.size() > 0) {
@@ -488,13 +490,13 @@
// It would be useful to be able to fetch the data ...
// On the other hand such inserts could cause unpredictable results?
// Would be useful to make a redirect to the key we actually fetched.
- rcb.onFailure(new
FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many
path components in redirects", thisKey), this);
+ rcb.onFailure(new
FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many
path components in redirects", thisKey), this, container);
} else {
// TOO_MANY_PATH_COMPONENTS
// report to user
FreenetURI tryURI = uri;
tryURI =
tryURI.dropLastMetaStrings(metaStrings.size());
- rcb.onFailure(new
FetchException(FetchException.TOO_MANY_PATH_COMPONENTS,
metadata.uncompressedDataLength(), (rcb == parent),
clientMetadata.getMIMEType(), tryURI), this);
+ rcb.onFailure(new
FetchException(FetchException.TOO_MANY_PATH_COMPONENTS,
metadata.uncompressedDataLength(), (rcb == parent),
clientMetadata.getMIMEType(), tryURI), this, container);
}
return;
}
@@ -512,10 +514,10 @@
}
SplitFileFetcher sf = new
SplitFileFetcher(metadata, rcb, parent, ctx,
- decompressors, clientMetadata,
actx, recursionLevel, returnBucket, token);
+ decompressors, clientMetadata,
actx, recursionLevel, returnBucket, token, container);
parent.onTransition(this, sf);
- sf.scheduleOffThread();
- rcb.onBlockSetFinished(this);
+ sf.schedule(container);
+ rcb.onBlockSetFinished(this, container);
// Clear our own metadata, we won't need it any more.
// For multi-level metadata etc see above.
metadata = null;
@@ -541,7 +543,7 @@
decompressors.addLast(codec);
}
- private void fetchArchive(boolean forData, Metadata meta, String
element, ArchiveExtractCallback callback) throws FetchException,
MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ private void fetchArchive(boolean forData, Metadata meta, String
element, ArchiveExtractCallback callback, final ObjectContainer container)
throws FetchException, MetadataParseException, ArchiveFailureException,
ArchiveRestartException {
if(logMINOR) Logger.minor(this, "fetchArchive()");
// Fetch the archive
// How?
@@ -559,7 +561,7 @@
// Fetch the archive. The archive fetcher callback will unpack it, and either call the element
// callback, or just go back around handleMetadata() on this, which will see that the data is now
// available.
- f.wrapHandleMetadata(true);
+ f.wrapHandleMetadata(true, container);
}
}, "Fetching archive for "+this);
}
@@ -567,19 +569,19 @@
/**
* Call handleMetadata(), and deal with any resulting exceptions
*/
- private void wrapHandleMetadata(boolean notFinalizedSize) {
+ private void wrapHandleMetadata(boolean notFinalizedSize,
ObjectContainer container) {
try {
- handleMetadata();
+ handleMetadata(container);
} catch (MetadataParseException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched,
container);
} catch (FetchException e) {
if(notFinalizedSize)
e.setNotFinalizedSize();
- onFailure(e, sched);
+ onFailure(e, false, sched, container);
} catch (ArchiveFailureException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched,
container);
} catch (ArchiveRestartException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched,
container);
}
}
@@ -595,44 +597,44 @@
this.callback = cb;
}
- public void onSuccess(FetchResult result, ClientGetState state)
{
+ public void onSuccess(FetchResult result, ClientGetState state,
ObjectContainer container) {
try {
ah.extractToCache(result.asBucket(), actx,
element, callback);
} catch (ArchiveFailureException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(e), sched);
+ SingleFileFetcher.this.onFailure(new
FetchException(e), false, sched, container);
return;
} catch (ArchiveRestartException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(e), sched);
+ SingleFileFetcher.this.onFailure(new
FetchException(e), false, sched, container);
return;
}
if(callback != null) return;
- wrapHandleMetadata(true);
+ wrapHandleMetadata(true, container);
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state,
ObjectContainer container) {
// Force fatal as the fetcher is presumed to have made a reasonable effort.
- SingleFileFetcher.this.onFailure(e, true, sched);
+ SingleFileFetcher.this.onFailure(e, true, sched,
container);
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state,
ObjectContainer container) {
if(wasFetchingFinalData) {
- rcb.onBlockSetFinished(SingleFileFetcher.this);
+ rcb.onBlockSetFinished(SingleFileFetcher.this,
container);
}
}
- public void onTransition(ClientGetState oldState,
ClientGetState newState) {
+ public void onTransition(ClientGetState oldState,
ClientGetState newState, ObjectContainer container) {
// Ignore
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer
container) {
// Ignore
}
- public void onExpectedSize(long size) {
- rcb.onExpectedSize(size);
+ public void onExpectedSize(long size, ObjectContainer
container) {
+ rcb.onExpectedSize(size, container);
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
@@ -640,42 +642,42 @@
class MultiLevelMetadataCallback implements GetCompletionCallback {
- public void onSuccess(FetchResult result, ClientGetState state)
{
+ public void onSuccess(FetchResult result, ClientGetState state,
ObjectContainer container) {
try {
metadata =
Metadata.construct(result.asBucket());
} catch (MetadataParseException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(FetchException.INVALID_METADATA, e), sched);
+ SingleFileFetcher.this.onFailure(new
FetchException(FetchException.INVALID_METADATA, e), false, sched, container);
return;
} catch (IOException e) {
// Bucket error?
- SingleFileFetcher.this.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
+ SingleFileFetcher.this.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
- wrapHandleMetadata(true);
+ wrapHandleMetadata(true, container);
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state,
ObjectContainer container) {
// Pass it on; fetcher is assumed to have retried as appropriate already, so this is fatal.
- SingleFileFetcher.this.onFailure(e, true, sched);
+ SingleFileFetcher.this.onFailure(e, true, sched,
container);
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state,
ObjectContainer container) {
// Ignore as we are fetching metadata here
}
- public void onTransition(ClientGetState oldState,
ClientGetState newState) {
+ public void onTransition(ClientGetState oldState,
ClientGetState newState, ObjectContainer container) {
// Ignore
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer
container) {
// Ignore
}
- public void onExpectedSize(long size) {
- rcb.onExpectedSize(size);
+ public void onExpectedSize(long size, ObjectContainer
container) {
+ rcb.onExpectedSize(size, container);
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
@@ -691,7 +693,7 @@
public static ClientGetState create(ClientRequester requester,
GetCompletionCallback cb,
ClientMetadata clientMetadata, FreenetURI uri,
FetchContext ctx, ArchiveContext actx,
int maxRetries, int recursionLevel, boolean
dontTellClientGet, long l, boolean isEssential,
- Bucket returnBucket, boolean isFinal) throws
MalformedURLException, FetchException {
+ Bucket returnBucket, boolean isFinal, ObjectContainer
container) throws MalformedURLException, FetchException {
BaseClientKey key = BaseClientKey.getBaseKey(uri);
if((clientMetadata == null || clientMetadata.isTrivial()) &&
(!uri.hasMetaStrings()) &&
ctx.allowSplitfiles == false &&
ctx.followRedirects == false &&
@@ -700,11 +702,11 @@
if(key instanceof ClientKey)
return new SingleFileFetcher(requester, cb,
clientMetadata, (ClientKey)key, uri.listMetaStrings(), uri, 0, ctx, actx, null,
null, maxRetries, recursionLevel, dontTellClientGet, l, isEssential,
returnBucket, isFinal);
else {
- return uskCreate(requester, cb, clientMetadata,
(USK)key, uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel,
dontTellClientGet, l, isEssential, returnBucket, isFinal);
+ return uskCreate(requester, cb, clientMetadata,
(USK)key, uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel,
dontTellClientGet, l, isEssential, returnBucket, isFinal, container);
}
}
- private static ClientGetState uskCreate(ClientRequester requester,
GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList
metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int
recursionLevel, boolean dontTellClientGet, long l, boolean isEssential, Bucket
returnBucket, boolean isFinal) throws FetchException {
+ private static ClientGetState uskCreate(ClientRequester requester,
GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList
metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int
recursionLevel, boolean dontTellClientGet, long l, boolean isEssential, Bucket
returnBucket, boolean isFinal, ObjectContainer container) throws FetchException
{
if(usk.suggestedEdition >= 0) {
// Return the latest known version but at least suggestedEdition.
long edition = ctx.uskManager.lookup(usk);
@@ -714,7 +716,7 @@
edition = ctx.uskManager.lookup(usk);
if(edition > usk.suggestedEdition) {
if(logMINOR)
Logger.minor(SingleFileFetcher.class, "Redirecting to edition "+edition);
- cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null, container);
return null;
} else {
// Transition to SingleFileFetcher
@@ -728,7 +730,7 @@
return sf;
}
} else {
- cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null, container);
return null;
}
} else {
@@ -772,27 +774,27 @@
this.returnBucket = returnBucket;
}
- public void onFoundEdition(long l, USK newUSK) {
+ public void onFoundEdition(long l, USK newUSK, ObjectContainer
container) {
ClientSSK key = usk.getSSK(l);
try {
if(l == usk.suggestedEdition) {
SingleFileFetcher sf = new
SingleFileFetcher(parent, cb, clientMetadata, key, metaStrings,
key.getURI().addMetaStrings(metaStrings),
0, ctx, actx, null,
null, maxRetries, recursionLevel+1, dontTellClientGet, token, false,
returnBucket, true);
- sf.schedule();
+ sf.schedule(container);
} else {
- cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
newUSK.getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
newUSK.getURI().addMetaStrings(metaStrings)), null, container);
}
} catch (FetchException e) {
- cb.onFailure(e, null);
+ cb.onFailure(e, null, container);
}
}
- public void onFailure() {
- cb.onFailure(new
FetchException(FetchException.DATA_NOT_FOUND, "No USK found"), null);
+ public void onFailure(ObjectContainer container) {
+ cb.onFailure(new
FetchException(FetchException.DATA_NOT_FOUND, "No USK found"), null, container);
}
- public void onCancelled() {
- cb.onFailure(new
FetchException(FetchException.CANCELLED, (String)null), null);
+ public void onCancelled(ObjectContainer container) {
+ cb.onFailure(new
FetchException(FetchException.CANCELLED, (String)null), null, container);
}
public short getPollingPriorityNormal() {
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.io.OutputStream;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.ClientMetadata;
import freenet.client.FetchContext;
@@ -59,7 +61,7 @@
public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb,
ClientRequester parent2,
FetchContext newCtx, LinkedList decompressors,
ClientMetadata clientMetadata,
- ArchiveContext actx, int recursionLevel, Bucket
returnBucket, long token2) throws FetchException, MetadataParseException {
+ ArchiveContext actx, int recursionLevel, Bucket
returnBucket, long token2, ObjectContainer container) throws FetchException,
MetadataParseException {
this.finished = false;
this.returnBucket = returnBucket;
this.fetchContext = newCtx;
@@ -86,12 +88,12 @@
finalLength = overrideLength;
}
long eventualLength = Math.max(overrideLength,
metadata.uncompressedDataLength());
- cb.onExpectedSize(eventualLength);
+ cb.onExpectedSize(eventualLength, container);
String mimeType = metadata.getMIMEType();
if(mimeType != null)
- cb.onExpectedMIME(mimeType);
+ cb.onExpectedMIME(mimeType, container);
if(metadata.uncompressedDataLength() > 0)
- cb.onFinalizedMetadata();
+ cb.onFinalizedMetadata(container);
if(eventualLength > 0 && newCtx.maxOutputLength > 0 &&
eventualLength > newCtx.maxOutputLength)
throw new FetchException(FetchException.TOO_BIG,
eventualLength, true, clientMetadata.getMIMEType());
@@ -219,7 +221,7 @@
return output;
}
- public void segmentFinished(SplitFileFetcherSegment segment) {
+ public void segmentFinished(SplitFileFetcherSegment segment,
ObjectContainer container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Finished segment: "+segment);
boolean finish = false;
@@ -240,10 +242,10 @@
}
notifyAll();
}
- if(finish) finish();
+ if(finish) finish(container);
}
- private void finish() {
+ private void finish(ObjectContainer container) {
try {
synchronized(this) {
if(finished) {
@@ -262,47 +264,39 @@
if(!decompressors.isEmpty()) out = null;
data = c.decompress(data,
fetchContext.bucketFactory, maxLen, maxLen * 4, out);
} catch (IOException e) {
- cb.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), this);
+ cb.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), this, container);
return;
} catch (CompressionOutputSizeException e) {
- cb.onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, false /* FIXME */,
clientMetadata.getMIMEType()), this);
+ cb.onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, false /* FIXME */,
clientMetadata.getMIMEType()), this, container);
return;
}
}
- cb.onSuccess(new FetchResult(clientMetadata, data),
this);
+ cb.onSuccess(new FetchResult(clientMetadata, data),
this, container);
} catch (FetchException e) {
- cb.onFailure(e, this);
+ cb.onFailure(e, this, container);
} catch (OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Failing above attempted fetch...");
- cb.onFailure(new
FetchException(FetchException.INTERNAL_ERROR, e), this);
+ cb.onFailure(new
FetchException(FetchException.INTERNAL_ERROR, e), this, container);
} catch (Throwable t) {
- cb.onFailure(new
FetchException(FetchException.INTERNAL_ERROR, t), this);
+ cb.onFailure(new
FetchException(FetchException.INTERNAL_ERROR, t), this, container);
}
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this,
"Scheduling "+this);
for(int i=0;i<segments.length;i++) {
- segments[i].schedule();
+ segments[i].schedule(container);
}
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
for(int i=0;i<segments.length;i++)
- segments[i].cancel();
+ segments[i].cancel(container);
}
public long getToken() {
return token;
}
- public void scheduleOffThread() {
-
fetchContext.slowSerialExecutor[parent.priorityClass].execute(new Runnable() {
- public void run() {
- schedule();
- }
- }, "Splitfile scheduler thread for "+this);
- }
-
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.io.OutputStream;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.FECCodec;
import freenet.client.FECJob;
@@ -151,7 +153,7 @@
return fatallyFailedBlocks;
}
- public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block) {
+ public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer
container) {
boolean decodeNow = false;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on
"+seg);
@@ -214,7 +216,7 @@
}
}
- public void onDecodedSegment() {
+ public void onDecodedSegment(ObjectContainer container) {
try {
if(isCollectingBinaryBlob()) {
for(int i=0;i<dataBuckets.length;i++) {
@@ -222,7 +224,7 @@
try {
maybeAddToBinaryBlob(data, i,
false);
} catch (FetchException e) {
- fail(e);
+ fail(e, container);
return;
}
}
@@ -241,14 +243,14 @@
// Otherwise a race is possible that might result in it not seeing our finishing.
finished = true;
if(codec == null || !isCollectingBinaryBlob())
-
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
} catch (IOException e) {
Logger.normal(this, "Caught bucket error?: "+e, e);
synchronized(this) {
finished = true;
failureException = new
FetchException(FetchException.BUCKET_ERROR);
}
-
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
return;
}
@@ -265,7 +267,7 @@
}
}
- public void onEncodedSegment() {
+ public void onEncodedSegment(ObjectContainer container) {
synchronized(this) {
// Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
for(int i=0;i<dataBuckets.length;i++) {
@@ -288,7 +290,7 @@
try {
maybeAddToBinaryBlob(data, i, true);
} catch (FetchException e) {
- fail(e);
+ fail(e, container);
return;
}
if(checkRetries[i] > 0)
@@ -304,7 +306,7 @@
}
// Defer the completion until we have generated healing blocks if we are collecting binary blobs.
if(isCollectingBinaryBlob())
-
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
}
boolean isCollectingBinaryBlob() {
@@ -337,8 +339,9 @@
fetchContext.healingQueue.queue(data);
}
- /** This is after any retries and therefore is either out-of-retries or fatal */
- public synchronized void onFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg) {
+ /** This is after any retries and therefore is either out-of-retries or fatal
+ * @param container */
+ public synchronized void onFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, ObjectContainer container) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Permanently failed block:
"+blockNo+" on "+this+" : "+e, e);
boolean allFailed;
@@ -373,13 +376,14 @@
allFailed = failedBlocks + fatallyFailedBlocks >
(dataKeys.length + checkKeys.length - minFetched);
}
if(allFailed)
- fail(new FetchException(FetchException.SPLITFILE_ERROR,
errors));
+ fail(new FetchException(FetchException.SPLITFILE_ERROR,
errors), container);
else
seg.possiblyRemoveFromParent();
}
- /** A request has failed non-fatally, so the block may be retried */
- public void onNonFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, RequestScheduler sched) {
+ /** A request has failed non-fatally, so the block may be retried
+ * @param container */
+ public void onNonFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, RequestScheduler sched, ObjectContainer
container) {
int tries;
int maxTries = blockFetchContext.maxNonSplitfileRetries;
boolean failed = false;
@@ -422,7 +426,7 @@
}
}
if(failed) {
- onFatalFailure(e, blockNo, seg);
+ onFatalFailure(e, blockNo, seg, container);
if(logMINOR)
Logger.minor(this, "Not retrying block
"+blockNo+" on "+this+" : tries="+tries+"/"+maxTries);
return;
@@ -437,7 +441,7 @@
seg.unregisterKey(key.getNodeKey());
if(logMINOR)
Logger.minor(this, "Retrying block "+blockNo+"
on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
- sub.add(blockNo, false);
+ sub.add(blockNo, false, container);
}
}
@@ -454,7 +458,7 @@
return sub;
}
- private void fail(FetchException e) {
+ private void fail(FetchException e, ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -481,16 +485,16 @@
}
}
removeSubSegments();
- parentFetcher.segmentFinished(this);
+ parentFetcher.segmentFinished(this, container);
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
try {
SplitFileFetcherSubSegment seg = getSubSegment(0);
for(int
i=0;i<dataRetries.length+checkRetries.length;i++)
- seg.add(i, true);
+ seg.add(i, true, container);
- seg.schedule();
+ seg.schedule(container);
synchronized(this) {
scheduled = true;
}
@@ -499,12 +503,12 @@
Logger.minor(this, "scheduling "+seg+" :
"+seg.blockNums);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" scheduling "+this, t);
- fail(new FetchException(FetchException.INTERNAL_ERROR,
t));
+ fail(new FetchException(FetchException.INTERNAL_ERROR,
t), container);
}
}
- public void cancel() {
- fail(new FetchException(FetchException.CANCELLED));
+ public void cancel(ObjectContainer container) {
+ fail(new FetchException(FetchException.CANCELLED), container);
}
public void onBlockSetFinished(ClientGetState state) {
@@ -582,7 +586,7 @@
return checkCooldownTimes[blockNum - dataKeys.length];
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer
container) {
Vector v = null;
boolean notFound = true;
synchronized(this) {
@@ -601,7 +605,7 @@
if(logMINOR)
Logger.minor(this, "Retrying after
cooldown on "+this+": data block "+i+" on "+this+" :
tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
- sub.add(i, true);
+ sub.add(i, true, container);
if(!v.contains(sub)) v.add(sub);
notFound = false;
}
@@ -619,7 +623,7 @@
if(logMINOR)
Logger.minor(this, "Retrying after
cooldown on "+this+": check block "+i+" on "+this+" :
tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
- sub.add(i+dataKeys.length, true);
+ sub.add(i+dataKeys.length, true, container);
if(!v.contains(sub)) v.add(sub);
notFound = false;
}
@@ -630,7 +634,7 @@
}
if(v != null) {
for(int i=0;i<v.size();i++) {
- ((SplitFileFetcherSubSegment)
v.get(i)).schedule();
+ ((SplitFileFetcherSubSegment)
v.get(i)).schedule(container);
}
}
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-02 15:53:31 UTC (rev 20188)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.client.FetchException;
import freenet.keys.CHKBlock;
@@ -64,12 +66,12 @@
return ctx;
}
- public Object chooseKey(KeysFetchingLocally keys) {
+ public Object chooseKey(KeysFetchingLocally keys, ObjectContainer
container) {
if(cancelled) return null;
return removeRandomBlockNum(keys);
}
- public ClientKey getKey(Object token) {
+ public ClientKey getKey(Object token, ObjectContainer container) {
synchronized(segment) {
if(cancelled) {
if(logMINOR)
@@ -94,14 +96,14 @@
* Fetch the array from the segment because we need to include *ALL* keys, especially
* those on cooldown queues. This is important when unregistering.
*/
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
return segment.getKeyNumbersAtRetryLevel(retryCount);
}
/**
* Just those keys which are eligible to be started now.
*/
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
return blockNums.toArray();
}
@@ -136,7 +138,7 @@
}
}
- public boolean hasValidKeys(KeysFetchingLocally keys) {
+ public boolean hasValidKeys(KeysFetchingLocally keys, ObjectContainer
container) {
synchronized(segment) {
for(int i=0;i<10;i++) {
Object ret;
@@ -165,49 +167,49 @@
// Translate it, then call the real onFailure
// FIXME refactor this out to a common method; see SimpleSingleFileFetcher
- public void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched, ObjectContainer container) {
if(logMINOR)
Logger.minor(this, "onFailure("+e+" , "+token);
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new
FetchException(FetchException.RECENTLY_FAILED), token, sched);
+ onFailure(new
FetchException(FetchException.RECENTLY_FAILED), token, sched, container);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched, container);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched, container);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), token, sched);
+ onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), token, sched, container);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), token, sched);
+ onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new
FetchException(FetchException.TRANSFER_FAILED), token, sched);
+ onFailure(new
FetchException(FetchException.TRANSFER_FAILED), token, sched, container);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched, container);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED),
token, sched);
+ onFailure(new FetchException(FetchException.CANCELLED),
token, sched, container);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code:
"+e.code);
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched, container);
return;
}
}
// Real onFailure
- protected void onFailure(FetchException e, Object token,
RequestScheduler sched) {
+ protected void onFailure(FetchException e, Object token,
RequestScheduler sched, ObjectContainer container) {
boolean forceFatal = false;
if(parent.isCancelled()) {
if(Logger.shouldLog(Logger.MINOR, this))
@@ -217,14 +219,14 @@
}
segment.errors.inc(e.getMode());
if(e.isFatal() || forceFatal) {
- segment.onFatalFailure(e, ((Integer)token).intValue(),
this);
+ segment.onFatalFailure(e, ((Integer)token).intValue(),
this, container);
} else {
- segment.onNonFatalFailure(e,
((Integer)token).intValue(), this, sched);
+ segment.onNonFatalFailure(e,
((Integer)token).intValue(), this, sched, container);
}
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched) {
- Bucket data = extract(block, token, sched);
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched, ObjectContainer container) {
+ Bucket data = extract(block, token, sched, container);
if(fromStore) {
// Normally when this method is called the block number has already
// been removed. However if fromStore=true, it won't have been, so
@@ -242,39 +244,39 @@
}
}
if(!block.isMetadata()) {
- onSuccess(data, fromStore, (Integer)token,
((Integer)token).intValue(), block, sched);
+ onSuccess(data, fromStore, (Integer)token,
((Integer)token).intValue(), block, sched, container);
} else {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token, sched);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token, sched, container);
}
}
- protected void onSuccess(Bucket data, boolean fromStore, Integer token,
int blockNo, ClientKeyBlock block, RequestScheduler sched) {
+ protected void onSuccess(Bucket data, boolean fromStore, Integer token,
int blockNo, ClientKeyBlock block, RequestScheduler sched, ObjectContainer
container) {
if(parent.isCancelled()) {
data.free();
- onFailure(new FetchException(FetchException.CANCELLED),
token, sched);
+ onFailure(new FetchException(FetchException.CANCELLED),
token, sched, container);
return;
}
- segment.onSuccess(data, blockNo, this, block);
+ segment.onSuccess(data, blockNo, this, block, container);
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block, Object token,
RequestScheduler sched) {
+ protected Bucket extract(ClientKeyBlock block, Object token,
RequestScheduler sched, ObjectContainer container) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory,
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token,
sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token,
sched, container);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG,
e.getMessage()), token, sched);
+ onFailure(new FetchException(FetchException.TOO_BIG,
e.getMessage()), token, sched, container);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk
full?: "+e, e);
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), token, sched);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), token, sched, container);
return null;
}
if(Logger.shouldLog(Logger.MINOR, this))
@@ -327,7 +329,7 @@
return false;
}
- public void add(int blockNo, boolean dontSchedule) {
+ public void add(int blockNo, boolean dontSchedule, ObjectContainer
container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Adding block "+blockNo+" to
"+this+" dontSchedule="+dontSchedule);
if(blockNo < 0) throw new IllegalArgumentException();
@@ -357,7 +359,7 @@
schedule = false;
}
}
- if(schedule) schedule();
+ if(schedule) schedule(container);
else if(!dontSchedule)
// Already scheduled, however this key may not be registered.
getScheduler().addPendingKey(segment.getBlockKey(blockNo), this);
@@ -380,7 +382,7 @@
unregister(false);
}
- public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched,
ObjectContainer container) {
if(logMINOR) Logger.minor(this, "onGotKey("+key+")");
// Find and remove block if it is on this subsegment. However it may have been
// removed already.
@@ -407,15 +409,15 @@
try {
cb = new ClientCHKBlock((CHKBlock)block, ckey);
} catch (CHKVerifyException e) {
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e), token, sched);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e), token, sched, container);
return;
}
- Bucket data = extract(cb, token, sched);
+ Bucket data = extract(cb, token, sched, container);
if(!cb.isMetadata()) {
- onSuccess(data, false, (Integer)token,
((Integer)token).intValue(), cb, sched);
+ onSuccess(data, false, (Integer)token,
((Integer)token).intValue(), cb, sched, container);
} else {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token, sched);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token, sched, container);
}
}
@@ -435,21 +437,21 @@
}
}
- public long getCooldownWakeup(Object token) {
+ public long getCooldownWakeup(Object token, ObjectContainer container) {
return segment.getCooldownWakeup(((Integer)token).intValue());
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer
container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+"
for "+this);
- segment.requeueAfterCooldown(key, time);
+ segment.requeueAfterCooldown(key, time, container);
}
- public long getCooldownWakeupByKey(Key key) {
+ public long getCooldownWakeupByKey(Key key, ObjectContainer container) {
return segment.getCooldownWakeupByKey(key);
}
- public void resetCooldownTimes() {
+ public void resetCooldownTimes(ObjectContainer container) {
synchronized(segment) {
segment.resetCooldownTimes((Integer[])blockNums.toArray(new
Integer[blockNums.size()]));
}
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -6,6 +6,8 @@
import java.io.IOException;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.ClientMetadata;
import freenet.client.FECCodec;
import freenet.client.FailureCodeTracker;
@@ -228,9 +230,9 @@
return (SplitFileInserterSegment[]) segs.toArray(new SplitFileInserterSegment[segs.size()]);
}
- public void start() throws InsertException {
+ public void start(ObjectContainer container) throws InsertException {
for(int i=0;i<segments.length;i++)
- segments[i].start();
+ segments[i].start(container);
if(countDataBlocks > 32)
parent.onMajorProgress();
@@ -359,7 +361,7 @@
return parent;
}
- public void segmentFinished(SplitFileInserterSegment segment) {
+ public void segmentFinished(SplitFileInserterSegment segment, ObjectContainer container) {
if(logMINOR) Logger.minor(this, "Segment finished: "+segment, new Exception("debug"));
boolean allGone = true;
if(countDataBlocks > 32)
@@ -379,7 +381,7 @@
InsertException e = segment.getException();
if((e != null) && e.isFatal()) {
- cancel();
+ cancel(container);
} else {
if(!allGone) return;
}
@@ -431,7 +433,7 @@
}
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -440,8 +442,8 @@
segments[i].cancel();
}
- public void schedule() throws InsertException {
- start();
+ public void schedule(ObjectContainer container) throws InsertException {
+ start(container);
}
public Object getToken() {
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2008-06-02 15:53:31 UTC (rev 20188)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -2,6 +2,8 @@
import java.net.MalformedURLException;
+import com.db4o.ObjectContainer;
+
import freenet.client.FECCodec;
import freenet.client.FECJob;
import freenet.client.FailureCodeTracker;
@@ -393,7 +395,7 @@
return fs;
}
- public void start() throws InsertException {
+ public void start(ObjectContainer container) throws InsertException {
if (logMINOR)
Logger.minor(this, "Starting segment " + segNo + " of "
+ parent
+ " (" + parent.dataLength + "): " +
this + " ( finished="
@@ -444,7 +446,7 @@
} else
parent.parent.completedBlock(true);
}
- onEncodedSegment();
+ onEncodedSegment(container);
}
if (hasURIs) {
parent.segmentHasURIs(this);
@@ -458,13 +460,13 @@
if (fin)
finish();
if (finished) {
- parent.segmentFinished(this);
+ parent.segmentFinished(this, container);
}
}
- public void onDecodedSegment() {} // irrevelant
+ public void onDecodedSegment(ObjectContainer container) {} // irrevelant
- public void onEncodedSegment() {
+ public void onEncodedSegment(ObjectContainer container) {
// Start the inserts
try {
for (int i = 0; i < checkBlockInserters.length; i++) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKCallback.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKCallback.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.keys.USK;
/**
@@ -14,7 +16,7 @@
/** Found the latest edition.
* @param l The edition number.
* @param key The key. */
- void onFoundEdition(long l, USK key);
+ void onFoundEdition(long l, USK key, ObjectContainer container);
/**
* Priority at which the polling should run normally.
Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
@@ -26,12 +28,12 @@
this.cb = cb;
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched, ObjectContainer container) {
unregister(false);
cb.onSuccess((ClientSSKBlock)block);
}
- public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched, ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "onFailure: "+e+" for "+this);
// Firstly, can we retry?
@@ -61,7 +63,7 @@
canRetry = true;
}
- if(canRetry && retry(sched)) return;
+ if(canRetry && retry(sched, container)) return;
// Ran out of retries.
unregister(false);
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
/**
* Callback interface for USK fetches. If you submit a USK fetch via
* USKManager.getFetcher, then register yourself on it as a listener, then you
@@ -11,8 +13,8 @@
public interface USKFetcherCallback extends USKCallback {
/** Failed to find any edition at all (later than or equal to the specified hint) */
- void onFailure();
+ void onFailure(ObjectContainer container);
- void onCancelled();
+ void onCancelled(ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchException;
import freenet.client.FetchResult;
import freenet.keys.FreenetURI;
@@ -33,19 +35,19 @@
// Do nothing
}
- public void onSuccess(FetchResult result, ClientGetState state) {
+ public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container) {
// Ignore; we don't do anything with it because we are running in the background.
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state, ObjectContainer container) {
// Ignore
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer container) {
// Ignore
}
- public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container) {
// Ignore
}
@@ -53,15 +55,15 @@
return super.toString()+ ':' +usk;
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer container) {
// Ignore
}
- public void onExpectedSize(long size) {
+ public void onExpectedSize(long size, ObjectContainer container) {
// Ignore
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKInserter.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.net.MalformedURLException;
import java.util.Arrays;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
@@ -51,7 +53,7 @@
/** After attempting inserts on this many slots, go back to the Fetcher
*/
private static final long MAX_TRIED_SLOTS = 10;
- public void schedule() throws InsertException {
+ public void schedule(ObjectContainer container) throws InsertException {
// Caller calls schedule()
// schedule() calls scheduleFetcher()
// scheduleFetcher() creates a Fetcher (set up to tell us about
author-errors as well as valid inserts)
@@ -79,7 +81,7 @@
fetcher.schedule();
}
- public void onFoundEdition(long l, USK key) {
+ public void onFoundEdition(long l, USK key, ObjectContainer container) {
boolean alreadyInserted = false;
synchronized(this) {
edition = Math.max(l, edition);
@@ -111,11 +113,11 @@
parent.completedBlock(true);
cb.onSuccess(this);
} else {
- scheduleInsert();
+ scheduleInsert(container);
}
}
- private void scheduleInsert() {
+ private void scheduleInsert(ObjectContainer container) {
long edNo = Math.max(edition, ctx.uskManager.lookup(pubUSK)+1);
synchronized(this) {
if(finished) return;
@@ -126,7 +128,7 @@
ctx, this, isMetadata, sourceLength, token, getCHKOnly, false, true /* we don't use it */, tokenObject);
}
try {
- sbi.schedule();
+ sbi.schedule(container);
} catch (InsertException e) {
cb.onFailure(e, this);
}
@@ -149,7 +151,7 @@
// FINISHED!!!! Yay!!!
}
- public synchronized void onFailure(InsertException e, ClientPutState state) {
+ public synchronized void onFailure(InsertException e, ClientPutState state, ObjectContainer container) {
sbi = null;
if(e.getMode() == InsertException.COLLISION) {
// Try the next slot
@@ -157,7 +159,7 @@
if(consecutiveCollisions++ > MAX_TRIED_SLOTS)
scheduleFetcher();
else
- scheduleInsert();
+ scheduleInsert(container);
} else {
cb.onFailure(e, state);
}
@@ -190,11 +192,11 @@
return parent;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
if(fetcher != null)
fetcher.cancel();
if(sbi != null)
- sbi.cancel();
+ sbi.cancel(container);
synchronized(this) {
finished = true;
}
Modified:
branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
===================================================================
---
branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
2008-06-02 15:53:31 UTC (rev 20188)
+++
branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -10,6 +10,9 @@
import java.net.MalformedURLException;
import java.util.Date;
import java.util.HashMap;
+
+import com.db4o.ObjectContainer;
+
import freenet.client.async.USKCallback;
import freenet.keys.FreenetURI;
import freenet.keys.USK;
@@ -91,7 +94,7 @@
private class USKUpdatedCallback implements USKCallback {
- public void onFoundEdition(long edition, USK key) {
+ public void onFoundEdition(long edition, USK key,
ObjectContainer container) {
BookmarkItems items = MAIN_CATEGORY.getAllItems();
for(int i = 0; i < items.size(); i++) {
if(!"USK".equals(items.get(i).getKeyType()))
Modified: branches/db4o/freenet/src/freenet/node/BaseSendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/BaseSendableGet.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/BaseSendableGet.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -1,12 +1,14 @@
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.keys.Key;
public abstract class BaseSendableGet extends SendableRequest {
/** Get a numbered key to fetch. */
- public abstract Key getNodeKey(Object token);
+ public abstract Key getNodeKey(Object token, ObjectContainer container);
- public abstract boolean hasValidKeys(KeysFetchingLocally fetching);
+ public abstract boolean hasValidKeys(KeysFetchingLocally fetching,
ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -113,8 +113,14 @@
* - Only one weak-reference cache for the database.
* - No need to refresh live objects.
* - Deactivation is simpler.
+ * Note that the priorities are thread priorities, not request priorities.
*/
public final PrioritizedSerialExecutor clientDatabaseExecutor;
+ /**
+ * Whenever a new request is added, we have to check the datastore. We funnel all such access
+ * through this thread. Note that the priorities are request priorities, not thread priorities.
+ */
+ public final PrioritizedSerialExecutor datastoreCheckerExecutor;
public static int maxBackgroundUSKFetchers;
@@ -141,6 +147,7 @@
clientSlowSerialExecutor[i] = new SerialExecutor(prio);
}
clientDatabaseExecutor = new PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, NativeThread.MAX_PRIORITY+1, NativeThread.NORM_PRIORITY);
+ datastoreCheckerExecutor = new PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, RequestStarter.NUMBER_OF_PRIORITY_CLASSES, 0);
byte[] pwdBuf = new byte[16];
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -58,4 +58,8 @@
public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, int prio, String name);
+ public void callFailure(final SendableInsert put, final LowLevelPutException e, final Object keyNum, int prio, String name);
+
+ public void callSuccess(final SendableInsert put, final Object keyNum, int prio, String name);
+
}
Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.ClientRequester;
@@ -25,10 +27,10 @@
public final ClientRequester parent;
/** Get a numbered key to fetch. */
- public abstract ClientKey getKey(Object token);
+ public abstract ClientKey getKey(Object token, ObjectContainer
container);
- public Key getNodeKey(Object token) {
- ClientKey key = getKey(token);
+ public Key getNodeKey(Object token, ObjectContainer container) {
+ ClientKey key = getKey(token, container);
if(key == null) return null;
return key.getNodeKey();
}
@@ -37,10 +39,10 @@
public abstract FetchContext getContext();
/** Called when/if the low-level request succeeds. */
- public abstract void onSuccess(ClientKeyBlock block, boolean fromStore,
Object token, RequestScheduler sched);
+ public abstract void onSuccess(ClientKeyBlock block, boolean fromStore,
Object token, RequestScheduler sched, ObjectContainer container);
/** Called when/if the low-level request fails. */
- public abstract void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched);
+ public abstract void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched, ObjectContainer container);
/** Should the request ignore the datastore? */
public abstract boolean ignoreStore();
@@ -58,7 +60,7 @@
* @return True if a request was executed. False if caller should try
to find another request, and remove
* this one from the queue. */
public boolean send(NodeClientCore core, final RequestScheduler sched,
final Object keyNum) {
- ClientKey key = getKey(keyNum);
+ ClientKey key = getKey(keyNum, container);
if(key == null) {
Logger.error(this, "Key is null in send(): keyNum = "+keyNum+" for "+this);
return false;
@@ -66,11 +68,6 @@
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Sending get for key "+keyNum+" : "+key);
FetchContext ctx = getContext();
- long now = System.currentTimeMillis();
- if(getCooldownWakeupByKey(key.getNodeKey()) > now) {
- Logger.error(this, "Key is still on the cooldown queue in send() for "+this+" - key = "+key, new Exception("error"));
- return false;
- }
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(isCancelled()) {
if(logMINOR) Logger.minor(this, "Cancelled: "+this);
@@ -99,7 +96,7 @@
return true;
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Scheduling "+this);
getScheduler().register(this);
@@ -118,7 +115,7 @@
* @param block
* @param sched
*/
- public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler
sched);
+ public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler
sched, ObjectContainer container);
/**
* Get the time at which the key specified by the given token will wake
up from the
@@ -126,12 +123,12 @@
* @param token
* @return
*/
- public abstract long getCooldownWakeup(Object token);
+ public abstract long getCooldownWakeup(Object token, ObjectContainer
container);
- public abstract long getCooldownWakeupByKey(Key key);
+ public abstract long getCooldownWakeupByKey(Key key, ObjectContainer
container);
/** Reset the cooldown times when the request is reregistered. */
- public abstract void resetCooldownTimes();
+ public abstract void resetCooldownTimes(ObjectContainer container);
public final void unregister(boolean staySubscribed) {
if(!staySubscribed)
@@ -143,7 +140,7 @@
getScheduler().removePendingKey(this, false, key);
}
- public void internalError(final Object keyNum, final Throwable t, final
RequestScheduler sched) {
+ public void internalError(final Object keyNum, final Throwable t, final
RequestScheduler sched, ObjectContainer container) {
sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), keyNum, NativeThread.MAX_PRIORITY, "Internal error");
}
@@ -152,6 +149,6 @@
* Only requeue if our requeue time is less than or equal to the given
time.
* @param key
*/
- public abstract void requeueAfterCooldown(Key key, long time);
+ public abstract void requeueAfterCooldown(Key key, long time,
ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/SendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
/**
* Callback interface for a low level insert, which is immediately sendable.
These
* should be registered on the ClientRequestScheduler when we want to send
them. It will
@@ -12,13 +14,13 @@
public abstract class SendableInsert extends SendableRequest {
/** Called when we successfully insert the data */
- public abstract void onSuccess(Object keyNum);
+ public abstract void onSuccess(Object keyNum, ObjectContainer
container);
/** Called when we don't! */
- public abstract void onFailure(LowLevelPutException e, Object keyNum);
+ public abstract void onFailure(LowLevelPutException e, Object keyNum,
ObjectContainer container);
- public void internalError(Object keyNum, Throwable t, RequestScheduler sched) {
- onFailure(new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), keyNum);
+ public void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container) {
+ onFailure(new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), keyNum, container);
}
}
Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-02
15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-03
20:49:20 UTC (rev 20189)
@@ -1,5 +1,7 @@
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.async.ClientRequester;
import freenet.support.Logger;
import freenet.support.RandomGrabArray;
@@ -25,15 +27,15 @@
* (but not the key itself, implementors must have a separate queue of
block
* numbers and mapping of block numbers to keys).
* @return An object identifying a specific key. null indicates no keys
available. */
- public abstract Object chooseKey(KeysFetchingLocally keys);
+ public abstract Object chooseKey(KeysFetchingLocally keys,
ObjectContainer container);
/** All key identifiers. Including those not currently eligible to be
sent because
* they are on a cooldown queue, requests for them are in progress,
etc. */
- public abstract Object[] allKeys();
+ public abstract Object[] allKeys(ObjectContainer container);
/** All key identifiers currently eligible to be sent. Does not include
those
* currently running, on the cooldown queue etc. */
- public abstract Object[] sendableKeys();
+ public abstract Object[] sendableKeys(ObjectContainer container);
/** ONLY called by RequestStarter. Start the actual request using the
NodeClientCore
* provided, and the key and key number earlier got from chooseKey().
@@ -85,6 +87,6 @@
}
/** Requeue after an internal error */
- public abstract void internalError(Object keyNum, Throwable t,
RequestScheduler sched);
+ public abstract void internalError(Object keyNum, Throwable t,
RequestScheduler sched, ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.ClientRequester;
import freenet.keys.CHKBlock;
@@ -42,13 +44,13 @@
this.scheduler = scheduler;
}
- public void onSuccess(Object keyNum) {
+ public void onSuccess(Object keyNum, ObjectContainer container) {
// Yay!
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Finished insert of "+block);
}
- public void onFailure(LowLevelPutException e, Object keyNum) {
+ public void onFailure(LowLevelPutException e, Object keyNum,
ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Failed insert of "+block+": "+e);
}
@@ -69,14 +71,15 @@
if(logMINOR) Logger.minor(this, "Starting request: "+this);
core.realPut(block, shouldCache());
} catch (LowLevelPutException e) {
- onFailure(e, keyNum);
+ sched.callFailure(get, e, keyNum, prio, name);
+ onFailure(e, keyNum, container);
if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e);
return true;
} finally {
finished = true;
}
if(logMINOR) Logger.minor(this, "Request succeeded: "+this);
- onSuccess(keyNum);
+ onSuccess(keyNum, container);
return true;
}
@@ -118,17 +121,17 @@
return false;
}
- public synchronized Object[] allKeys() {
+ public synchronized Object[] allKeys(ObjectContainer container) {
if(finished) return new Object[] {};
return new Object[] { new Integer(0) };
}
- public synchronized Object[] sendableKeys() {
+ public synchronized Object[] sendableKeys(ObjectContainer container) {
if(finished) return new Object[] {};
return new Object[] { new Integer(0) };
}
- public synchronized Object chooseKey(KeysFetchingLocally keys) {
+ public synchronized Object chooseKey(KeysFetchingLocally keys,
ObjectContainer container) {
if(finished) return null;
else return new Integer(0);
}