Author: toad
Date: 2008-02-15 23:59:37 +0000 (Fri, 15 Feb 2008)
New Revision: 17966
Modified:
trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
trunk/freenet/src/freenet/client/async/OfferedKeysList.java
trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
trunk/freenet/src/freenet/client/async/USKChecker.java
trunk/freenet/src/freenet/node/RequestScheduler.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/SendableGet.java
trunk/freenet/src/freenet/node/SendableInsert.java
trunk/freenet/src/freenet/node/SendableRequest.java
Log:
Request cooldown queue implementation.
We request a key 3 times, and then we stick it on the request cooldown queue:
it won't be requested again for 30 minutes (it can still be found via ULPRs
etc).
Untested, may completely break everything!
Also lots of supporting refactoring.
Modified: trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -9,6 +9,7 @@
import freenet.keys.Key;
import freenet.keys.KeyBlock;
import freenet.keys.KeyVerifyException;
+import freenet.node.RequestScheduler;
import freenet.node.SendableGet;
import freenet.support.Logger;
@@ -20,6 +21,9 @@
private int retryCount;
final FetchContext ctx;
static final Object[] keys = new Object[] { new Integer(0) };
+ /** It is essential that we know when the cooldown will end, otherwise
we cannot
+ * remove the key from the queue if we are killed before that */
+ long cooldownWakeupTime;
protected BaseSingleFileFetcher(ClientKey key, int maxRetries,
FetchContext ctx, ClientRequester parent) {
super(parent);
@@ -27,6 +31,7 @@
this.maxRetries = maxRetries;
this.key = key;
this.ctx = ctx;
+ cooldownWakeupTime = -1;
}
public Object[] allKeys() {
@@ -49,14 +54,21 @@
return key instanceof ClientSSK;
}
- /** Try again - returns true if we can retry */
- protected boolean retry() {
+ /** Try again - returns true if we can retry
+ * @param sched */
+ protected boolean retry(RequestScheduler sched) {
retryCount++;
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Attempting to retry... (max
"+maxRetries+", current "+retryCount+ ')');
// We want 0, 1, ... maxRetries i.e. maxRetries+1 attempts
(maxRetries=0 => try once, no retries, maxRetries=1 = original try + 1 retry)
if((retryCount <= maxRetries) || (maxRetries == -1)) {
- schedule();
+ if(retryCount % ClientRequestScheduler.COOLDOWN_RETRIES
== 0) {
+ // Add to cooldown queue. Don't reschedule yet.
+ cooldownWakeupTime = sched.queueCooldown(key);
+ return true; // We will retry, just not yet.
See requeueAfterCooldown(Key).
+ } else {
+ schedule();
+ }
return true;
}
return false;
@@ -102,7 +114,7 @@
return true;
}
- public void onGotKey(Key key, KeyBlock block) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
synchronized(this) {
if(isCancelled()) return;
if(!key.equals(this.key.getNodeKey())) {
@@ -111,11 +123,28 @@
}
}
try {
- onSuccess(Key.createKeyBlock(this.key, block), false,
null);
+ onSuccess(Key.createKeyBlock(this.key, block), false,
null, sched);
} catch (KeyVerifyException e) {
Logger.error(this, "onGotKey("+key+","+block+") got
"+e+" for "+this, e);
// FIXME if we get rid of the direct route this must
call onFailure()
}
}
+
+ public long getCooldownWakeup(Object token) {
+ return cooldownWakeupTime;
+ }
+
+ public long getCooldownWakeupByKey(Key key) {
+ return cooldownWakeupTime;
+ }
+
+ public void requeueAfterCooldown(Key key) {
+ if(!(key.equals(this.key.getNodeKey()))) {
+ Logger.error(this, "Got requeueAfterCooldown for wrong
key: "+key+" but mine is "+this.key.getNodeKey()+" for "+this.key);
+ return;
+ }
+ schedule();
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -97,6 +97,12 @@
private final Node node;
public final String name;
private final LinkedList /* <WeakReference <RandomGrabArray> > */
recentSuccesses = new LinkedList();
+ private final RequestCooldownQueue cooldownQueue;
+ /** Once a key has been requested a few times, don't request it again
for 30 minutes.
+ * To do so would be pointless given ULPRs, and just waste bandwidth. */
+ public static final long COOLDOWN_PERIOD = 30*60*1000;
+ /** The number of times a key can be requested before triggering the
cooldown period. */
+ public static final int COOLDOWN_RETRIES = 3;
/** All pending gets by key. Used to automatically satisfy pending
requests when either the key is fetched by
* an overlapping request, or it is fetched by a request from another
node. Operations on this are synchronized on
@@ -187,6 +193,7 @@
} else {
offeredKeys = null;
}
+ cooldownQueue = new RequestCooldownQueue(COOLDOWN_PERIOD);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
@@ -234,12 +241,12 @@
// verifies at low-level, but
not at decode.
if(logMINOR)
Logger.minor(this,
"Decode failed: "+e, e);
- getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok);
+ getter.onFailure(new
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, this);
continue; // other keys might
be valid
}
if(block != null) {
if(logMINOR) Logger.minor(this,
"Can fulfill "+req+" ("+tok+") immediately from store");
- getter.onSuccess(block, true,
tok);
+ getter.onSuccess(block, true,
tok, this);
// FIXME unfortunately this
seems to be necessary on *nix to prevent
// critical threads from
starving: sadly thread priorities only work on
// Windows and as of linux
2.6.23, fair scheduling does not ensure that
@@ -479,8 +486,9 @@
public void removePendingKey(SendableGet getter, boolean complain, Key
key) {
boolean dropped = false;
+ Object o;
synchronized(pendingKeys) {
- Object o = pendingKeys.get(key);
+ o = pendingKeys.get(key);
if(o == null) {
if(complain)
Logger.normal(this, "Not found:
"+getter+" for "+key+" removing (no such key)");
@@ -527,6 +535,14 @@
for(int i=0;i<offeredKeys.length;i++)
offeredKeys[i].remove(key);
}
+ if(o instanceof SendableGet)
+ cooldownQueue.removeKey(key,
((SendableGet)o).getCooldownWakeupByKey(key));
+ else if(o instanceof SendableGet[]) {
+ SendableGet[] gets = (SendableGet[]) o;
+ for(int i=0;i<gets.length;i++) {
+ cooldownQueue.removeKey(key,
gets[i].getCooldownWakeupByKey(key));
+ }
+ }
}
/**
@@ -595,14 +611,17 @@
if(o == null) return;
if(o instanceof SendableGet) {
gets = new SendableGet[] { (SendableGet) o };
+ cooldownQueue.removeKey(key,
((SendableGet)o).getCooldownWakeupByKey(key));
} else {
gets = (SendableGet[]) o;
+ for(int i=0;i<gets.length;i++)
+ cooldownQueue.removeKey(key,
gets[i].getCooldownWakeupByKey(key));
}
if(gets == null) return;
Runnable r = new Runnable() {
public void run() {
for(int i=0;i<gets.length;i++) {
- gets[i].onGotKey(key, block);
+ gets[i].onGotKey(key, block,
ClientRequestScheduler.this);
}
}
};
@@ -651,4 +670,29 @@
offeredKeys[i].remove(key);
}
}
+
+ public long queueCooldown(ClientKey key) {
+ return cooldownQueue.add(key.getNodeKey());
+ }
+
+ public void moveKeysFromCooldownQueue() {
+ long now = System.currentTimeMillis();
+ Key key;
+ while((key = cooldownQueue.removeKeyBefore(now)) != null) {
+ Object o;
+ synchronized(pendingKeys) {
+ o = pendingKeys.get(key);
+ }
+ if(o == null) {
+ continue;
+ } else if(o instanceof SendableGet) {
+ SendableGet get = (SendableGet) o;
+ get.requeueAfterCooldown(key);
+ } else {
+ SendableGet[] gets = (SendableGet[]) o;
+ for(int i=0;i<gets.length;i++)
+ gets[i].requeueAfterCooldown(key);
+ }
+ }
+ }
}
Modified: trunk/freenet/src/freenet/client/async/OfferedKeysList.java
===================================================================
--- trunk/freenet/src/freenet/client/async/OfferedKeysList.java 2008-02-15
22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/OfferedKeysList.java 2008-02-15
23:59:37 UTC (rev 17966)
@@ -90,7 +90,7 @@
return 0; // All keys have equal chance even if they've been
tried before.
}
- public void internalError(Object keyNum, Throwable t) {
+ public void internalError(Object keyNum, Throwable t, RequestScheduler
sched) {
Logger.error(this, "Internal error: "+t, t);
}
Modified: trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -14,6 +14,7 @@
import freenet.keys.KeyDecodeException;
import freenet.keys.TooBigException;
import freenet.node.LowLevelGetException;
+import freenet.node.RequestScheduler;
import freenet.support.Logger;
import freenet.support.api.Bucket;
@@ -39,51 +40,51 @@
final long token;
// Translate it, then call the real onFailure
- public void onFailure(LowLevelGetException e, Object reqTokenIgnored) {
+ public void onFailure(LowLevelGetException e, Object reqTokenIgnored,
RequestScheduler sched) {
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND));
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), sched);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND));
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), sched);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new
FetchException(FetchException.RECENTLY_FAILED));
+ onFailure(new
FetchException(FetchException.RECENTLY_FAILED), sched);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR));
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR));
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), sched);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD));
+ onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), sched);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND));
+ onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), sched);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new
FetchException(FetchException.TRANSFER_FAILED));
+ onFailure(new
FetchException(FetchException.TRANSFER_FAILED), sched);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR));
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED));
+ onFailure(new FetchException(FetchException.CANCELLED),
sched);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code:
"+e.code);
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR));
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), sched);
return;
}
}
- final void onFailure(FetchException e) {
- onFailure(e, false);
+ final void onFailure(FetchException e, RequestScheduler sched) {
+ onFailure(e, false, sched);
}
// Real onFailure
- protected void onFailure(FetchException e, boolean forceFatal) {
+ protected void onFailure(FetchException e, boolean forceFatal,
RequestScheduler sched) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "onFailure( "+e+" ,
"+forceFatal+")", e);
if(parent.isCancelled() || cancelled) {
@@ -92,7 +93,7 @@
forceFatal = true;
}
if(!(e.isFatal() || forceFatal) ) {
- if(retry()) {
+ if(retry(sched)) {
if(logMINOR) Logger.minor(this, "Retrying");
return;
}
@@ -107,46 +108,46 @@
}
/** Will be overridden by SingleFileFetcher */
- protected void onSuccess(FetchResult data) {
+ protected void onSuccess(FetchResult data, RequestScheduler sched) {
unregister();
if(parent.isCancelled()) {
data.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED));
+ onFailure(new FetchException(FetchException.CANCELLED),
sched);
return;
}
rcb.onSuccess(data, this);
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
reqTokenIgnored) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
reqTokenIgnored, RequestScheduler sched) {
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
- Bucket data = extract(block);
+ Bucket data = extract(block, sched);
if(data == null) return; // failed
if(!block.isMetadata()) {
- onSuccess(new FetchResult((ClientMetadata)null, data));
+ onSuccess(new FetchResult((ClientMetadata)null, data),
sched);
} else {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"));
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), sched);
}
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it
via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block) {
+ protected Bucket extract(ClientKeyBlock block, RequestScheduler sched) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory,
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()));
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), sched);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG,
e));
+ onFailure(new FetchException(FetchException.TOO_BIG,
e), sched);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk
full?: "+e, e);
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
return null;
}
return data;
Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -26,6 +26,7 @@
import freenet.keys.ClientSSK;
import freenet.keys.FreenetURI;
import freenet.keys.USK;
+import freenet.node.RequestScheduler;
import freenet.support.Logger;
import freenet.support.api.Bucket;
import freenet.support.compress.CompressionOutputSizeException;
@@ -56,6 +57,7 @@
private final Bucket returnBucket;
/** If true, success/failure is immediately reported to the client, and
therefore we can check TOO_MANY_PATH_COMPONENTS. */
private final boolean isFinal;
+ private RequestScheduler sched;
/** Create a new SingleFileFetcher and register self.
* Called when following a redirect, or direct from ClientGet.
@@ -119,7 +121,8 @@
// Process the completed data. May result in us going to a
// splitfile, or another SingleFileFetcher, etc.
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched) {
+ this.sched = sched;
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
parent.completedBlock(fromStore);
@@ -129,7 +132,7 @@
Logger.error(this, "block is null!
fromStore="+fromStore+", token="+token, new Exception("error"));
return;
}
- Bucket data = extract(block);
+ Bucket data = extract(block, sched);
if(data == null) {
if(logMINOR)
Logger.minor(this, "No data");
@@ -139,42 +142,43 @@
if(logMINOR)
Logger.minor(this, "Block "+(block.isMetadata() ? "is
metadata" : "is not metadata")+" on "+this);
if(!block.isMetadata()) {
- onSuccess(new FetchResult(clientMetadata, data));
+ onSuccess(new FetchResult(clientMetadata, data), sched);
} else {
if(!ctx.followRedirects) {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Told me not to follow
redirects (splitfile block??)"));
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Told me not to follow
redirects (splitfile block??)"), sched);
return;
}
if(parent.isCancelled()) {
- onFailure(new
FetchException(FetchException.CANCELLED));
+ onFailure(new
FetchException(FetchException.CANCELLED), sched);
return;
}
if(data.size() > ctx.maxMetadataSize) {
- onFailure(new
FetchException(FetchException.TOO_BIG_METADATA));
+ onFailure(new
FetchException(FetchException.TOO_BIG_METADATA), sched);
return;
}
// Parse metadata
try {
metadata = Metadata.construct(data);
} catch (MetadataParseException e) {
- onFailure(new FetchException(e));
+ onFailure(new FetchException(e), sched);
return;
} catch (IOException e) {
// Bucket error?
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
return;
}
wrapHandleMetadata(false);
}
}
- protected void onSuccess(FetchResult result) {
+ protected void onSuccess(FetchResult result, RequestScheduler sched) {
+ this.sched = sched;
unregister();
if(parent.isCancelled()) {
if(logMINOR)
Logger.minor(this, "Parent is cancelled");
result.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED));
+ onFailure(new FetchException(FetchException.CANCELLED),
sched);
return;
}
if(!decompressors.isEmpty()) {
@@ -185,10 +189,10 @@
long maxLen =
Math.max(ctx.maxTempLength, ctx.maxOutputLength);
data = c.decompress(data,
ctx.bucketFactory, maxLen, maxLen * 4, decompressors.isEmpty() ? returnBucket :
null);
} catch (IOException e) {
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
return;
} catch (CompressionOutputSizeException e) {
- onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent),
result.getMimeType()));
+ onFailure(new
FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent),
result.getMimeType()), sched);
return;
}
}
@@ -284,17 +288,17 @@
metadata =
Metadata.construct(data);
} catch
(MetadataParseException e) {
// Invalid
metadata
- onFailure(new
FetchException(FetchException.INVALID_METADATA, e));
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, e), sched);
return;
} catch (IOException e)
{
// Bucket error?
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
return;
}
wrapHandleMetadata(true);
}
public void notInArchive() {
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot
happen as ArchiveManager should synthesise some!"));
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot
happen as ArchiveManager should synthesise some!"), sched);
}
}); // will result in this function
being called again
return;
@@ -333,7 +337,7 @@
// Return the data
ctx.executor.execute(new Runnable() {
public void run() {
- onSuccess(new
FetchResult(clientMetadata, out));
+ onSuccess(new
FetchResult(clientMetadata, out), sched);
}
}, "SingleFileFetcher onSuccess
callback for "+this);
@@ -357,14 +361,14 @@
out =
data;
}
} catch (IOException e)
{
- onFailure(new
FetchException(FetchException.BUCKET_ERROR));
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR), sched);
return;
}
// Return the data
- onSuccess(new
FetchResult(clientMetadata, out));
+ onSuccess(new
FetchResult(clientMetadata, out), sched);
}
public void notInArchive() {
- onFailure(new
FetchException(FetchException.NOT_IN_ARCHIVE));
+ onFailure(new
FetchException(FetchException.NOT_IN_ARCHIVE), sched);
}
});
// Will call back into this function
when it has been fetched.
@@ -565,15 +569,15 @@
try {
handleMetadata();
} catch (MetadataParseException e) {
- onFailure(new FetchException(e));
+ onFailure(new FetchException(e), sched);
} catch (FetchException e) {
if(notFinalizedSize)
e.setNotFinalizedSize();
- onFailure(e);
+ onFailure(e, sched);
} catch (ArchiveFailureException e) {
- onFailure(new FetchException(e));
+ onFailure(new FetchException(e), sched);
} catch (ArchiveRestartException e) {
- onFailure(new FetchException(e));
+ onFailure(new FetchException(e), sched);
}
}
@@ -593,10 +597,10 @@
try {
ah.extractToCache(result.asBucket(), actx,
element, callback);
} catch (ArchiveFailureException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(e));
+ SingleFileFetcher.this.onFailure(new
FetchException(e), sched);
return;
} catch (ArchiveRestartException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(e));
+ SingleFileFetcher.this.onFailure(new
FetchException(e), sched);
return;
}
if(callback != null) return;
@@ -605,7 +609,7 @@
public void onFailure(FetchException e, ClientGetState state) {
// Force fatal as the fetcher is presumed to have made
a reasonable effort.
- SingleFileFetcher.this.onFailure(e, true);
+ SingleFileFetcher.this.onFailure(e, true, sched);
}
public void onBlockSetFinished(ClientGetState state) {
@@ -638,11 +642,11 @@
try {
metadata =
Metadata.construct(result.asBucket());
} catch (MetadataParseException e) {
- SingleFileFetcher.this.onFailure(new
FetchException(FetchException.INVALID_METADATA, e));
+ SingleFileFetcher.this.onFailure(new
FetchException(FetchException.INVALID_METADATA, e), sched);
return;
} catch (IOException e) {
// Bucket error?
- SingleFileFetcher.this.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e));
+ SingleFileFetcher.this.onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), sched);
return;
}
wrapHandleMetadata(true);
@@ -650,7 +654,7 @@
public void onFailure(FetchException e, ClientGetState state) {
// Pass it on; fetcher is assumed to have retried as
appropriate already, so this is fatal.
- SingleFileFetcher.this.onFailure(e, true);
+ SingleFileFetcher.this.onFailure(e, true, sched);
}
public void onBlockSetFinished(ClientGetState state) {
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -22,7 +22,9 @@
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
import freenet.keys.NodeCHK;
+import freenet.node.RequestScheduler;
import freenet.support.Logger;
import freenet.support.api.Bucket;
import freenet.support.io.BucketTools;
@@ -39,6 +41,8 @@
final ClientCHK[] checkKeys;
final MinimalSplitfileBlock[] dataBuckets;
final MinimalSplitfileBlock[] checkBuckets;
+ final long[] dataCooldownTimes;
+ final long[] checkCooldownTimes;
final int[] dataRetries;
final int[] checkRetries;
final Vector subSegments;
@@ -89,6 +93,8 @@
checkBuckets[i] = new
MinimalSplitfileBlock(i+dataBuckets.length);
dataRetries = new int[dataKeys.length];
checkRetries = new int[checkKeys.length];
+ dataCooldownTimes = new long[dataKeys.length];
+ checkCooldownTimes = new long[checkKeys.length];
subSegments = new Vector();
this.fetchContext = fetchContext;
maxBlockLength = maxTempLength;
@@ -362,7 +368,7 @@
}
/** A request has failed non-fatally, so the block may be retried */
- public void onNonFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg) {
+ public void onNonFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, RequestScheduler sched) {
int tries;
int maxTries = blockFetchContext.maxNonSplitfileRetries;
boolean failed = false;
@@ -370,10 +376,20 @@
if(isFinished()) return;
if(blockNo < dataKeys.length) {
tries = ++dataRetries[blockNo];
- if(tries > maxTries && maxTries >= 0) failed =
true;
+ if(tries > maxTries && maxTries >= 0) failed =
true;
+ else if(tries %
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
+ dataCooldownTimes[blockNo] =
sched.queueCooldown(dataKeys[blockNo]);
+ return; // Don't add to sub-segment yet.
+ }
} else {
tries = ++checkRetries[blockNo-dataKeys.length];
if(tries > maxTries && maxTries >= 0) failed =
true;
+ else if(tries %
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
+ // blockNo is segment-wide (check blocks follow the data blocks), so
+ // rebase it before indexing the check arrays, as checkRetries does above.
+ checkCooldownTimes[blockNo-dataKeys.length] =
sched.queueCooldown(checkKeys[blockNo-dataKeys.length]);
+ return; // Don't add to sub-segment yet.
+ }
}
}
if(failed) {
@@ -492,4 +506,78 @@
}
subSegments.clear();
}
+
+ /**
+ * Look up the cooldown wakeup time recorded for a block.
+ * @param blockNum Block number; values below dataKeys.length are data
+ * blocks, the rest are check blocks offset by dataKeys.length.
+ * @return The stored entry of dataCooldownTimes or checkCooldownTimes.
+ */
+ public long getCooldownWakeup(int blockNum) {
+ if(blockNum < dataKeys.length)
+ return dataCooldownTimes[blockNum];
+ else
+ return checkCooldownTimes[blockNum - dataKeys.length];
+ }
+
+ /**
+ * Re-add every block of this segment whose key matches the given key
+ * to the sub-segment for its current retry count. Logs an error if no
+ * block of this segment uses the key.
+ * @param key The node-level key to look for.
+ */
+ public void requeueAfterCooldown(Key key) {
+ boolean notFound = true;
+ int maxTries = blockFetchContext.maxNonSplitfileRetries;
+ // FIXME synchronization
+ for(int i=0;i<dataKeys.length;i++) {
+ if(dataKeys[i] == null) continue;
+ if(dataKeys[i].getNodeKey().equals(key)) {
+ int tries = dataRetries[i];
+ SplitFileFetcherSubSegment sub =
getSubSegment(tries);
+ if(logMINOR)
+ Logger.minor(this, "Retrying after
cooldown: data block "+i+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
+ sub.add(i, false);
+ notFound = false;
+ }
+ }
+ for(int i=0;i<checkKeys.length;i++) {
+ if(checkKeys[i] == null) continue;
+ if(checkKeys[i].getNodeKey().equals(key)) {
+ int tries = checkRetries[i];
+ SplitFileFetcherSubSegment sub =
getSubSegment(tries);
+ if(logMINOR)
+ Logger.minor(this, "Retrying after
cooldown: check block "+i+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
+ // Block numbers are segment-wide: check blocks follow the data
+ // blocks (compare getCooldownWakeup), so offset the index.
+ sub.add(i+dataKeys.length, false);
+ notFound = false;
+ }
+ }
+ if(notFound) {
+ Logger.error(this, "requeueAfterCooldown: Key not
found!: "+key+" on "+this);
+ }
+ }
+
+ /**
+ * Find the cooldown wakeup time recorded for the first block whose key
+ * matches, searching data blocks before check blocks.
+ * @param key The node-level key to look for.
+ * @return The recorded time, or -1 if no block of this segment uses the key.
+ */
+ public long getCooldownWakeupByKey(Key key) {
+ for(int i=0;i<dataKeys.length;i++) {
+ if(dataKeys[i] == null) continue;
+ if(dataKeys[i].getNodeKey().equals(key)) {
+ return dataCooldownTimes[i];
+ }
+ }
+ for(int i=0;i<checkKeys.length;i++) {
+ if(checkKeys[i] == null) continue;
+ if(checkKeys[i].getNodeKey().equals(key)) {
+ return checkCooldownTimes[i];
+ }
+ }
+ return -1;
+ }
}
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-02-15 22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-02-15 23:59:37 UTC (rev 17966)
@@ -13,6 +13,7 @@
import freenet.keys.KeyVerifyException;
import freenet.keys.TooBigException;
import freenet.node.LowLevelGetException;
+import freenet.node.RequestScheduler;
import freenet.node.SendableGet;
import freenet.support.Logger;
import freenet.support.api.Bucket;
@@ -97,49 +98,49 @@
// Translate it, then call the real onFailure
// FIXME refactor this out to a common method; see
SimpleSingleFileFetcher
- public void onFailure(LowLevelGetException e, Object token) {
+ public void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched) {
if(logMINOR)
Logger.minor(this, "onFailure("+e+" , "+token);
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token);
+ onFailure(new
FetchException(FetchException.DATA_NOT_FOUND), token, sched);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new
FetchException(FetchException.RECENTLY_FAILED), token);
+ onFailure(new
FetchException(FetchException.RECENTLY_FAILED), token, sched);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), token);
+ onFailure(new
FetchException(FetchException.REJECTED_OVERLOAD), token, sched);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), token);
+ onFailure(new
FetchException(FetchException.ROUTE_NOT_FOUND), token, sched);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new
FetchException(FetchException.TRANSFER_FAILED), token);
+ onFailure(new
FetchException(FetchException.TRANSFER_FAILED), token, sched);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED),
token);
+ onFailure(new FetchException(FetchException.CANCELLED),
token, sched);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code:
"+e.code);
- onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token);
+ onFailure(new
FetchException(FetchException.INTERNAL_ERROR), token, sched);
return;
}
}
// Real onFailure
- protected void onFailure(FetchException e, Object token) {
+ protected void onFailure(FetchException e, Object token,
RequestScheduler sched) {
boolean forceFatal = false;
if(parent.isCancelled()) {
if(Logger.shouldLog(Logger.MINOR, this))
@@ -151,12 +152,12 @@
if(e.isFatal() || forceFatal) {
segment.onFatalFailure(e, ((Integer)token).intValue(),
this);
} else {
- segment.onNonFatalFailure(e,
((Integer)token).intValue(), this);
+ segment.onNonFatalFailure(e,
((Integer)token).intValue(), this, sched);
}
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token) {
- Bucket data = extract(block, token);
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched) {
+ Bucket data = extract(block, token, sched);
if(fromStore) {
// Normally when this method is called the block number
has already
// been removed. However if fromStore=true, it won't
have been, so
@@ -172,16 +173,16 @@
}
}
if(!block.isMetadata()) {
- onSuccess(data, fromStore, (Integer)token,
((Integer)token).intValue(), block);
+ onSuccess(data, fromStore, (Integer)token,
((Integer)token).intValue(), block, sched);
} else {
- onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token);
+ onFailure(new
FetchException(FetchException.INVALID_METADATA, "Metadata where expected
data"), token, sched);
}
}
- protected void onSuccess(Bucket data, boolean fromStore, Integer token,
int blockNo, ClientKeyBlock block) {
+ protected void onSuccess(Bucket data, boolean fromStore, Integer token,
int blockNo, ClientKeyBlock block, RequestScheduler sched) {
if(parent.isCancelled()) {
data.free();
- onFailure(new FetchException(FetchException.CANCELLED),
token);
+ onFailure(new FetchException(FetchException.CANCELLED),
token, sched);
return;
}
segment.onSuccess(data, blockNo, fromStore, this, block);
@@ -190,21 +191,21 @@
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it
via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block, Object token) {
+ protected Bucket extract(ClientKeyBlock block, Object token,
RequestScheduler sched) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory,
(int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token);
+ onFailure(new
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token,
sched);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG,
e.getMessage()), token);
+ onFailure(new FetchException(FetchException.TOO_BIG,
e.getMessage()), token, sched);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk
full?: "+e, e);
- onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), token);
+ onFailure(new
FetchException(FetchException.BUCKET_ERROR, e), token, sched);
return null;
}
if(Logger.shouldLog(Logger.MINOR, this))
@@ -295,7 +296,7 @@
unregister();
}
- public void onGotKey(Key key, KeyBlock block) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
int blockNum = -1;
Object token = null;
ClientKey ckey = null;
@@ -315,7 +316,7 @@
}
if(blockNum == -1) return;
try {
- onSuccess(Key.createKeyBlock(ckey, block), false,
token);
+ onSuccess(Key.createKeyBlock(ckey, block), false,
token, sched);
} catch (KeyVerifyException e) {
// FIXME if we ever abolish the direct route, this must
be turned into an onFailure().
Logger.error(this, "Failed to parse in
onGotKey("+key+","+block+") - believed to be "+ckey+" (block #"+blockNum+")");
@@ -334,4 +335,16 @@
segment.removeSeg(this);
}
+ public long getCooldownWakeup(Object token) {
+ return segment.getCooldownWakeup(((Integer)token).intValue());
+ }
+
+ public void requeueAfterCooldown(Key key) {
+ segment.requeueAfterCooldown(key);
+ }
+
+ public long getCooldownWakeupByKey(Key key) {
+ return segment.getCooldownWakeupByKey(key);
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKChecker.java 2008-02-15
22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/client/async/USKChecker.java 2008-02-15
23:59:37 UTC (rev 17966)
@@ -8,6 +8,7 @@
import freenet.keys.ClientKeyBlock;
import freenet.keys.ClientSSKBlock;
import freenet.node.LowLevelGetException;
+import freenet.node.RequestScheduler;
import freenet.support.Logger;
/**
@@ -25,12 +26,12 @@
this.cb = cb;
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object
token, RequestScheduler sched) {
unregister();
cb.onSuccess((ClientSSKBlock)block);
}
- public void onFailure(LowLevelGetException e, Object token) {
+ public void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "onFailure: "+e+" for "+this);
// Firstly, can we retry?
@@ -60,7 +61,7 @@
canRetry = true;
}
- if(canRetry && retry()) return;
+ if(canRetry && retry(sched)) return;
// Ran out of retries.
unregister();
Modified: trunk/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestScheduler.java 2008-02-15
22:44:14 UTC (rev 17965)
+++ trunk/freenet/src/freenet/node/RequestScheduler.java 2008-02-15
23:59:37 UTC (rev 17966)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import freenet.keys.ClientKey;
import freenet.support.RandomGrabArray;
public interface RequestScheduler {
@@ -15,5 +16,21 @@
* another may also work.
* */
public void succeeded(RandomGrabArray parentGrabArray);
+
+ /**
+ * After a key has been requested a few times, it is added to the
cooldown queue for
+ * a fixed period, since it would be pointless to rerequest it
(especially given ULPRs).
+ * Note that while a key is on the cooldown queue its requestors will
still be told if
+ * it is found by ULPRs or back door coalescing.
+ * @param key The key to be added.
+ * @return The time at which the key will leave the cooldown queue.
+ */
+ public long queueCooldown(ClientKey key);
+
+ /**
+ * Remove keys from the cooldown queue that have now served their time
and can be requested
+ * again.
+ */
+ public void moveKeysFromCooldownQueue();
}
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2008-02-15 22:44:14 UTC
(rev 17965)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2008-02-15 23:59:37 UTC
(rev 17966)
@@ -92,6 +92,7 @@
// The last time at which we sent a request or decided not to
long cycleTime = sentRequestTime;
while(true) {
+ sched.moveKeysFromCooldownQueue();
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(req == null) req = sched.removeFirst();
if(req != null) {
@@ -173,7 +174,7 @@
} catch (Throwable t) {
if(keyNum != null) {
// Re-queue
- req.internalError(keyNum, t);
+ req.internalError(keyNum, t, sched);
Logger.error(this, "Caught "+t+" while
trying to start request");
return true; // Sort of ... maybe it
will clear
}
Modified: trunk/freenet/src/freenet/node/SendableGet.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableGet.java 2008-02-15 22:44:14 UTC
(rev 17965)
+++ trunk/freenet/src/freenet/node/SendableGet.java 2008-02-15 23:59:37 UTC
(rev 17966)
@@ -30,10 +30,10 @@
public abstract FetchContext getContext();
/** Called when/if the low-level request succeeds. */
- public abstract void onSuccess(ClientKeyBlock block, boolean fromStore,
Object token);
+ public abstract void onSuccess(ClientKeyBlock block, boolean fromStore,
Object token, RequestScheduler sched);
/** Called when/if the low-level request fails. */
- public abstract void onFailure(LowLevelGetException e, Object token);
+ public abstract void onFailure(LowLevelGetException e, Object token,
RequestScheduler sched);
/** Should the request ignore the datastore? */
public abstract boolean ignoreStore();
@@ -59,7 +59,7 @@
synchronized (this) {
if(isCancelled()) {
if(logMINOR) Logger.minor(this,
"Cancelled: "+this);
- onFailure(new
LowLevelGetException(LowLevelGetException.CANCELLED), null);
+ onFailure(new
LowLevelGetException(LowLevelGetException.CANCELLED), null, sched);
return false;
}
}
@@ -74,18 +74,18 @@
try {
block = core.realGetKey(key,
ctx.localRequestOnly, ctx.cacheLocalRequests, ctx.ignoreStore);
} catch (LowLevelGetException e) {
- onFailure(e, keyNum);
+ onFailure(e, keyNum, sched);
return true;
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
- onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum);
+ onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, sched);
return true;
}
- onSuccess(block, false, keyNum);
+ onSuccess(block, false, keyNum, sched);
sched.succeeded(this.getParentGrabArray());
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
- onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum);
+ onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, sched);
return true;
}
return true;
@@ -104,8 +104,18 @@
return parent.chkScheduler;
}
- public abstract void onGotKey(Key key, KeyBlock block);
+ public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler
sched);
+ /**
+ * Get the time at which the key specified by the given token will wake
up from the
+ * cooldown queue.
+ * @param token The token specifying the key.
+ * @return The time at which that key will wake up from the cooldown queue.
+ */
+ public abstract long getCooldownWakeup(Object token);
+
+ public abstract long getCooldownWakeupByKey(Key key);
+
public final void unregister() {
getScheduler().removePendingKeys(this, false);
super.unregister();
@@ -115,8 +125,14 @@
getScheduler().removePendingKey(this, false, key);
}
- public void internalError(Object keyNum, Throwable t) {
- onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t),
keyNum);
+ public void internalError(Object keyNum, Throwable t, RequestScheduler
sched) {
+ onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t),
keyNum, sched);
}
-
+
+ /**
+ * Requeue a key after it has been on the cooldown queue for a while.
+ * @param key The key to requeue.
+ */
+ public abstract void requeueAfterCooldown(Key key);
+
}
Modified: trunk/freenet/src/freenet/node/SendableInsert.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableInsert.java 2008-02-15 22:44:14 UTC
(rev 17965)
+++ trunk/freenet/src/freenet/node/SendableInsert.java 2008-02-15 23:59:37 UTC
(rev 17966)
@@ -17,7 +17,7 @@
/** Called when we don't! */
public abstract void onFailure(LowLevelPutException e, Object keyNum);
- public void internalError(Object keyNum, Throwable t) {
+ public void internalError(Object keyNum, Throwable t, RequestScheduler
sched) {
onFailure(new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t),
keyNum);
}
Modified: trunk/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableRequest.java 2008-02-15 22:44:14 UTC
(rev 17965)
+++ trunk/freenet/src/freenet/node/SendableRequest.java 2008-02-15 23:59:37 UTC
(rev 17966)
@@ -67,6 +67,6 @@
}
/** Requeue after an internal error */
- public abstract void internalError(Object keyNum, Throwable t);
+ public abstract void internalError(Object keyNum, Throwable t,
RequestScheduler sched);
}