Author: toad
Date: 2006-08-31 20:23:20 +0000 (Thu, 31 Aug 2006)
New Revision: 10310
Added:
trunk/freenet/src/freenet/client/async/BaseClientGetter.java
Removed:
trunk/freenet/src/freenet/node/ARKFetchManager.java
trunk/freenet/src/freenet/node/ARKFetcher.java
Modified:
trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
trunk/freenet/src/freenet/client/async/ClientGetter.java
trunk/freenet/src/freenet/client/async/GetCompletionCallback.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/USKChecker.java
trunk/freenet/src/freenet/client/async/USKFetcher.java
trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
trunk/freenet/src/freenet/client/async/USKManager.java
trunk/freenet/src/freenet/client/async/USKProxyCompletionCallback.java
trunk/freenet/src/freenet/client/async/USKRetriever.java
trunk/freenet/src/freenet/keys/USK.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/PeerManager.java
trunk/freenet/src/freenet/node/PeerNode.java
Log:
Rewrite the ARK fetching code and greatly simplify it.
There is no more ARKFetchManager because we don't need one: USK fetches have
built-in backoff and already run at fairly low priority.
This should fix ARKs not being fetched after startup.
Also minor supporting changes to client/async/.
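
For orientation, a minimal, self-contained sketch of the new per-peer ARK fetch
lifecycle. The Subscription/UskSubscriber/ArkCallback types are simplified
stand-ins for USKRetriever, USKManager.subscribeContent()/unsubscribeContent()
and USKRetrieverCallback, not real Freenet APIs; only the call pattern mirrors
the PeerNode changes below (start when we have no contact details, stop on a
completed handshake, and let the USK code handle retries and backoff):

// Sketch only; all type and method names here are illustrative stand-ins.
interface Subscription { void cancel(); }           // stand-in for USKRetriever

interface UskSubscriber {                            // stand-in for USKManager
	Subscription subscribe(String arkUri, ArkCallback cb);
}

interface ArkCallback {                              // stand-in for USKRetrieverCallback
	void onFound(long edition, byte[] referenceBytes);
}

final class PeerArkFetch implements ArkCallback {
	private final UskSubscriber manager;
	private final String arkUri;
	private final Object lock = new Object();
	private Subscription current;                    // non-null while subscribed

	PeerArkFetch(UskSubscriber manager, String arkUri) {
		this.manager = manager;
		this.arkUri = arkUri;
	}

	// Called from the packet-sender loop when we have no contact details
	// (cf. pn.startARKFetcher() in the PacketSender change below).
	void start() {
		synchronized (lock) {
			if (current == null)
				current = manager.subscribe(arkUri, this);
		}
	}

	// Called once a handshake completes or the peer is disconnected
	// (cf. stopARKFetcher() below).
	void stop() {
		synchronized (lock) {
			if (current == null) return;
			current.cancel();
			current = null;
		}
	}

	// Each newer ARK edition arrives here; the real PeerNode.onFound() parses
	// the node reference (SimpleFieldSet) and calls gotARK().
	public void onFound(long edition, byte[] referenceBytes) {
		System.out.println("ARK edition " + edition + ": " + referenceBytes.length + " bytes");
	}

	// Mirrors the new PeerNode.isFetchingARK(): fetching == subscribed.
	boolean isFetchingARK() {
		synchronized (lock) { return current != null; }
	}
}

The point of the design is that each PeerNode owns at most one USK subscription,
so there is nothing to queue or rate-limit globally any more.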
Added: trunk/freenet/src/freenet/client/async/BaseClientGetter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/BaseClientGetter.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/BaseClientGetter.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -0,0 +1,11 @@
+package freenet.client.async;
+
+public abstract class BaseClientGetter extends ClientRequester implements
+ GetCompletionCallback {
+
+	protected BaseClientGetter(short priorityClass, ClientRequestScheduler chkScheduler, ClientRequestScheduler sskScheduler, Object client) {
+ super(priorityClass, chkScheduler, sskScheduler, client);
+ }
+
+
+}
Modified: trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -17,9 +17,9 @@
final int maxRetries;
private int retryCount;
final FetcherContext ctx;
- final ClientRequester parent;
+ final BaseClientGetter parent;
-	BaseSingleFileFetcher(ClientKey key, int maxRetries, FetcherContext ctx, ClientRequester parent) {
+	BaseSingleFileFetcher(ClientKey key, int maxRetries, FetcherContext ctx, BaseClientGetter parent) {
retryCount = 0;
this.maxRetries = maxRetries;
this.key = key;
Modified: trunk/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientGetter.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/ClientGetter.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -17,13 +17,13 @@
/**
* A high level data request.
*/
-public class ClientGetter extends ClientRequester implements GetCompletionCallback {
+public class ClientGetter extends BaseClientGetter {
final ClientCallback client;
final FreenetURI uri;
final FetcherContext ctx;
final ArchiveContext actx;
- ClientGetState currentState;
+ private ClientGetState currentState;
private boolean finished;
private int archiveRestarts;
/** If not null, Bucket to return the data in */
@@ -156,4 +156,16 @@
blockSetFinalized();
}
+	public void onTransition(ClientGetState oldState, ClientGetState newState) {
+		synchronized(this) {
+			if(currentState == oldState) {
+				currentState = newState;
+				Logger.minor(this, "Transition: "+oldState+" -> "+newState);
+			} else
+				Logger.minor(this, "Ignoring transition: "+oldState+" -> "+newState);
+ }
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/GetCompletionCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/async/GetCompletionCallback.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/GetCompletionCallback.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -18,4 +18,6 @@
*/
public void onBlockSetFinished(ClientGetState state);
+	public void onTransition(ClientGetState oldState, ClientGetState newState);
+
}
Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -54,7 +54,7 @@
* @param token
* @param dontTellClientGet
*/
-	public SingleFileFetcher(ClientRequester get, GetCompletionCallback cb, ClientMetadata metadata,
+	public SingleFileFetcher(BaseClientGetter get, GetCompletionCallback cb, ClientMetadata metadata,
			ClientKey key, LinkedList metaStrings, FetcherContext ctx,
			ArchiveContext actx, int maxRetries, int recursionLevel,
			boolean dontTellClientGet, Object token, boolean isEssential,
@@ -316,13 +316,14 @@
}
			// **FIXME** Is key in the call to SingleFileFetcher here supposed to be this.key or the same key used in the try block above? MultiLevelMetadataCallback.onSuccess() below uses this.key, thus the question
-			SingleFileFetcher f = new SingleFileFetcher((ClientGetter)parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, false, null, true, returnBucket);
+			SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, false, token, true, returnBucket);
			if((key instanceof ClientCHK) && !((ClientCHK)key).isMetadata())
				rcb.onBlockSetFinished(this);
			if(metadata.isCompressed()) {
				Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
f.addDecompressor(codec);
}
+ parent.onTransition(this, f);
f.schedule();
// All done! No longer our problem!
return;
@@ -352,8 +353,9 @@
return;
}
-			SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, (ClientGetter)parent, ctx,
+			SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, parent, ctx,
					decompressors, clientMetadata, actx, recursionLevel, returnBucket, false, token);
+ parent.onTransition(this, sf);
sf.schedule();
rcb.onBlockSetFinished(this);
// SplitFile will now run.
@@ -397,7 +399,6 @@
}
		public void onSuccess(FetchResult result, ClientGetState state) {
-			((ClientGetter)parent).currentState = SingleFileFetcher.this;
			try {
				ctx.archiveManager.extractToCache(thisKey, ah.getArchiveType(), result.asBucket(), actx, ah);
} catch (ArchiveFailureException e) {
@@ -428,16 +429,19 @@
rcb.onBlockSetFinished(SingleFileFetcher.this);
}
}
+
+		public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
}
class MultiLevelMetadataCallback implements GetCompletionCallback {
		public void onSuccess(FetchResult result, ClientGetState state) {
-			((ClientGetter)parent).currentState = SingleFileFetcher.this;
			try {
				metadata = Metadata.construct(result.asBucket());
-				SingleFileFetcher f = new SingleFileFetcher((ClientGetter)parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, dontTellClientGet, null, true, returnBucket);
+				SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, key, metaStrings, ctx, actx, maxRetries, recursionLevel, dontTellClientGet, token, true, returnBucket);
f.metadata = metadata;
f.handleMetadata();
} catch (MetadataParseException e) {
@@ -464,6 +468,10 @@
public void onBlockSetFinished(ClientGetState state) {
// Ignore as we are fetching metadata here
}
+
+		public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
}
@@ -527,8 +535,6 @@
}
public void schedule() {
- if(!dontTellClientGet)
- ((ClientGetter)parent).currentState = this;
super.schedule();
}
@@ -540,11 +546,7 @@
return ctx.ignoreStore;
}
- public ClientGetter getParent() {
- return (ClientGetter) parent;
- }
-
-	public static ClientGetState create(ClientRequester parent, GetCompletionCallback cb, ClientMetadata clientMetadata, FreenetURI uri, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, boolean isEssential, Bucket returnBucket) throws MalformedURLException, FetchException {
+	public static ClientGetState create(BaseClientGetter parent, GetCompletionCallback cb, ClientMetadata clientMetadata, FreenetURI uri, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, boolean isEssential, Bucket returnBucket) throws MalformedURLException, FetchException {
		BaseClientKey key = BaseClientKey.getBaseKey(uri);
		if(key instanceof ClientKey)
			return new SingleFileFetcher(parent, cb, clientMetadata, (ClientKey)key, uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, token, isEssential, returnBucket);
@@ -553,7 +555,7 @@
}
}
-	private static ClientGetState uskCreate(ClientRequester parent, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, boolean isEssential, Bucket returnBucket) throws FetchException {
+	private static ClientGetState uskCreate(BaseClientGetter parent, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, boolean isEssential, Bucket returnBucket) throws FetchException {
		if(usk.suggestedEdition >= 0) {
			// Return the latest known version but at least suggestedEdition.
long edition = ctx.uskManager.lookup(usk);
@@ -586,7 +588,7 @@
public static class MyUSKFetcherCallback implements USKFetcherCallback {
- final ClientRequester parent;
+ final BaseClientGetter parent;
final GetCompletionCallback cb;
final ClientMetadata clientMetadata;
final USK usk;
@@ -599,7 +601,7 @@
final Object token;
final Bucket returnBucket;
-		public MyUSKFetcherCallback(ClientRequester parent, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, Bucket returnBucket) {
+		public MyUSKFetcherCallback(BaseClientGetter parent, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetcherContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, Object token, Bucket returnBucket) {
this.parent = parent;
this.cb = cb;
this.clientMetadata = clientMetadata;
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcher.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcher.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -29,7 +29,7 @@
final ArchiveContext archiveContext;
final LinkedList decompressors;
final ClientMetadata clientMetadata;
- final ClientGetter parent;
+ final BaseClientGetter parent;
final GetCompletionCallback cb;
final int recursionLevel;
/** The splitfile type. See the SPLITFILE_ constants on Metadata. */
@@ -59,7 +59,7 @@
private boolean finished;
private Object token;
-	public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb, ClientGetter parent,
+	public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb, BaseClientGetter parent,
			FetcherContext newCtx, LinkedList decompressors, ClientMetadata clientMetadata,
			ArchiveContext actx, int recursionLevel, Bucket returnBucket, boolean dontTellParent, Object token) throws FetchException, MetadataParseException {
this.finished = false;
@@ -73,8 +73,6 @@
this.parent = parent;
if(parent.isCancelled())
throw new FetchException(FetchException.CANCELLED);
- if(!dontTellParent)
- parent.currentState = this;
overrideLength = metadata.dataLength();
this.splitfileType = metadata.getSplitfileType();
splitfileDataBlocks = metadata.getSplitfileDataKeys();
@@ -238,10 +236,6 @@
}
}
- public ClientGetter getParent() {
- return parent;
- }
-
public void schedule() {
for(int i=0;i<segments.length;i++) {
segments[i].schedule();
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -395,4 +395,8 @@
// Ignore; irrelevant
}
+	public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKChecker.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKChecker.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -15,7 +15,7 @@
final USKCheckerCallback cb;
private int dnfs;
-	USKChecker(USKCheckerCallback cb, ClientKey key, int maxRetries, FetcherContext ctx, ClientRequester parent) {
+	USKChecker(USKCheckerCallback cb, ClientKey key, int maxRetries, FetcherContext ctx, BaseClientGetter parent) {
super(key, maxRetries, ctx, parent);
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Created USKChecker for "+key);
Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -82,7 +82,7 @@
	/** Kill a background poll fetcher when it has lost its last subscriber? */
private boolean killOnLoseSubscribers;
- final ClientRequester parent;
+ final BaseClientGetter parent;
// We keep the data from the last (highest number) request.
private Bucket lastRequestData;
@@ -197,12 +197,12 @@
private Object token;
-	USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, ClientRequester parent, int minFailures, boolean pollForever, boolean keepLastData, Object token) {
+	USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, BaseClientGetter parent, int minFailures, boolean pollForever, boolean keepLastData, Object token) {
		this(origUSK, manager, ctx, parent, minFailures, pollForever, DEFAULT_MAX_MIN_FAILURES, keepLastData, token);
	}
	// FIXME use this!
-	USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, ClientRequester parent, int minFailures, boolean pollForever, long maxProbeEditions, boolean keepLastData, Object token) {
+	USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, BaseClientGetter parent, int minFailures, boolean pollForever, long maxProbeEditions, boolean keepLastData, Object token) {
this.parent = parent;
this.maxMinFailures = maxProbeEditions;
this.origUSK = origUSK;
Modified: trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -1,12 +1,14 @@
package freenet.client.async;
+import freenet.client.FetchException;
+import freenet.client.FetchResult;
import freenet.keys.FreenetURI;
import freenet.keys.USK;
/**
* Wrapper for a backgrounded USKFetcher.
*/
-public class USKFetcherWrapper extends ClientRequester {
+public class USKFetcherWrapper extends BaseClientGetter {
USK usk;
@@ -27,4 +29,20 @@
// Do nothing
}
+ public void onSuccess(FetchResult result, ClientGetState state) {
+		// Ignore; we don't do anything with it because we are running in the background.
+ }
+
+ public void onFailure(FetchException e, ClientGetState state) {
+ // Ignore
+ }
+
+ public void onBlockSetFinished(ClientGetState state) {
+ // Ignore
+ }
+
+	public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKManager.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -77,7 +77,7 @@
}
public synchronized USKFetcher getFetcher(USK usk, FetcherContext ctx,
- ClientRequester parent, boolean keepLastData) {
+ BaseClientGetter parent, boolean keepLastData) {
USKFetcher f = (USKFetcher) fetchersByUSK.get(usk);
USK clear = usk.clearCopy();
if(temporaryBackgroundFetchersLRU.contains(clear))
Modified: trunk/freenet/src/freenet/client/async/USKProxyCompletionCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKProxyCompletionCallback.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKProxyCompletionCallback.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -29,4 +29,8 @@
cb.onBlockSetFinished(state);
}
+	public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/USKRetriever.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKRetriever.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/client/async/USKRetriever.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -14,7 +14,7 @@
/**
* Poll a USK, and when a new slot is found, fetch it.
*/
-public class USKRetriever extends ClientRequester implements USKCallback, GetCompletionCallback {
+public class USKRetriever extends BaseClientGetter implements USKCallback {
/** Context for fetching data */
final FetcherContext ctx;
@@ -35,7 +35,7 @@
try {
SingleFileFetcher getter =
				(SingleFileFetcher) SingleFileFetcher.create(this, this, new ClientMetadata(), uri, ctx, new ArchiveContext(ctx.maxArchiveLevels),
-						ctx.maxNonSplitfileRetries, 0, false, key.copy(l), true, null);
+						ctx.maxNonSplitfileRetries, 0, true, key.copy(l), true, null);
getter.schedule();
} catch (MalformedURLException e) {
Logger.error(this, "Impossible: "+e, e);
@@ -46,8 +46,8 @@
public void onSuccess(FetchResult result, ClientGetState state) {
Object token = state.getToken();
- FreenetURI uri = (FreenetURI) token;
- cb.onFound(uri.getSuggestedEdition(), result);
+ USK key = (USK) token;
+ cb.onFound(key.suggestedEdition, result);
}
public void onFailure(FetchException e, ClientGetState state) {
@@ -71,4 +71,8 @@
// Ignore for now
}
+	public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ // Ignore
+ }
+
}
Modified: trunk/freenet/src/freenet/keys/USK.java
===================================================================
--- trunk/freenet/src/freenet/keys/USK.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/keys/USK.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -118,4 +118,7 @@
		return new FreenetURI("SSK", siteName, pubKeyHash, cryptoKey, ClientSSK.getExtraBytes());
}
+ public String toString() {
+ return super.toString()+":"+getURI();
+ }
}
Deleted: trunk/freenet/src/freenet/node/ARKFetchManager.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetchManager.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/ARKFetchManager.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -1,91 +0,0 @@
-package freenet.node;
-
-import java.util.LinkedList;
-
-import freenet.support.Logger;
-
-/**
- * @author zothar
- *
- * Maintains:
- * - A list of ARKFetchers who want to fetch
- */
-public class ARKFetchManager {
-
- /** Our Node */
- final Node node;
-
- /** All the ARKFetchers who want to fetch */
- private final LinkedList readyARKFetchers = new LinkedList();
-
- /**
- * Create a ARKFetchManager
- * @param node
- */
- public ARKFetchManager(Node node) {
- Logger.normal(this, "Creating ARKFetchManager");
- System.out.println("Creating ARKFetchManager");
- this.node = node;
- }
-
- public void addReadyARKFetcher(ARKFetcher arkFetcher) {
- synchronized(readyARKFetchers) {
- if(hasReadyARKFetcher(arkFetcher)) {
-				Logger.error(this, arkFetcher.peer.getPeer()+" already in readyARKFetchers");
- return;
- }
- readyARKFetchers.addLast(arkFetcher);
- }
- }
-
- public boolean hasReadyARKFetcher(ARKFetcher arkFetcher) {
- synchronized(readyARKFetchers) {
- if(readyARKFetchers.contains(arkFetcher)) {
- return true;
- }
- return false;
- }
- }
-
- public boolean hasReadyARKFetchers() {
- synchronized (readyARKFetchers) {
- if(readyARKFetchers.size() > 0) {
- return true;
- }
- }
- return false;
- }
-
- public void maybeStartNextReadyARKFetcher() {
- boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
- synchronized(readyARKFetchers) {
- if(node.getNumARKFetchers() >= 30) {
-				if(logMINOR) Logger.minor(this, "Not starting ARKFetcher in maybeStartNextReadyARKFetcher() because there are already 30 or more ARK Fetchers running");
- return;
- }
- if(!hasReadyARKFetchers()) {
-				if(logMINOR) Logger.minor(this, "maybeStartNextReadyARKFetcher() called with no ARKFetchers ready");
- return;
- }
- while( true ) {
- if(readyARKFetchers.size() <= 0) {
- break;
- }
-				ARKFetcher nextARKFetcher = (ARKFetcher) readyARKFetchers.removeFirst();
-				if(!nextARKFetcher.peer.isConnected()) {
-					nextARKFetcher.queueRunnableImmediately();
- break;
- }
- }
- }
- }
-
- public void removeReadyARKFetcher(ARKFetcher arkFetcher) {
- synchronized(readyARKFetchers) {
- if(!hasReadyARKFetcher(arkFetcher)) {
- return;
- }
- readyARKFetchers.remove(arkFetcher);
- }
- }
-}
Deleted: trunk/freenet/src/freenet/node/ARKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/node/ARKFetcher.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/ARKFetcher.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -1,270 +0,0 @@
-package freenet.node;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import freenet.client.FetchException;
-import freenet.client.FetchResult;
-import freenet.client.InserterException;
-import freenet.client.async.BaseClientPutter;
-import freenet.client.async.ClientCallback;
-import freenet.client.async.ClientGetter;
-import freenet.keys.FreenetURI;
-import freenet.keys.USK;
-import freenet.support.Logger;
-import freenet.support.SimpleFieldSet;
-import freenet.support.io.ArrayBucket;
-
-/**
- * Fetch an ARK. Permanent, tied to a PeerNode, stops itself after a successful fetch.
- */
-public class ARKFetcher implements ClientCallback {
-
- static boolean logMINOR;
- final PeerNode peer;
- final Node node;
- private ClientGetter getter;
- private FreenetURI fetchingURI;
- private boolean shouldRun;
- private static final int MAX_BACKOFF = 60*60*1000; // 1 hour
- private static final int MIN_BACKOFF = 5*1000; // 5 seconds
- private int backoff = MIN_BACKOFF;
- private final String identity;
- private boolean isFetching;
- private boolean started;
- private long startedEdition;
-
- public ARKFetcher(PeerNode peer, Node node) {
- this.peer = peer;
- this.node = node;
- this.identity = peer.getIdentityString();
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- }
-
- /**
-	 * Called when we fail to connect twice after a new reference. (So we get one from
-	 * the ARK, we wait for the current connect attempt to fail, we start another one,
-	 * that fails, we start another one, that also fails, so we try the fetch again to
-	 * see if we can find something more recent).
- */
- public synchronized void queue() {
- if(node.arkFetchManager.hasReadyARKFetcher(this)) {
- return;
- }
- if(peer.isConnected()) {
- return;
- }
- if(isFetching) {
- return;
- }
-		Logger.normal( this, "Queueing ARK Fetcher after "+peer.getHandshakeCount()+" failed handshakes for "+peer.getPeer()+" with identity '"+peer.getIdentityString()+"'");
- node.arkFetchManager.addReadyARKFetcher(this);
- }
-
- /**
- * Called when the ARKFetchManager says it's our turn to start fetching.
- */
- public void start() {
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(node.arkFetchManager.hasReadyARKFetcher(this)) {
- node.arkFetchManager.removeReadyARKFetcher(this);
- }
- if(peer.isConnected()) {
- return;
- }
- ClientGetter cg = null;
- synchronized(this) {
- if(isFetching) {
- return;
- }
-			if(started) { // We only need one ARKFetcher per PeerNode
- return;
- }
-			Logger.normal( this, "Starting ARK Fetcher after "+peer.getHandshakeCount()+" failed handshakes for "+peer.getPeer()+" with identity '"+peer.getIdentityString()+"' on "+this);
- started = true;
- // Start fetch
- shouldRun = true;
- if(getter == null) {
- USK ark = peer.getARK();
- if(ark == null) {
- return;
- }
- FreenetURI uri = ark.getURI();
- startedEdition = uri.getSuggestedEdition();
- fetchingURI = uri;
-				if(logMINOR) Logger.minor(this, "Fetching ARK: "+uri+" for "+peer);
-				cg = new ClientGetter(this, node.clientCore.requestStarters.chkFetchScheduler, node.clientCore.requestStarters.sskFetchScheduler,
-					uri, node.arkFetcherContext, RequestStarter.UPDATE_PRIORITY_CLASS,
- this, new ArrayBucket());
- getter = cg;
- } else return; // already running
- }
-
- if(cg != null)
- try {
- boolean mustAdd = false;
- synchronized(this) {
- if(!isFetching) {
- mustAdd = true;
- isFetching = true;
- }
- }
- if(mustAdd) {
- node.addARKFetcher(identity, this);
- }
-
- cg.start();
- } catch (FetchException e) {
- onFailure(e, cg);
- }
- }
-
- /**
- * Called when the node connects successfully.
- */
- public synchronized void stop() {
- // Stop fetch
- backoff = MIN_BACKOFF;
-		if(logMINOR) Logger.minor(this, "Cancelling ARK fetch for "+peer);
- shouldRun = false;
- started = false;
- if(node.arkFetchManager.hasReadyARKFetcher(this)) {
- node.arkFetchManager.removeReadyARKFetcher(this);
- }
- if(isFetching) {
- node.removeARKFetcher(identity,this);
- isFetching = false;
- }
-
- if(getter != null)
- getter.cancel();
- }
-
- public void onSuccess(FetchResult result, ClientGetter state) {
-		if(logMINOR) Logger.minor(this, "Fetched ARK for "+peer, new Exception("debug"));
- synchronized(this) {
- started = false;
- // Fetcher context specifies an upper bound on size.
- backoff = MIN_BACKOFF;
-
- if(isFetching) {
- node.removeARKFetcher(identity,this);
-				if(node.arkFetchManager.hasReadyARKFetcher(this)) {
-					node.arkFetchManager.removeReadyARKFetcher(this);
- }
- isFetching = false;
- }
-
- getter = null;
- }
-
- ArrayBucket bucket = (ArrayBucket) result.asBucket();
- byte[] data = bucket.toByteArray();
- String ref;
- try {
- ref = new String(data, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // Yeah, right.
- throw new Error(e);
- }
- SimpleFieldSet fs;
- try {
- fs = new SimpleFieldSet(ref, true);
-			if(logMINOR) Logger.minor(this, "Got ARK for "+peer.getPeer());
-			peer.gotARK(fs, getStartedEdition());
-		} catch (IOException e) {
-			// Corrupt ref.
-			Logger.error(this, "Corrupt ARK reference? Fetched "+getFetchingURI()+" got while parsing: "+e+" from:\n"+ref, e);
- }
- }
-
- public void onFailure(FetchException e, ClientGetter state) {
- synchronized(this) {
- started = false;
-			if(logMINOR) Logger.minor(this, "Failed to fetch ARK for "+peer+" : "+e, e);
-
- if(isFetching) {
- node.removeARKFetcher(identity,this);
-				if(node.arkFetchManager.hasReadyARKFetcher(this)) {
-					node.arkFetchManager.removeReadyARKFetcher(this);
- }
- isFetching = false;
- }
-
-			// If it's a redirect, follow the redirect and update the ARK.
- // If it's any other error, wait a while then retry.
- getter = null;
- if(!shouldRun) return;
- if(e.newURI == null) {
- backoff += backoff;
- if(backoff > MAX_BACKOFF) backoff = MAX_BACKOFF;
- }
- }
- if(e.newURI != null) {
- if(logMINOR)
-				Logger.minor(this, "Failed to fetch ARK for "+peer.getPeer()+", "+getFetchingURI()+" gave redirect to "+e.newURI);
- peer.updateARK(e.newURI);
- queueWithBackoff();
- return;
- }
- if(logMINOR)
-			Logger.minor(this, "Failed to fetch ARK for "+peer.getPeer()+", now backing off ARK fetches for "+(getBackoff() / 1000)+" seconds");
- // We may be on the PacketSender thread.
- // FIXME should this be exponential backoff?
- queueWithBackoff();
- }
-
- public void onSuccess(BaseClientPutter state) {
- // Impossible.
-		Logger.error(this, "Impossible reached in ARKFetcher.onSuccess(BaseClientPutter) for peer "+peer.getPeer(), new Exception("error"));
- }
-
- public void onFailure(InserterException e, BaseClientPutter state) {
- // Impossible.
-		Logger.error(this, "Impossible reached in ARKFetcher.onFailure(InserterException,BaseClientPutter) for peer "+peer.getPeer(), new Exception("error"));
- }
-
- public void onGeneratedURI(FreenetURI uri, BaseClientPutter state) {
- // Impossible.
-		Logger.error(this, "Impossible reached in ARKFetcher.onGeneratredURI(FreenetURI,BaseClientPutter) for peer "+peer.getPeer(), new Exception("error"));
- }
-
- public synchronized boolean isFetching() {
- return isFetching;
- }
-
- /**
-	 * Queue a Runnable on the PacketSender timed job queue to be run almost immediately
- * Should not be called from other objects except for ARKFetchManager
- */
- public void queueRunnableImmediately() {
-		node.ps.queueTimedJob(new Runnable() { public void run() { start(); }}, 100); // Runnable rather than FastRunnable so we don't put it on the PacketSender thread
- }
-
- /**
-	 * Queue a call to our queue method on the PacketSender timed job queue to be run after our ARK fetch backoff expires
- */
- private void queueWithBackoff() {
-		node.ps.queueTimedJob(new Runnable() { public void run() { queue(); }}, getBackoff()); // Runnable rather than FastRunnable so we don't put it on the PacketSender thread
- }
-
- private synchronized int getBackoff() {
- return backoff;
- }
-
- private synchronized FreenetURI getFetchingURI() {
- return fetchingURI;
- }
-
- private synchronized long getStartedEdition() {
- return startedEdition;
- }
-
- public void onMajorProgress() {
- // Ignore
- }
-
- public void onFetchable(BaseClientPutter state) {
- // Ignore, we don't insert
- }
-}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/Node.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -295,8 +295,6 @@
long myARKNumber;
/** FetcherContext for ARKs */
public final FetcherContext arkFetcherContext;
- /** ARKFetcher's currently running, by identity */
- private final HashMap arkFetchers;
/** Next time to log the PeerNode status summary */
private long nextPeerNodeStatusLogTime = -1;
/** PeerNode status summary log interval (milliseconds) */
@@ -342,7 +340,6 @@
String myName;
final LocationManager lm;
final PeerManager peers; // my peers
- final ARKFetchManager arkFetchManager; // ready ARK Fetchers
/** Directory to put node, peers, etc into */
final File nodeDir;
/** Directory to put extra peer data into */
@@ -713,7 +710,6 @@
requestSenders = new HashMap();
transferringRequestSenders = new HashMap();
insertSenders = new HashMap();
- arkFetchers = new HashMap();
peerNodeStatuses = new HashMap();
peerNodeRoutingBackoffReasons = new HashMap();
runningUIDs = new HashSet();
@@ -989,9 +985,6 @@
}
}
- // Prepare the ARKFetchManager
- arkFetchManager = new ARKFetchManager(this);
-
// Then read the peers
	peers = new PeerManager(this, new File(nodeDir, "peers-"+portNumber).getPath());
peers.writePeers();
@@ -2437,45 +2430,7 @@
/**
* Add a ARKFetcher to the map
*/
- public void addARKFetcher(String identity, ARKFetcher fetcher) {
- synchronized(arkFetchers) {
- if(arkFetchers.containsKey(identity)) {
-				ARKFetcher af = (ARKFetcher) arkFetchers.get(identity);
-				if(af != fetcher)
-					Logger.error(this, "addARKFetcher(): identity '"+identity+"' already in arkFetcher as "+af+" and you want to add"+fetcher);
-				else if(logMINOR) Logger.minor(this, "Re-adding "+identity+" : "+fetcher);
-				return;
-			}
-			if(logMINOR) Logger.minor(this, "addARKFetcher(): adding ARK Fetcher for "+identity);
- arkFetchers.put(identity, fetcher);
- }
- }
-
/**
- * How many ARKFetchers are currently requesting ARKs?
- */
- public int getNumARKFetchers() {
- return arkFetchers.size();
- }
-
- /**
- * Remove a ARKFetcher from the map
- */
- public void removeARKFetcher(String identity, ARKFetcher fetcher) {
- synchronized(arkFetchers) {
- if(!arkFetchers.containsKey(identity)) {
-				Logger.error(this, "removeARKFetcher(): identity '"+identity+"' not in arkFetcher to remove");
-				return;
-			}
-			if(logMINOR) Logger.minor(this, "removeARKFetcher(): removing ARK Fetcher for "+identity);
-			ARKFetcher af = (ARKFetcher) arkFetchers.remove(identity);
-			if(af != fetcher) {
-				Logger.error(this, "Removed "+af+" should be "+fetcher+" for "+identity+" in removeARKFetcher");
- }
- }
- }
-
-	/**
* Add a PeerNode status to the map
*/
public void addPeerNodeStatus(int pnStatus, PeerNode peerNode) {
@@ -2768,22 +2723,6 @@
}
/**
- * Start a ready ARKFetcher if the timer has expired
- */
- public void maybeStartAReadyARKFetcher(long now) {
- if(now > nextReadyARKFetcherStartTime) {
- if(arkFetchManager.hasReadyARKFetchers()) {
- if(getNumARKFetchers() >= 30) {
-				Logger.error(this, "Not starting ARKFetcher in maybeStartAReadyARKFetcher() because there are already 30 or more ARK Fetchers running");
- } else {
- arkFetchManager.maybeStartNextReadyARKFetcher();
- }
- }
-			nextReadyARKFetcherStartTime = now + readyARKFetcherStartInterval;
- }
- }
-
-	/**
* Update peerManagerUserAlertStats if the timer has expired
*/
public void maybeUpdatePeerManagerUserAlertStats(long now) {
@@ -3013,4 +2952,13 @@
}
return result;
}
+
+ public int getNumARKFetchers() {
+ PeerNode[] p = peers.myPeers;
+ int x = 0;
+ for(int i=0;i<p.length;i++) {
+ if(p[i].isFetchingARK()) x++;
+ }
+ return x;
+ }
}
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/PacketSender.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -153,7 +153,6 @@
}
node.maybeLogPeerNodeStatusSummary(now);
node.maybeUpdateOldestNeverConnectedPeerAge(now);
- node.maybeStartAReadyARKFetcher(now);
node.maybeUpdatePeerManagerUserAlertStats(now);
node.maybeUpdateNodeIOStats(now);
long nextActionTime = Long.MAX_VALUE;
@@ -273,6 +272,8 @@
long beforeHandshakeTime = System.currentTimeMillis();
if(pn.shouldSendHandshake())
node.packetMangler.sendHandshake(pn);
+ if(pn.noContactDetails())
+ pn.startARKFetcher();
long afterHandshakeTime = System.currentTimeMillis();
if((afterHandshakeTime - beforeHandshakeTime) > (2*1000))
			Logger.error(this, "afterHandshakeTime is more than 2 seconds past beforeHandshakeTime ("+(afterHandshakeTime - beforeHandshakeTime)+") in PacketSender working with "+pn.getPeer()+" named "+pn.getName());
Modified: trunk/freenet/src/freenet/node/PeerManager.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerManager.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/PeerManager.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -612,10 +612,18 @@
final Object writePeersSync = new Object();
+ void writePeers() {
+ node.ps.queueTimedJob(new Runnable() {
+ public void run() {
+ writePeersInner();
+ }
+ }, 0);
+ }
+
/**
* Write the peers file to disk
*/
- void writePeers() {
+ private void writePeersInner() {
synchronized (writePeersSync) {
FileOutputStream fos;
String f = filename + ".bak";
Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java	2006-08-31 19:46:46 UTC (rev 10309)
+++ trunk/freenet/src/freenet/node/PeerNode.java	2006-08-31 20:23:20 UTC (rev 10310)
@@ -24,6 +24,9 @@
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
+import freenet.client.FetchResult;
+import freenet.client.async.USKRetriever;
+import freenet.client.async.USKRetrieverCallback;
import freenet.crypt.BlockCipher;
import freenet.crypt.DiffieHellmanContext;
import freenet.crypt.UnsupportedCipherException;
@@ -62,7 +65,7 @@
* code into KeyTracker, which handles all communications to and
* from this peer over the duration of a single key.
*/
-public class PeerNode implements PeerContext {
+public class PeerNode implements PeerContext, USKRetrieverCallback {
/** Set to true when we complete a handshake. */
private boolean completedHandshake;
@@ -134,7 +137,7 @@
/**
* ARK fetcher.
*/
- private final ARKFetcher arkFetcher;
+ private USKRetriever arkFetcher;
/** My ARK SSK public key; edition is the next one, not the current one,
* so this is what we want to fetch. */
@@ -437,8 +440,6 @@
parseARK(fs, true);
- arkFetcher = new ARKFetcher(this, node);
-
// Now for the metadata.
	// The metadata sub-fieldset contains data about the node which is not part of the node reference.
// It belongs to this node, not to the node being described.
@@ -1030,8 +1031,6 @@
}
if(successfulHandshakeSend) {
firstHandshake = false;
- } else {
- handshakeIPs = null;
}
handshakeCount++;
		fetchARKFlag = ((handshakeCount == MAX_HANDSHAKE_COUNT) && !(verifiedIncompatibleOlderVersion || verifiedIncompatibleNewerVersion));
@@ -1055,9 +1054,6 @@
			sendHandshakeTime = now + Node.MIN_TIME_BETWEEN_HANDSHAKE_SENDS
				+ node.random.nextInt(Node.RANDOMIZED_TIME_BETWEEN_HANDSHAKE_SENDS);
}
- if(!successfulHandshakeSend) {
- handshakeIPs = null;
- }
			if(logMINOR) Logger.minor(this, "Next BurstOnly mode handshake in "+(sendHandshakeTime - now)+"ms for "+getName()+" (count: "+listeningHandshakeBurstCount+", size: "+listeningHandshakeBurstSize+")", new Exception("double-called debug"));
}
}
@@ -1065,7 +1061,7 @@
	// Don't fetch ARKs for peers we have verified (through handshake) to be incompatible with us
if(fetchARKFlag) {
long arkFetcherStartTime1 = System.currentTimeMillis();
- arkFetcher.queue();
+ startARKFetcher();
long arkFetcherStartTime2 = System.currentTimeMillis();
		if((arkFetcherStartTime2 - arkFetcherStartTime1) > 500) {
			Logger.normal(this, "arkFetcherStartTime2 is more than half a second after arkFetcherStartTime1 ("+(arkFetcherStartTime2 - arkFetcherStartTime1)+") working on "+getName());
@@ -1296,12 +1292,13 @@
	public boolean completedHandshake(long thisBootID, byte[] data, int offset, int length, BlockCipher encCipher, byte[] encKey, Peer replyTo, boolean unverified) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
long now = System.currentTimeMillis();
- arkFetcher.stop();
+
synchronized(this) {
completedHandshake = true;
handshakeCount = 0;
bogusNoderef = false;
isConnected = true;
+ stopARKFetcher();
}
try {
// First, the new noderef
@@ -1411,8 +1408,33 @@
}
return true;
}
+
+ private final Object arkFetcherSync = new Object();
+
+ void startARKFetcher() {
+ // FIXME any way to reduce locking here?
+ synchronized(arkFetcherSync) {
+ if(myARK == null) {
+ Logger.minor(this, "No ARK for "+this+" !!!!");
+ return;
+ }
+		Logger.minor(this, "Starting ARK fetcher for "+this+" : "+myARK);
+		if(arkFetcher == null)
+			arkFetcher = node.clientCore.uskManager.subscribeContent(myARK, this, true, node.arkFetcherContext, RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS, node);
+ }
+ }
- boolean sentInitialMessages;
+ private void stopARKFetcher() {
+ Logger.minor(this, "Stopping ARK fetcher for "+this+" : "+myARK);
+ // FIXME any way to reduce locking here?
+ synchronized(arkFetcherSync) {
+ if(arkFetcher == null) return;
+			node.clientCore.uskManager.unsubscribeContent(myARK, this.arkFetcher, true);
+ arkFetcher = null;
+ }
+ }
+
+ boolean sentInitialMessages;
void maybeSendInitialMessages() {
synchronized(this) {
@@ -2301,7 +2323,7 @@
}
public boolean isFetchingARK() {
- return arkFetcher.isFetching();
+ return arkFetcher != null;
}
public synchronized int getHandshakeCount() {
@@ -2323,7 +2345,7 @@
if(isConnected()) {
forceDisconnect();
}
- arkFetcher.stop();
+ stopARKFetcher();
setPeerNodeStatus(System.currentTimeMillis());
node.peers.writePeers();
}
@@ -2749,4 +2771,42 @@
private void onConnect() {
sendQueuedN2NTMs();
}
+
+ public void onFound(long edition, FetchResult result) {
+ if(isConnected() || myARK.suggestedEdition > edition) {
+ result.asBucket().free();
+ return;
+ }
+
+ byte[] data;
+ try {
+ data = result.asByteArray();
+ } catch (IOException e) {
+			Logger.error(this, "I/O error reading fetched ARK: "+e, e);
+ return;
+ }
+
+ String ref;
+ try {
+ ref = new String(data, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // Yeah, right.
+ throw new Error(e);
+ }
+
+ SimpleFieldSet fs;
+ try {
+ fs = new SimpleFieldSet(ref, true);
+ if(logMINOR) Logger.minor(this, "Got ARK for "+this);
+ gotARK(fs, edition);
+ } catch (IOException e) {
+ // Corrupt ref.
+			Logger.error(this, "Corrupt ARK reference? Fetched "+myARK.copy(edition)+" got while parsing: "+e+" from:\n"+ref, e);
+ }
+
+ }
+
+ public synchronized boolean noContactDetails() {
+ return handshakeIPs == null || handshakeIPs.length == 0;
+ }
}