Author: toad
Date: 2006-03-24 18:06:35 +0000 (Fri, 24 Mar 2006)
New Revision: 8299
Added:
trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
Modified:
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/async/ClientGetState.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/USKCheckerCallback.java
trunk/freenet/src/freenet/client/async/USKFetcher.java
trunk/freenet/src/freenet/client/async/USKManager.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/Version.java
Log:
562:
More (largely untested) work on USK subscription and background USK updating.
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/FetcherContext.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -19,7 +19,7 @@
public long maxTempLength;
public final ArchiveManager archiveManager;
public final BucketFactory bucketFactory;
- public final USKManager uskManager;
+ public USKManager uskManager;
public int maxRecursionLevel;
public int maxArchiveRestarts;
public boolean dontEnterImplicitArchives;
Modified: trunk/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientGetState.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/ClientGetState.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -6,8 +6,6 @@
*/
public abstract interface ClientGetState {
- public ClientGetter getParent();
-
public void schedule();
public void cancel();
Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -535,9 +535,8 @@
metaStrings, ctx, actx,
maxRetries, recursionLevel, dontTellClientGet,
token, false,
returnBucket);
sf.schedule();
-// // Background fetch
-// USKFetcher fetcher =
-// ctx.uskManager.getFetcher(usk, ctx, parent);
+ // Background fetch
+
ctx.uskManager.startTemporaryBackgroundFetcher(usk);
return sf;
} else {
cb.onFailure(new
FetchException(FetchException.PERMANENT_REDIRECT,
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);
Modified: trunk/freenet/src/freenet/client/async/USKCheckerCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKCheckerCallback.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKCheckerCallback.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -20,4 +20,7 @@
/** Request cancelled */
public void onCancelled();
+ /** Get priority to run the request at */
+ public short getPriority();
+
}
Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -1,5 +1,6 @@
package freenet.client.async;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Vector;
@@ -7,6 +8,7 @@
import freenet.client.FetcherContext;
import freenet.keys.FreenetURI;
import freenet.keys.USK;
+import freenet.node.RequestStarter;
import freenet.support.Logger;
/**
@@ -71,7 +73,7 @@
/** Cancelled? */
private boolean cancelled;
- final ClientGetter parent;
+ final ClientRequester parent;
public synchronized boolean addCallback(USKFetcherCallback cb) {
if(completed) return false;
@@ -136,24 +138,49 @@
public String toString() {
return "USKAttempt for "+number+" for "+origUSK.getURI();
}
+
+ public short getPriority() {
+ if(backgroundPoll)
+ return RequestStarter.UPDATE_PRIORITY_CLASS;
+ else
+ return parent.getPriorityClass();
+ }
}
private final Vector runningAttempts;
private long lastFetchedEdition;
private long lastAddedEdition;
+
+ /** Number of keys to probe ahead. If they are all empty, then
+ * we have finished (unless in background poll mode) */
+ long minFailures;
+ final long origMinFailures;
- static final long MIN_FAILURES = 3;
+ final long origSleepTime = 1000;
+ final long maxSleepTime = 60 * 60 * 1000;
+ long sleepTime = origSleepTime;
+
+ // At most, probe 1000 editions ahead!
+ private static final long MAX_MIN_FAILURES = 1000;
+
+ private long valueAtSchedule;
- USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx,
ClientGetter parent) {
+ /** Keep going forever? */
+ private final boolean backgroundPoll;
+
+ USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx,
ClientRequester parent, int minFailures, boolean pollForever) {
this.parent = parent;
this.origUSK = origUSK;
this.uskManager = manager;
+ this.minFailures = this.origMinFailures = minFailures;
runningAttempts = new Vector();
callbacks = new LinkedList();
+ subscribers = new HashSet();
lastFetchedEdition = -1;
lastAddedEdition = -1;
this.ctx = ctx;
+ this.backgroundPoll = pollForever;
}
void onDNF(USKAttempt att) {
@@ -164,8 +191,8 @@
runningAttempts.remove(att);
if(runningAttempts.isEmpty()) {
long curLatest = uskManager.lookup(origUSK);
- Logger.minor(this, "latest: "+curLatest+", last fetched: "+lastFetchedEdition+", curLatest+MIN_FAILURES: "+(curLatest+MIN_FAILURES));
- if(curLatest + MIN_FAILURES >=
lastFetchedEdition) {
+ Logger.minor(this, "latest: "+curLatest+", last fetched: "+lastFetchedEdition+", curLatest+MIN_FAILURES: "+(curLatest+minFailures));
+ if(curLatest + minFailures >=
lastFetchedEdition) {
finished = true;
}
} else
@@ -177,14 +204,52 @@
}
private void finishSuccess() {
- long ed = uskManager.lookup(origUSK);
- USKFetcherCallback[] cb;
- synchronized(this) {
- completed = true;
- cb = (USKFetcherCallback[]) callbacks.toArray(new
USKFetcherCallback[callbacks.size()]);
+ if(backgroundPoll) {
+ synchronized(this) {
+ long valAtEnd = uskManager.lookup(origUSK);
+ if(valAtEnd > valueAtSchedule) {
+ // Have advanced.
+ minFailures = origMinFailures;
+ sleepTime = origSleepTime;
+ } else {
+ long newMinFailures = minFailures * 2;
+ if(newMinFailures > MAX_MIN_FAILURES)
+ newMinFailures =
MAX_MIN_FAILURES;
+ minFailures = newMinFailures;
+ }
+ long newSleepTime = sleepTime * 2;
+ if(newSleepTime > maxSleepTime) newSleepTime =
maxSleepTime;
+ sleepTime = newSleepTime;
+ long now = System.currentTimeMillis();
+ long end = now + sleepTime;
+ long newValAtEnd = valAtEnd;
+ // FIXME do this without occupying a thread
+ while(now < end && ((newValAtEnd =
uskManager.lookup(origUSK)) == valAtEnd)) {
+ long d = end - now;
+ if(d > 0)
+ try {
+ wait(d);
+ } catch (InterruptedException
e) {
+ // Maybe break? Go around loop.
+ }
+ now = System.currentTimeMillis();
+ }
+ if(newValAtEnd != valAtEnd) {
+ minFailures = origMinFailures;
+ sleepTime = origSleepTime;
+ }
+ }
+ schedule();
+ } else {
+ long ed = uskManager.lookup(origUSK);
+ USKFetcherCallback[] cb;
+ synchronized(this) {
+ completed = true;
+ cb = (USKFetcherCallback[])
callbacks.toArray(new USKFetcherCallback[callbacks.size()]);
+ }
+ for(int i=0;i<cb.length;i++)
+ cb[i].onFoundEdition(ed, origUSK.copy(ed));
}
- for(int i=0;i<cb.length;i++)
- cb[i].onFoundEdition(ed, origUSK.copy(ed));
}
void onSuccess(USKAttempt att, boolean dontUpdate) {
@@ -196,7 +261,7 @@
uskManager.update(origUSK, curLatest);
curLatest = Math.max(uskManager.lookup(origUSK),
curLatest);
Logger.minor(this, "Latest: "+curLatest);
- long addTo = curLatest + MIN_FAILURES;
+ long addTo = curLatest + minFailures;
long addFrom = Math.max(lastAddedEdition + 1, curLatest
+ 1);
if(addTo >= addFrom) {
l = new LinkedList();
@@ -206,7 +271,7 @@
cancelBefore(curLatest);
}
if(l == null) return;
- else {
+ else if(!cancelled) {
for(Iterator i=l.iterator();i.hasNext();) {
USKAttempt a = (USKAttempt) i.next();
a.schedule();
@@ -254,6 +319,7 @@
* Caller is responsible for calling .schedule().
*/
private synchronized USKAttempt add(long i) {
+ if(cancelled) return null;
Logger.minor(this, "Adding USKAttempt for "+i+" for "+origUSK.getURI());
if(!runningAttempts.isEmpty()) {
USKAttempt last = (USKAttempt)
runningAttempts.lastElement();
@@ -278,19 +344,19 @@
return origUSK;
}
- public ClientGetter getParent() {
- return parent;
- }
-
public void schedule() {
USKAttempt[] attempts;
synchronized(this) {
- for(long
i=origUSK.suggestedEdition;i<origUSK.suggestedEdition+MIN_FAILURES;i++)
+ if(cancelled) return;
+ valueAtSchedule = uskManager.lookup(origUSK);
+ long startPoint = Math.max(origUSK.suggestedEdition,
valueAtSchedule);
+ for(long i=startPoint;i<startPoint+minFailures;i++)
add(i);
attempts = (USKAttempt[]) runningAttempts.toArray(new
USKAttempt[runningAttempts.size()]);
}
- for(int i=0;i<attempts.length;i++)
- attempts[i].schedule();
+ if(!cancelled)
+ for(int i=0;i<attempts.length;i++)
+ attempts[i].schedule();
}
public void cancel() {
@@ -303,4 +369,23 @@
attempts[i].cancel();
}
+ /** Set of interested USKCallbacks. Note that we don't actually
+ * send them any information - they are essentially placeholders,
+ * an alternative to a refcount. This could be replaced with a
+ * Bloom filter or whatever, we only need .exists and .count.
+ */
+ final HashSet subscribers;
+
+ public synchronized void addSubscriber(USKCallback cb) {
+ subscribers.add(cb);
+ }
+
+ public synchronized boolean hasSubscribers() {
+ return !subscribers.isEmpty();
+ }
+
+ public synchronized void removeSubscriber(USKCallback cb) {
+ subscribers.remove(cb);
+ }
+
}
Added: trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -0,0 +1,31 @@
+package freenet.client.async;
+
+import freenet.keys.FreenetURI;
+import freenet.keys.USK;
+import freenet.node.RequestStarter;
+
+/**
+ * Wrapper for a backgrounded USKFetcher.
+ */
+public class USKFetcherWrapper extends ClientRequester {
+
+ USK usk;
+
+ public USKFetcherWrapper(USK usk, ClientRequestScheduler chkScheduler,
ClientRequestScheduler sskScheduler) {
+ super(RequestStarter.UPDATE_PRIORITY_CLASS, chkScheduler,
sskScheduler, usk);
+ this.usk = usk;
+ }
+
+ public FreenetURI getURI() {
+ return usk.getURI();
+ }
+
+ public boolean isFinished() {
+ return false;
+ }
+
+ public void notifyClients() {
+ // Do nothing
+ }
+
+}
Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKManager.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -4,6 +4,9 @@
import freenet.client.FetcherContext;
import freenet.keys.USK;
+import freenet.node.Node;
+import freenet.node.RequestStarter;
+import freenet.support.LRUQueue;
import freenet.support.Logger;
/**
@@ -22,16 +25,46 @@
* USKFetcher for each {USK, edition number}. */
final HashMap fetchersByUSK;
+ /** Backgrounded USKFetchers by USK. */
+ final HashMap backgroundFetchersByClearUSK;
+
+ // FIXME make this configurable
+ static final int MAX_BACKGROUND_FETCHERS = 64;
+ final LRUQueue temporaryBackgroundFetchersLRU;
+
/** USKChecker's by USK. Deleted immediately on completion. */
final HashMap checkersByUSK;
+
+ final FetcherContext backgroundFetchContext;
+ final ClientRequestScheduler chkRequestScheduler;
+ final ClientRequestScheduler sskRequestScheduler;
- public USKManager() {
+ public USKManager(FetcherContext backgroundFetchContext,
ClientRequestScheduler chkRequestScheduler, ClientRequestScheduler
sskRequestScheduler) {
latestVersionByClearUSK = new HashMap();
subscribersByClearUSK = new HashMap();
fetchersByUSK = new HashMap();
checkersByUSK = new HashMap();
+ backgroundFetchersByClearUSK = new HashMap();
+ temporaryBackgroundFetchersLRU = new LRUQueue();
+ this.backgroundFetchContext = backgroundFetchContext;
+ this.chkRequestScheduler = chkRequestScheduler;
+ this.sskRequestScheduler = sskRequestScheduler;
}
+ public USKManager(Node node) {
+ backgroundFetchContext =
node.makeClient(RequestStarter.UPDATE_PRIORITY_CLASS).getFetcherContext();
+ backgroundFetchContext.followRedirects = false;
+ backgroundFetchContext.uskManager = this;
+ this.chkRequestScheduler = node.chkFetchScheduler;
+ this.sskRequestScheduler = node.sskFetchScheduler;
+ latestVersionByClearUSK = new HashMap();
+ subscribersByClearUSK = new HashMap();
+ fetchersByUSK = new HashMap();
+ checkersByUSK = new HashMap();
+ backgroundFetchersByClearUSK = new HashMap();
+ temporaryBackgroundFetchersLRU = new LRUQueue();
+ }
+
/**
* Look up the latest known version of the given USK.
* @return The latest known edition number, or -1.
@@ -50,11 +83,32 @@
if(f.parent.priorityClass == parent.priorityClass &&
f.ctx.equals(ctx))
return f;
}
- f = new USKFetcher(usk, this, ctx, parent);
+ f = new USKFetcher(usk, this, ctx, parent, 3, false);
fetchersByUSK.put(usk, f);
return f;
}
+ public void startTemporaryBackgroundFetcher(USK usk) {
+ USK clear = usk.clearCopy();
+ USKFetcher sched = null;
+ synchronized(this) {
+ USKFetcher f = (USKFetcher)
backgroundFetchersByClearUSK.get(clear);
+ if(f == null) {
+ f = new USKFetcher(usk, this,
backgroundFetchContext, new USKFetcherWrapper(usk, chkRequestScheduler,
sskRequestScheduler), 10, false);
+ sched = f;
+ backgroundFetchersByClearUSK.put(clear, f);
+ }
+ temporaryBackgroundFetchersLRU.push(clear);
+ while(temporaryBackgroundFetchersLRU.size() >
MAX_BACKGROUND_FETCHERS) {
+ USK del = (USK)
temporaryBackgroundFetchersLRU.pop();
+ USKFetcher fetcher = (USKFetcher)
backgroundFetchersByClearUSK.get(del.clearCopy());
+ if(!fetcher.hasSubscribers())
+ fetcher.cancel();
+ }
+ }
+ if(sched != null) sched.schedule();
+ }
+
synchronized void finished(USKFetcher f) {
USK u = f.getOriginalUSK();
fetchersByUSK.remove(u);
@@ -84,15 +138,60 @@
/**
* Subscribe to a given USK. Callback will be notified when it is
* updated. Note that this does not imply that the USK will be
- * checked on a regular basis!
+ * checked on a regular basis, unless runBackgroundFetch=true.
*/
- public synchronized void subscribe(USK origUSK, USKCallback cb) {
- USK clear = origUSK.clearCopy();
- USKCallback[] callbacks = (USKCallback[])
subscribersByClearUSK.get(clear);
- if(callbacks == null)
- callbacks = new USKCallback[1];
- else
- callbacks = new USKCallback[callbacks.length+1];
- callbacks[callbacks.length-1] = cb;
+ public void subscribe(USK origUSK, USKCallback cb, boolean
runBackgroundFetch) {
+ USKFetcher sched = null;
+ synchronized(this) {
+ USK clear = origUSK.clearCopy();
+ USKCallback[] callbacks = (USKCallback[])
subscribersByClearUSK.get(clear);
+ if(callbacks == null)
+ callbacks = new USKCallback[1];
+ else
+ callbacks = new USKCallback[callbacks.length+1];
+ callbacks[callbacks.length-1] = cb;
+ subscribersByClearUSK.put(clear, callbacks);
+ if(runBackgroundFetch) {
+ USKFetcher f = (USKFetcher)
backgroundFetchersByClearUSK.get(clear);
+ if(f == null) {
+ f = new USKFetcher(origUSK, this,
backgroundFetchContext, new USKFetcherWrapper(origUSK, chkRequestScheduler,
sskRequestScheduler), 10, false);
+ sched = f;
+ backgroundFetchersByClearUSK.put(clear,
f);
+ }
+ f.addSubscriber(cb);
+ }
+ }
+ if(sched != null)
+ sched.schedule();
}
+
+ public void unsubscribe(USK origUSK, USKCallback cb, boolean
runBackgroundFetch) {
+ synchronized(this) {
+ USK clear = origUSK.clearCopy();
+ USKCallback[] callbacks = (USKCallback[])
subscribersByClearUSK.get(clear);
+ int j=0;
+ for(int i=0;i<callbacks.length;i++) {
+ USKCallback c = callbacks[i];
+ if(c != null && c != cb) {
+ callbacks[j++] = c;
+ }
+ }
+ USKCallback[] newCallbacks = new USKCallback[j];
+ System.arraycopy(callbacks, 0, newCallbacks, 0, j);
+ if(newCallbacks.length > 0)
+ subscribersByClearUSK.put(clear, callbacks);
+ else
+ subscribersByClearUSK.remove(clear);
+ if(runBackgroundFetch) {
+ USKFetcher f = (USKFetcher)
backgroundFetchersByClearUSK.get(clear);
+ f.removeSubscriber(cb);
+ if(!f.hasSubscribers()) {
+
if(!temporaryBackgroundFetchersLRU.contains(clear)) {
+ f.cancel();
+
backgroundFetchersByClearUSK.remove(clear);
+ }
+ }
+ }
+ }
+ }
}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/node/Node.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -26,9 +26,11 @@
import java.util.Iterator;
import freenet.client.ArchiveManager;
+import freenet.client.FetcherContext;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.RequestScheduler;
import freenet.client.async.USKManager;
import freenet.clients.http.FproxyToadlet;
import freenet.clients.http.SimpleToadletServer;
@@ -454,7 +456,6 @@
private Node(Config config, RandomSource random) throws NodeInitException {
// Easy stuff
- uskManager = new USKManager();
startupTime = System.currentTimeMillis();
recentlyCompletedIDs = new LRUQueue();
this.config = config;
@@ -874,6 +875,8 @@
sskPutScheduler = new ClientRequestScheduler(true, random,
sskInsertStarter, this);
sskInsertStarter.setScheduler(sskPutScheduler);
sskInsertStarter.start();
+
+ uskManager = new USKManager(this);
// And finally, Initialize the plugin manager
pluginManager = new PluginManager(this);
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-24 18:06:35 UTC (rev 8299)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 561;
+ private static final int buildNumber = 562;
/** Oldest build of Fred we will talk to */
private static final int lastGoodBuild = 507;