Author: toad
Date: 2008-06-24 19:27:00 +0000 (Tue, 24 Jun 2008)
New Revision: 20653
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/client/async/USKChecker.java
branches/db4o/freenet/src/freenet/node/SendableRequest.java
Log:
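Some activation work on fetches: when a request is persistent, call container.activate() on the objects a fetch method is about to use, call container.set(blockNums) after changing blockNums, and make SendableRequest.persistent protected so subclasses can test it directly.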
Modified:
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-06-24 17:49:49 UTC (rev 20652)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-06-24 19:27:00 UTC (rev 20653)
@@ -56,6 +56,8 @@
}
public ClientKey getKey(Object token, ObjectContainer container) {
+ if(persistent)
+ container.activate(key, 5);
return key;
}
@@ -70,6 +72,9 @@
/** Try again - returns true if we can retry
* @param sched */
protected boolean retry(RequestScheduler sched, ObjectContainer
container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ }
retryCount++;
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Attempting to retry... (max
"+maxRetries+", current "+retryCount+ ')');
@@ -136,6 +141,8 @@
}
public void onGotKey(Key key, KeyBlock block, RequestScheduler sched,
ObjectContainer container, ClientContext context) {
+ if(persistent)
+ container.activate(this, 2);
synchronized(this) {
if(isCancelled()) return;
if(!key.equals(this.key.getNodeKey())) {
Modified:
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-06-24 17:49:49 UTC (rev 20652)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-06-24 19:27:00 UTC (rev 20653)
@@ -84,6 +84,10 @@
// Real onFailure
protected void onFailure(FetchException e, boolean forceFatal,
RequestScheduler sched, ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(parent, 1);
+ container.activate(rcb, 1);
+ }
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "onFailure( "+e+" ,
"+forceFatal+")", e);
if(parent.isCancelled() || cancelled) {
@@ -108,6 +112,10 @@
/** Will be overridden by SingleFileFetcher */
protected void onSuccess(FetchResult data, RequestScheduler sched,
ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(parent, 1);
+ container.activate(rcb, 1);
+ }
unregister(false, container);
if(parent.isCancelled()) {
data.asBucket().free();
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-06-24 17:49:49 UTC (rev 20652)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-06-24 19:27:00 UTC (rev 20653)
@@ -125,6 +125,10 @@
// splitfile, or another SingleFileFetcher, etc.
public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched, ObjectContainer container, ClientContext
context) {
this.sched = sched;
+ if(persistent) {
+ container.activate(parent, 1);
+ container.activate(ctx, 1);
+ }
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block,
container, context);
parent.completedBlock(fromStore, container, context);
@@ -183,6 +187,12 @@
onFailure(new FetchException(FetchException.CANCELLED),
false, sched, container, context);
return;
}
+ if(persistent) {
+ container.activate(decompressors, 1);
+ container.activate(parent, 1);
+ container.activate(ctx, 1);
+ container.activate(rcb, 1);
+ }
if(!decompressors.isEmpty()) {
Bucket data = result.asBucket();
while(!decompressors.isEmpty()) {
@@ -238,6 +248,8 @@
* @throws ArchiveRestartException
*/
private synchronized void handleMetadata(final ObjectContainer
container, final ClientContext context) throws FetchException,
MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ if(persistent)
+ container.activate(this, 2);
while(true) {
if(metadata.isSimpleManifest()) {
if(logMINOR) Logger.minor(this, "Is simple
manifest");
@@ -566,7 +578,7 @@
* Call handleMetadata(), and deal with any resulting exceptions
*/
private void wrapHandleMetadata(final boolean notFinalizedSize,
ObjectContainer container, final ClientContext context) {
- if(!parent.persistent())
+ if(!persistent)
innerWrapHandleMetadata(notFinalizedSize, container,
context);
else {
if(!context.jobRunner.onDatabaseThread())
@@ -609,18 +621,22 @@
}
public void onSuccess(FetchResult result, ClientGetState state,
ObjectContainer container, ClientContext context) {
- if(!parent.persistent()) {
+ if(!persistent) {
// Run directly - we are running on some thread
somewhere, don't worry about it.
innerSuccess(result, container, context);
} else {
// We are running on the database thread.
// Add a tag, unpack on a separate thread, copy
the data to a persistent bucket, then schedule on the database thread,
// remove the tag, and call the callback.
+ container.activate(SingleFileFetcher.this, 1);
+ container.activate(ah, 1);
ah.extractPersistentOffThread(result.asBucket(), actx, element, callback,
container, context);
}
}
private void innerSuccess(FetchResult result, ObjectContainer
container, ClientContext context) {
+ container.activate(SingleFileFetcher.this, 1);
+ container.activate(ah, 1);
try {
ah.extractToCache(result.asBucket(), actx,
element, callback, context.archiveManager, container, context);
} catch (ArchiveFailureException e) {
@@ -635,11 +651,14 @@
}
public void onFailure(FetchException e, ClientGetState state,
ObjectContainer container, ClientContext context) {
+ container.activate(SingleFileFetcher.this, 1);
// Force fatal as the fetcher is presumed to have made
a reasonable effort.
SingleFileFetcher.this.onFailure(e, true, sched,
container, context);
}
public void onBlockSetFinished(ClientGetState state,
ObjectContainer container, ClientContext context) {
+ if(persistent)
+ container.activate(rcb, 1);
if(wasFetchingFinalData) {
rcb.onBlockSetFinished(SingleFileFetcher.this,
container, context);
}
@@ -654,6 +673,8 @@
}
public void onExpectedSize(long size, ObjectContainer
container) {
+ if(persistent)
+ container.activate(rcb, 1);
rcb.onExpectedSize(size, container);
}
@@ -666,6 +687,8 @@
class MultiLevelMetadataCallback implements GetCompletionCallback {
public void onSuccess(FetchResult result, ClientGetState state,
ObjectContainer container, ClientContext context) {
+ if(persistent)
+ container.activate(SingleFileFetcher.this, 1);
try {
metadata =
Metadata.construct(result.asBucket());
} catch (MetadataParseException e) {
@@ -680,6 +703,8 @@
}
public void onFailure(FetchException e, ClientGetState state,
ObjectContainer container, ClientContext context) {
+ if(persistent)
+ container.activate(SingleFileFetcher.this, 1);
// Pass it on; fetcher is assumed to have retried as
appropriate already, so this is fatal.
SingleFileFetcher.this.onFailure(e, true, sched,
container, context);
}
@@ -697,6 +722,10 @@
}
public void onExpectedSize(long size, ObjectContainer
container) {
+ if(persistent) {
+ container.activate(SingleFileFetcher.this, 1);
+ container.activate(rcb, 1);
+ }
rcb.onExpectedSize(size, container);
}
@@ -760,7 +789,7 @@
// Do a thorough, blocking search
USKFetcherTag tag =
context.uskManager.getFetcher(usk.copy(-usk.suggestedEdition), ctx, false,
requester.persistent(),
- new
MyUSKFetcherCallback(requester, cb, clientMetadata, usk, metaStrings, ctx,
actx, maxRetries, recursionLevel, dontTellClientGet, l, returnBucket),
container, context);
+ new
MyUSKFetcherCallback(requester, cb, clientMetadata, usk, metaStrings, ctx,
actx, maxRetries, recursionLevel, dontTellClientGet, l, returnBucket,
requester.persistent()), container, context);
if(isEssential)
requester.addMustSucceedBlocks(1, container);
return tag;
@@ -781,8 +810,9 @@
final boolean dontTellClientGet;
final long token;
final Bucket returnBucket;
+ final boolean persistent;
- public MyUSKFetcherCallback(ClientRequester requester,
GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList
metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int
recursionLevel, boolean dontTellClientGet, long l, Bucket returnBucket) {
+ public MyUSKFetcherCallback(ClientRequester requester,
GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList
metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int
recursionLevel, boolean dontTellClientGet, long l, Bucket returnBucket, boolean
persistent) {
this.parent = requester;
this.cb = cb;
this.clientMetadata = clientMetadata;
@@ -795,9 +825,12 @@
this.dontTellClientGet = dontTellClientGet;
this.token = l;
this.returnBucket = returnBucket;
+ this.persistent = persistent;
}
public void onFoundEdition(long l, USK newUSK, ObjectContainer
container, ClientContext context, boolean metadata, short codec, byte[] data) {
+ if(persistent)
+ container.activate(this, 2);
ClientSSK key = usk.getSSK(l);
try {
if(l == usk.suggestedEdition) {
@@ -813,10 +846,14 @@
}
public void onFailure(ObjectContainer container, ClientContext
context) {
+ if(persistent)
+ container.activate(this, 2);
cb.onFailure(new
FetchException(FetchException.DATA_NOT_FOUND, "No USK found"), null, container,
context);
}
public void onCancelled(ObjectContainer container,
ClientContext context) {
+ if(persistent)
+ container.activate(this, 2);
cb.onFailure(new
FetchException(FetchException.CANCELLED, (String)null), null, container,
context);
}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-24 17:49:49 UTC (rev 20652)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-06-24 19:27:00 UTC (rev 20653)
@@ -68,10 +68,14 @@
public Object chooseKey(KeysFetchingLocally keys, ObjectContainer
container, ClientContext context) {
if(cancelled) return null;
- return removeRandomBlockNum(keys, context);
+ return removeRandomBlockNum(keys, context, container);
}
public ClientKey getKey(Object token, ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
synchronized(segment) {
if(cancelled) {
if(logMINOR)
@@ -97,6 +101,10 @@
* those on cooldown queues. This is important when unregistering.
*/
public Object[] allKeys(ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
return segment.getKeyNumbersAtRetryLevel(retryCount);
}
@@ -104,10 +112,19 @@
* Just those keys which are eligible to be started now.
*/
public Object[] sendableKeys(ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(blockNums, 1);
+ }
return blockNums.toArray();
}
- private Object removeRandomBlockNum(KeysFetchingLocally keys,
ClientContext context) {
+ private Object removeRandomBlockNum(KeysFetchingLocally keys,
ClientContext context, ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(blockNums, 1);
+ container.activate(segment, 1);
+ }
logMINOR = Logger.shouldLog(Logger.MINOR, this);
synchronized(segment) {
if(blockNums.isEmpty()) {
@@ -132,6 +149,8 @@
}
if(logMINOR)
Logger.minor(this, "Removing block
"+x+" of "+(blockNums.size()+1)+ " : "+ret+ " on "+this);
+ if(persistent)
+ container.set(blockNums);
return ret;
}
return null;
@@ -139,6 +158,12 @@
}
public boolean hasValidKeys(KeysFetchingLocally keys, ObjectContainer
container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(blockNums, 1);
+ container.activate(segment, 1);
+ }
+ boolean hasSet = false;
synchronized(segment) {
for(int i=0;i<10;i++) {
Object ret;
@@ -150,6 +175,10 @@
if(key == null) {
Logger.error(this, "Key is null for
block "+ret+" for "+this+" in hasValidKeys()");
blockNums.remove(x);
+ if(persistent && !hasSet) {
+ hasSet = true;
+ container.set(blockNums);
+ }
continue;
}
if(keys.hasKey(key)) {
@@ -210,6 +239,11 @@
// Real onFailure
protected void onFailure(FetchException e, Object token,
RequestScheduler sched, ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(parent, 1);
+ }
boolean forceFatal = false;
if(parent.isCancelled()) {
if(Logger.shouldLog(Logger.MINOR, this))
@@ -226,11 +260,17 @@
}
public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched, ObjectContainer container, ClientContext
context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(blockNums, 1);
+ }
Bucket data = extract(block, token, sched, container);
if(fromStore) {
// Normally when this method is called the block number
has already
// been removed. However if fromStore=true, it won't
have been, so
// we have to do it. (Check the call trace for why)
+ boolean removed = false;
synchronized(segment) {
for(int i=0;i<blockNums.size();i++) {
Integer x = (Integer) blockNums.get(i);
@@ -239,9 +279,12 @@
blockNums.remove(i);
if(logMINOR) Logger.minor(this,
"Removed block "+i+" : "+x);
i--;
+ removed = true;
}
}
}
+ if(persistent && removed)
+ container.set(blockNums);
}
if(!block.isMetadata()) {
onSuccess(data, fromStore, (Integer)token,
((Integer)token).intValue(), block, sched, container);
@@ -251,6 +294,11 @@
}
protected void onSuccess(Bucket data, boolean fromStore, Integer token,
int blockNo, ClientKeyBlock block, RequestScheduler sched, ObjectContainer
container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(parent, 1);
+ }
if(parent.isCancelled()) {
data.free();
onFailure(new FetchException(FetchException.CANCELLED),
token, sched, container, sched.getContext());
@@ -266,7 +314,7 @@
ClientContext context = sched.getContext();
Bucket data;
try {
- data =
block.decode(sched.getContext().getBucketFactory(segment.parentFetcher.parent.persistent()),
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
+ data =
block.decode(sched.getContext().getBucketFactory(persistent),
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
@@ -331,6 +379,11 @@
}
public void add(int blockNo, boolean dontSchedule, ObjectContainer
container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(blockNums, 1);
+ }
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Adding block "+blockNo+" to
"+this+" dontSchedule="+dontSchedule);
if(blockNo < 0) throw new IllegalArgumentException();
@@ -360,6 +413,8 @@
schedule = false;
}
}
+ if(persistent)
+ container.set(blockNums);
if(schedule) schedule(container, context);
else if(!dontSchedule)
// Already scheduled, however this key may not be
registered.
@@ -371,6 +426,10 @@
}
public void possiblyRemoveFromParent(ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
if(logMINOR)
Logger.minor(this, "Possibly removing from parent:
"+this);
synchronized(segment) {
@@ -384,6 +443,11 @@
}
public void onGotKey(Key key, KeyBlock block, RequestScheduler sched,
ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(blockNums, 1);
+ }
if(logMINOR) Logger.minor(this, "onGotKey("+key+")");
// Find and remove block if it is on this subsegment. However
it may have been
// removed already.
@@ -428,6 +492,11 @@
* subsegment from the list.
*/
public void kill(ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ container.activate(blockNums, 1);
+ }
if(logMINOR)
Logger.minor(this, "Killing "+this);
// Do unregister() first so can get and unregister each key and
avoid a memory leak
@@ -439,20 +508,36 @@
}
public long getCooldownWakeup(Object token, ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
return segment.getCooldownWakeup(((Integer)token).intValue());
}
public void requeueAfterCooldown(Key key, long time, ObjectContainer
container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+"
for "+this);
segment.requeueAfterCooldown(key, time, container, context);
}
public long getCooldownWakeupByKey(Key key, ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
return segment.getCooldownWakeupByKey(key);
}
public void resetCooldownTimes(ObjectContainer container) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(segment, 1);
+ }
synchronized(segment) {
segment.resetCooldownTimes((Integer[])blockNums.toArray(new
Integer[blockNums.size()]));
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-06-24 17:49:49 UTC (rev 20652)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java
2008-06-24 19:27:00 UTC (rev 20653)
@@ -29,11 +29,19 @@
}
public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched, ObjectContainer container, ClientContext
context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(cb, 1);
+ }
unregister(false, container);
cb.onSuccess((ClientSSKBlock)block, context);
}
public void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched, ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(cb, 1);
+ }
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "onFailure: "+e+" for "+this);
// Firstly, can we retry?
Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-24
17:49:49 UTC (rev 20652)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-24
19:27:00 UTC (rev 20653)
@@ -24,7 +24,7 @@
protected RandomGrabArray parentGrabArray;
/** Member because must be accessible when only marginally activated */
- final boolean persistent;
+ protected final boolean persistent;
/** Get the priority class of the request. */
public abstract short getPriorityClass();