Author: toad
Date: 2006-03-24 18:06:35 +0000 (Fri, 24 Mar 2006)
New Revision: 8299

Added:
   trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
Modified:
   trunk/freenet/src/freenet/client/FetcherContext.java
   trunk/freenet/src/freenet/client/async/ClientGetState.java
   trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
   trunk/freenet/src/freenet/client/async/USKCheckerCallback.java
   trunk/freenet/src/freenet/client/async/USKFetcher.java
   trunk/freenet/src/freenet/client/async/USKManager.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/Version.java
Log:
562:
More (largely untested) work on USK subscription and background USK updating.

Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java        2006-03-24 
16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/FetcherContext.java        2006-03-24 
18:06:35 UTC (rev 8299)
@@ -19,7 +19,7 @@
        public long maxTempLength;
        public final ArchiveManager archiveManager;
        public final BucketFactory bucketFactory;
-       public final USKManager uskManager;
+       public USKManager uskManager;
        public int maxRecursionLevel;
        public int maxArchiveRestarts;
        public boolean dontEnterImplicitArchives;

Modified: trunk/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientGetState.java  2006-03-24 
16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/ClientGetState.java  2006-03-24 
18:06:35 UTC (rev 8299)
@@ -6,8 +6,6 @@
  */
 public abstract interface ClientGetState {

-       public ClientGetter getParent();
-
        public void schedule();

        public void cancel();

Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java       
2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java       
2006-03-24 18:06:35 UTC (rev 8299)
@@ -535,9 +535,8 @@
                                                        metaStrings, ctx, actx, 
maxRetries, recursionLevel, dontTellClientGet,
                                                        token, false, 
returnBucket);
                                sf.schedule();
-//                             // Background fetch
-//                             USKFetcher fetcher =
-//                                     ctx.uskManager.getFetcher(usk, ctx, 
parent);
+                               // Background fetch
+                               
ctx.uskManager.startTemporaryBackgroundFetcher(usk);
                                return sf;
                        } else {
                                cb.onFailure(new 
FetchException(FetchException.PERMANENT_REDIRECT, 
usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);

Modified: trunk/freenet/src/freenet/client/async/USKCheckerCallback.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKCheckerCallback.java      
2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKCheckerCallback.java      
2006-03-24 18:06:35 UTC (rev 8299)
@@ -20,4 +20,7 @@
        /** Request cancelled */
        public void onCancelled();

+       /** Get priority to run the request at */
+       public short getPriority();
+       
 }

Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java      2006-03-24 
16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java      2006-03-24 
18:06:35 UTC (rev 8299)
@@ -1,5 +1,6 @@
 package freenet.client.async;

+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Vector;
@@ -7,6 +8,7 @@
 import freenet.client.FetcherContext;
 import freenet.keys.FreenetURI;
 import freenet.keys.USK;
+import freenet.node.RequestStarter;
 import freenet.support.Logger;

 /**
@@ -71,7 +73,7 @@
        /** Cancelled? */
        private boolean cancelled;

-       final ClientGetter parent;
+       final ClientRequester parent;

        public synchronized boolean addCallback(USKFetcherCallback cb) {
                if(completed) return false; 
@@ -136,24 +138,49 @@
                public String toString() {
                        return "USKAttempt for "+number+" for 
"+origUSK.getURI();
                }
+               
+               public short getPriority() {
+                       if(backgroundPoll)
+                               return RequestStarter.UPDATE_PRIORITY_CLASS;
+                       else
+                               return parent.getPriorityClass();
+               }
        }

        private final Vector runningAttempts;

        private long lastFetchedEdition;
        private long lastAddedEdition;
+
+       /** Number of keys to probe ahead. If they are all empty, then
+        * we have finished (unless in background poll mode) */
+       long minFailures;
+       final long origMinFailures;

-       static final long MIN_FAILURES = 3;
+       final long origSleepTime = 1000;
+       final long maxSleepTime = 60 * 60 * 1000;
+       long sleepTime = origSleepTime;
+
+       // At most, probe 1000 editions ahead!
+       private static final long MAX_MIN_FAILURES = 1000;
+
+       private long valueAtSchedule;

-       USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, 
ClientGetter parent) {
+       /** Keep going forever? */
+       private final boolean backgroundPoll;
+       
+       USKFetcher(USK origUSK, USKManager manager, FetcherContext ctx, 
ClientRequester parent, int minFailures, boolean pollForever) {
                this.parent = parent;
                this.origUSK = origUSK;
                this.uskManager = manager;
+               this.minFailures = this.origMinFailures = minFailures;
                runningAttempts = new Vector();
                callbacks = new LinkedList();
+               subscribers = new HashSet();
                lastFetchedEdition = -1;
                lastAddedEdition = -1;
                this.ctx = ctx;
+               this.backgroundPoll = pollForever;
        }

        void onDNF(USKAttempt att) {
@@ -164,8 +191,8 @@
                        runningAttempts.remove(att);
                        if(runningAttempts.isEmpty()) {
                                long curLatest = uskManager.lookup(origUSK);
-                               Logger.minor(this, "latest: "+curLatest+", last 
fetched: "+lastFetchedEdition+", curLatest+MIN_FAILURES: 
"+(curLatest+MIN_FAILURES));
-                               if(curLatest + MIN_FAILURES >= 
lastFetchedEdition) {
+                               Logger.minor(this, "latest: "+curLatest+", last 
fetched: "+lastFetchedEdition+", curLatest+MIN_FAILURES: 
"+(curLatest+minFailures));
+                               if(curLatest + minFailures >= 
lastFetchedEdition) {
                                        finished = true;
                                }
                        } else 
@@ -177,14 +204,52 @@
        }

        private void finishSuccess() {
-               long ed = uskManager.lookup(origUSK);
-               USKFetcherCallback[] cb;
-               synchronized(this) {
-                       completed = true;
-                       cb = (USKFetcherCallback[]) callbacks.toArray(new 
USKFetcherCallback[callbacks.size()]);
+               if(backgroundPoll) {
+                       synchronized(this) {
+                               long valAtEnd = uskManager.lookup(origUSK);
+                               if(valAtEnd > valueAtSchedule) {
+                                       // Have advanced.
+                                       minFailures = origMinFailures;
+                                       sleepTime = origSleepTime;
+                               } else {
+                                       long newMinFailures = minFailures * 2;
+                                       if(newMinFailures > MAX_MIN_FAILURES)
+                                               newMinFailures = 
MAX_MIN_FAILURES;
+                                       minFailures = newMinFailures;
+                               }
+                               long newSleepTime = sleepTime * 2;
+                               if(newSleepTime > maxSleepTime) newSleepTime = 
maxSleepTime;
+                               sleepTime = newSleepTime;
+                               long now = System.currentTimeMillis();
+                               long end = now + sleepTime;
+                               long newValAtEnd = valAtEnd;
+                               // FIXME do this without occupying a thread
+                               while(now < end && ((newValAtEnd = 
uskManager.lookup(origUSK)) == valAtEnd)) {
+                                       long d = end - now;
+                                       if(d > 0)
+                                               try {
+                                                       wait(d);
+                                               } catch (InterruptedException 
e) {
+                                                       // Maybe break? Go 
around loop.
+                                               }
+                                       now = System.currentTimeMillis();
+                               }
+                               if(newValAtEnd != valAtEnd) {
+                                       minFailures = origMinFailures;
+                                       sleepTime = origSleepTime;
+                               }
+                       }
+                       schedule();
+               } else {
+                       long ed = uskManager.lookup(origUSK);
+                       USKFetcherCallback[] cb;
+                       synchronized(this) {
+                               completed = true;
+                               cb = (USKFetcherCallback[]) 
callbacks.toArray(new USKFetcherCallback[callbacks.size()]);
+                       }
+                       for(int i=0;i<cb.length;i++)
+                               cb[i].onFoundEdition(ed, origUSK.copy(ed));
                }
-               for(int i=0;i<cb.length;i++)
-                       cb[i].onFoundEdition(ed, origUSK.copy(ed));
        }

        void onSuccess(USKAttempt att, boolean dontUpdate) {
@@ -196,7 +261,7 @@
                                uskManager.update(origUSK, curLatest);
                        curLatest = Math.max(uskManager.lookup(origUSK), 
curLatest);
                        Logger.minor(this, "Latest: "+curLatest);
-                       long addTo = curLatest + MIN_FAILURES;
+                       long addTo = curLatest + minFailures;
                        long addFrom = Math.max(lastAddedEdition + 1, curLatest 
+ 1);
                        if(addTo >= addFrom) {
                                l = new LinkedList();
@@ -206,7 +271,7 @@
                        cancelBefore(curLatest);
                }
                if(l == null) return;
-               else {
+               else if(!cancelled) {
                        for(Iterator i=l.iterator();i.hasNext();) {
                                USKAttempt a = (USKAttempt) i.next();
                                a.schedule();
@@ -254,6 +319,7 @@
         * Caller is responsible for calling .schedule().
         */
        private synchronized USKAttempt add(long i) {
+               if(cancelled) return null;
                Logger.minor(this, "Adding USKAttempt for "+i+" for 
"+origUSK.getURI());
                if(!runningAttempts.isEmpty()) {
                        USKAttempt last = (USKAttempt) 
runningAttempts.lastElement();
@@ -278,19 +344,19 @@
                return origUSK;
        }

-       public ClientGetter getParent() {
-               return parent;
-       }
-
        public void schedule() {
                USKAttempt[] attempts;
                synchronized(this) {
-                       for(long 
i=origUSK.suggestedEdition;i<origUSK.suggestedEdition+MIN_FAILURES;i++)
+                       if(cancelled) return;
+                       valueAtSchedule = uskManager.lookup(origUSK);
+                       long startPoint = Math.max(origUSK.suggestedEdition, 
valueAtSchedule);
+                       for(long i=startPoint;i<startPoint+minFailures;i++)
                                add(i);
                        attempts = (USKAttempt[]) runningAttempts.toArray(new 
USKAttempt[runningAttempts.size()]);
                }
-               for(int i=0;i<attempts.length;i++)
-                       attempts[i].schedule();
+               if(!cancelled)
+                       for(int i=0;i<attempts.length;i++)
+                               attempts[i].schedule();
        }

        public void cancel() {
@@ -303,4 +369,23 @@
                        attempts[i].cancel();
        }

+       /** Set of interested USKCallbacks. Note that we don't actually
+        * send them any information - they are essentially placeholders,
+        * an alternative to a refcount. This could be replaced with a 
+        * Bloom filter or whatever, we only need .exists and .count.
+        */
+       final HashSet subscribers;
+       
+       public synchronized void addSubscriber(USKCallback cb) {
+               subscribers.add(cb);
+       }
+
+       public synchronized boolean hasSubscribers() {
+               return !subscribers.isEmpty();
+       }
+       
+       public synchronized void removeSubscriber(USKCallback cb) {
+               subscribers.remove(cb);
+       }
+
 }

Added: trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java       
2006-03-24 16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKFetcherWrapper.java       
2006-03-24 18:06:35 UTC (rev 8299)
@@ -0,0 +1,31 @@
+package freenet.client.async;
+
+import freenet.keys.FreenetURI;
+import freenet.keys.USK;
+import freenet.node.RequestStarter;
+
+/**
+ * Wrapper for a backgrounded USKFetcher.
+ */
+public class USKFetcherWrapper extends ClientRequester {
+
+       USK usk;
+       
+       public USKFetcherWrapper(USK usk, ClientRequestScheduler chkScheduler, 
ClientRequestScheduler sskScheduler) {
+               super(RequestStarter.UPDATE_PRIORITY_CLASS, chkScheduler, 
sskScheduler, usk);
+               this.usk = usk;
+       }
+
+       public FreenetURI getURI() {
+               return usk.getURI();
+       }
+
+       public boolean isFinished() {
+               return false;
+       }
+
+       public void notifyClients() {
+               // Do nothing
+       }
+
+}

Modified: trunk/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKManager.java      2006-03-24 
16:45:06 UTC (rev 8298)
+++ trunk/freenet/src/freenet/client/async/USKManager.java      2006-03-24 
18:06:35 UTC (rev 8299)
@@ -4,6 +4,9 @@

 import freenet.client.FetcherContext;
 import freenet.keys.USK;
+import freenet.node.Node;
+import freenet.node.RequestStarter;
+import freenet.support.LRUQueue;
 import freenet.support.Logger;

 /**
@@ -22,16 +25,46 @@
         * USKFetcher for each {USK, edition number}. */
        final HashMap fetchersByUSK;

+       /** Backgrounded USKFetchers by USK. */
+       final HashMap backgroundFetchersByClearUSK;
+       
+       // FIXME make this configurable
+       static final int MAX_BACKGROUND_FETCHERS = 64;
+       final LRUQueue temporaryBackgroundFetchersLRU;
+       
        /** USKChecker's by USK. Deleted immediately on completion. */
        final HashMap checkersByUSK;
+
+       final FetcherContext backgroundFetchContext;
+       final ClientRequestScheduler chkRequestScheduler;
+       final ClientRequestScheduler sskRequestScheduler;

-       public USKManager() {
+       public USKManager(FetcherContext backgroundFetchContext, 
ClientRequestScheduler chkRequestScheduler, ClientRequestScheduler 
sskRequestScheduler) {
                latestVersionByClearUSK = new HashMap();
                subscribersByClearUSK = new HashMap();
                fetchersByUSK = new HashMap();
                checkersByUSK = new HashMap();
+               backgroundFetchersByClearUSK = new HashMap();
+               temporaryBackgroundFetchersLRU = new LRUQueue();
+               this.backgroundFetchContext = backgroundFetchContext;
+               this.chkRequestScheduler = chkRequestScheduler;
+               this.sskRequestScheduler = sskRequestScheduler;
        }

+       public USKManager(Node node) {
+               backgroundFetchContext = 
node.makeClient(RequestStarter.UPDATE_PRIORITY_CLASS).getFetcherContext();
+               backgroundFetchContext.followRedirects = false;
+               backgroundFetchContext.uskManager = this;
+               this.chkRequestScheduler = node.chkFetchScheduler;
+               this.sskRequestScheduler = node.sskFetchScheduler;
+               latestVersionByClearUSK = new HashMap();
+               subscribersByClearUSK = new HashMap();
+               fetchersByUSK = new HashMap();
+               checkersByUSK = new HashMap();
+               backgroundFetchersByClearUSK = new HashMap();
+               temporaryBackgroundFetchersLRU = new LRUQueue();
+       }
+
        /**
         * Look up the latest known version of the given USK.
         * @return The latest known edition number, or -1.
@@ -50,11 +83,32 @@
                        if(f.parent.priorityClass == parent.priorityClass && 
f.ctx.equals(ctx))
                                return f;
                }
-               f = new USKFetcher(usk, this, ctx, parent);
+               f = new USKFetcher(usk, this, ctx, parent, 3, false);
                fetchersByUSK.put(usk, f);
                return f;
        }

+       public void startTemporaryBackgroundFetcher(USK usk) {
+               USK clear = usk.clearCopy();
+               USKFetcher sched = null;
+               synchronized(this) {
+                       USKFetcher f = (USKFetcher) 
backgroundFetchersByClearUSK.get(clear);
+                       if(f == null) {
+                               f = new USKFetcher(usk, this, 
backgroundFetchContext, new USKFetcherWrapper(usk, chkRequestScheduler, 
sskRequestScheduler), 10, false);
+                               sched = f;
+                               backgroundFetchersByClearUSK.put(clear, f);
+                       }
+                       temporaryBackgroundFetchersLRU.push(clear);
+                       while(temporaryBackgroundFetchersLRU.size() > 
MAX_BACKGROUND_FETCHERS) {
+                               USK del = (USK) 
temporaryBackgroundFetchersLRU.pop();
+                               USKFetcher fetcher = (USKFetcher) 
backgroundFetchersByClearUSK.get(del.clearCopy());
+                               if(!fetcher.hasSubscribers())
+                                       fetcher.cancel();
+                       }
+               }
+               if(sched != null) sched.schedule();
+       }
+       
        synchronized void finished(USKFetcher f) {
                USK u = f.getOriginalUSK();
                fetchersByUSK.remove(u);
@@ -84,15 +138,60 @@
        /**
         * Subscribe to a given USK. Callback will be notified when it is
         * updated. Note that this does not imply that the USK will be
-        * checked on a regular basis!
+        * checked on a regular basis, unless runBackgroundFetch=true.
         */
-       public synchronized void subscribe(USK origUSK, USKCallback cb) {
-               USK clear = origUSK.clearCopy();
-               USKCallback[] callbacks = (USKCallback[]) 
subscribersByClearUSK.get(clear);
-               if(callbacks == null)
-                       callbacks = new USKCallback[1];
-               else
-                       callbacks = new USKCallback[callbacks.length+1];
-               callbacks[callbacks.length-1] = cb;
+       public void subscribe(USK origUSK, USKCallback cb, boolean 
runBackgroundFetch) {
+               USKFetcher sched = null;
+               synchronized(this) {
+                       USK clear = origUSK.clearCopy();
+                       USKCallback[] callbacks = (USKCallback[]) 
subscribersByClearUSK.get(clear);
+                       if(callbacks == null)
+                               callbacks = new USKCallback[1];
+                       else
+                               callbacks = new USKCallback[callbacks.length+1];
+                       callbacks[callbacks.length-1] = cb;
+                       subscribersByClearUSK.put(clear, callbacks);
+                       if(runBackgroundFetch) {
+                               USKFetcher f = (USKFetcher) 
backgroundFetchersByClearUSK.get(clear);
+                               if(f == null) {
+                                       f = new USKFetcher(origUSK, this, 
backgroundFetchContext, new USKFetcherWrapper(origUSK, chkRequestScheduler, 
sskRequestScheduler), 10, false);
+                                       sched = f;
+                                       backgroundFetchersByClearUSK.put(clear, 
f);
+                               }
+                               f.addSubscriber(cb);
+                       }
+               }
+               if(sched != null)
+                       sched.schedule();
        }
+       
+       public void unsubscribe(USK origUSK, USKCallback cb, boolean 
runBackgroundFetch) {
+               synchronized(this) {
+                       USK clear = origUSK.clearCopy();
+                       USKCallback[] callbacks = (USKCallback[]) 
subscribersByClearUSK.get(clear);
+                       int j=0;
+                       for(int i=0;i<callbacks.length;i++) {
+                               USKCallback c = callbacks[i];
+                               if(c != null && c != cb) {
+                                       callbacks[j++] = c;
+                               }
+                       }
+                       USKCallback[] newCallbacks = new USKCallback[j];
+                       System.arraycopy(callbacks, 0, newCallbacks, 0, j);
+                       if(newCallbacks.length > 0)
+                               subscribersByClearUSK.put(clear, callbacks);
+                       else
+                               subscribersByClearUSK.remove(clear);
+                       if(runBackgroundFetch) {
+                               USKFetcher f = (USKFetcher) 
backgroundFetchersByClearUSK.get(clear);
+                               f.removeSubscriber(cb);
+                               if(!f.hasSubscribers()) {
+                                       
if(!temporaryBackgroundFetchersLRU.contains(clear)) {
+                                               f.cancel();
+                                               
backgroundFetchersByClearUSK.remove(clear);
+                                       }
+                               }
+                       }
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2006-03-24 16:45:06 UTC (rev 
8298)
+++ trunk/freenet/src/freenet/node/Node.java    2006-03-24 18:06:35 UTC (rev 
8299)
@@ -26,9 +26,11 @@
 import java.util.Iterator;

 import freenet.client.ArchiveManager;
+import freenet.client.FetcherContext;
 import freenet.client.HighLevelSimpleClient;
 import freenet.client.HighLevelSimpleClientImpl;
 import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.RequestScheduler;
 import freenet.client.async.USKManager;
 import freenet.clients.http.FproxyToadlet;
 import freenet.clients.http.SimpleToadletServer;
@@ -454,7 +456,6 @@
     private Node(Config config, RandomSource random) throws NodeInitException {

        // Easy stuff
-       uskManager = new USKManager();
         startupTime = System.currentTimeMillis();
         recentlyCompletedIDs = new LRUQueue();
        this.config = config;
@@ -874,6 +875,8 @@
                sskPutScheduler = new ClientRequestScheduler(true, random, 
sskInsertStarter, this);
                sskInsertStarter.setScheduler(sskPutScheduler);
                sskInsertStarter.start();
+
+               uskManager = new USKManager(this);

                // And finally, Initialize the plugin manager
                pluginManager = new PluginManager(this);

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-03-24 16:45:06 UTC (rev 
8298)
+++ trunk/freenet/src/freenet/node/Version.java 2006-03-24 18:06:35 UTC (rev 
8299)
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       private static final int buildNumber = 561;
+       private static final int buildNumber = 562;

        /** Oldest build of Fred we will talk to */
        private static final int lastGoodBuild = 507;


Reply via email to