Author: toad
Date: 2009-04-18 20:05:16 +0000 (Sat, 18 Apr 2009)
New Revision: 27020

Modified:
   trunk/freenet/src/freenet/client/async/USKFetcher.java
Log:
Subscribe to the next 500 keys via KeyListener


Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java      2009-04-18 
19:54:46 UTC (rev 27019)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java      2009-04-18 
20:05:16 UTC (rev 27020)
@@ -4,6 +4,7 @@
 package freenet.client.async;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -12,11 +13,16 @@
 import com.db4o.ObjectContainer;
 
 import freenet.client.FetchContext;
+import freenet.keys.ClientSSK;
 import freenet.keys.ClientSSKBlock;
 import freenet.keys.FreenetURI;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
 import freenet.keys.KeyDecodeException;
+import freenet.keys.SSKBlock;
 import freenet.keys.USK;
 import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
 import freenet.support.LogThresholdCallback;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
@@ -64,7 +70,7 @@
  * - TUKs (when we have TUKs).
  * - Passive requests (when we have passive requests).
  */
-public class USKFetcher implements ClientGetState, USKCallback {
+public class USKFetcher implements ClientGetState, USKCallback, 
HasKeyListener, KeyListener {
     private static volatile boolean logMINOR;
     private static volatile boolean logDEBUG;
 
@@ -107,6 +113,14 @@
        private short lastCompressionCodec;
        private boolean lastWasMetadata;
        
+       /** Edition number of the first key in keysWatching */
+       private long firstKeyWatching = -1;
+       /** A list of keys which we are interested in. This a sequence of SSKs 
starting 
+        * at the last known slot. */
+       private final ArrayList<ClientSSK> keysWatching;
+       
+       private static final int WATCH_KEYS = 500;
+       
        /**
         * Callbacks are told when the USKFetcher finishes, and unless 
background poll is
         * enabled, they are only sent onFoundEdition *once*, on completion.
@@ -254,6 +268,7 @@
                this.ctx = ctx;
                this.backgroundPoll = pollForever;
                this.keepLastData = keepLastData;
+               keysWatching = new ArrayList<ClientSSK>();
        }
        
        void onDNF(USKAttempt att, ClientContext context) {
@@ -310,6 +325,7 @@
                } else {
                        uskManager.unsubscribe(origUSK, this);
                        uskManager.onFinished(this);
+                       
context.getSskFetchScheduler().schedTransient.removePendingKeys((KeyListener)this);
                        long ed = uskManager.lookupLatestSlot(origUSK);
                        USKFetcherCallback[] cb;
                        synchronized(this) {
@@ -348,6 +364,7 @@
                        if(completed || cancelled) return;
                        decode = curLatest >= lastEd && !(dontUpdate && block 
== null);
                        curLatest = Math.max(lastEd, curLatest);
+                       fillKeysWatching(curLatest);
                        if(logMINOR) Logger.minor(this, "Latest: "+curLatest);
                        long addTo = curLatest + minFailures;
                        long addFrom = Math.max(lastAddedEdition + 1, curLatest 
+ 1);
@@ -510,12 +527,14 @@
        }
     
        public void schedule(ObjectContainer container, ClientContext context) {
+               
context.getSskFetchScheduler().schedTransient.addPendingKeys(this);
                updatePriorities();
                uskManager.subscribe(origUSK, this, false, parent.getClient());
                USKAttempt[] attempts;
                long lookedUp = uskManager.lookupLatestSlot(origUSK);
                synchronized(this) {
                        valueAtSchedule = Math.max(lookedUp, valueAtSchedule);
+                       fillKeysWatching(valueAtSchedule);
                        if(cancelled) return;
                        long startPoint = Math.max(origUSK.suggestedEdition, 
valueAtSchedule);
                        for(long i=startPoint;i<startPoint+minFailures;i++)
@@ -542,6 +561,7 @@
 
        public void cancel(ObjectContainer container, ClientContext context) {
                uskManager.unsubscribe(origUSK, this);
+               
context.getSskFetchScheduler().schedTransient.removePendingKeys((KeyListener)this);
                assert(container == null);
                USKAttempt[] attempts;
                uskManager.onFinished(this);
@@ -682,6 +702,7 @@
                        if(completed || cancelled) return;
                        decode = lastEd >= ed && data != null;
                        ed = Math.max(lastEd, ed);
+                       fillKeysWatching(ed);
                        if(logMINOR) Logger.minor(this, "Latest: "+ed);
                        long addTo = ed + minFailures;
                        long addFrom = Math.max(lastAddedEdition + 1, ed + 1);
@@ -736,4 +757,115 @@
                }
        }
 
+       private synchronized void fillKeysWatching(long ed) {
+//             if(firstKeyWatching == -1) {
+//                     firstKeyWatching = ed;
+//                     for(int i=0;i<WATCH_KEYS;i++) {
+//                             keysWatching.add(origUSK.getSSK(ed + i));
+//                     }
+//             } else {
+//                     long first = firstKeyWatching;
+//                     long last = firstKeyWatching + keysWatching.size() - 1;
+//                     if(last < ed) {
+                               keysWatching.clear();
+                               for(int i=0;i<WATCH_KEYS;i++) {
+                                       keysWatching.add(origUSK.getSSK(ed + 
i));
+                               }
+//                     } else {
+//                             int drop = (int) (ed - first);
+//                             ClientSSK[] keep = new 
ClientSSK[keysWatching.size() - drop];
+//                             for(int i=drop;i<keysWatching.size();i++)
+//                                     keep[i-drop] = keysWatching.get(i);
+//                             keysWatching.clear();
+//                             for(ClientSSK ssk : keep)
+//                                     keysWatching.add(ssk);
+//                             for(long l = last + 1; l < (ed + WATCH_KEYS); 
l++) {
+//                                     keysWatching.add(origUSK.getSSK(l));
+//                             }
+//                     }
+                       firstKeyWatching = ed;
+//             }
+       }
+
+       public synchronized boolean isCancelled(ObjectContainer container) {
+               return completed || cancelled;
+       }
+
+       public KeyListener makeKeyListener(ObjectContainer container, 
ClientContext context) throws KeyListenerConstructionException {
+               return this;
+       }
+
+       public void onFailed(KeyListenerConstructionException e, 
ObjectContainer container, ClientContext context) {
+               Logger.error(this, "Failed to construct KeyListener on 
USKFetcher: "+e, e);
+       }
+
+       public synchronized long countKeys() {
+               return keysWatching.size();
+       }
+
+       public synchronized short definitelyWantKey(Key key, byte[] saltedKey, 
ObjectContainer container, ClientContext context) {
+               for(ClientSSK ssk : keysWatching)
+                       if(ssk.getNodeKey().equals(key)) return 
progressPollPriority;
+               return -1;
+       }
+
+       public boolean dontCache() {
+               return !ctx.cacheLocalRequests;
+       }
+
+       public HasKeyListener getHasKeyListener() {
+               return this;
+       }
+
+       public short getPriorityClass(ObjectContainer container) {
+               return progressPollPriority;
+       }
+
+       public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, 
ObjectContainer container, ClientContext context) {
+               return new SendableGet[0];
+       }
+
+       public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found, 
ObjectContainer container, ClientContext context) {
+               if(!(found instanceof SSKBlock)) return false;
+               ClientSSK realKey = null;
+               long edition = -1;
+               synchronized(this) {
+                       for(int i=0;i<keysWatching.size();i++) {
+                               ClientSSK ssk = keysWatching.get(i);
+                               if(ssk.getNodeKey().equals(key)) {
+                                       realKey = ssk;
+                                       edition = firstKeyWatching + i;
+                                       break;
+                               }
+                       }
+                       if(realKey == null) return false;
+               }
+               // FIXME remove
+               assert(edition == 
realKey.getURI().uskForSSK().getSuggestedEdition());
+               onFoundEdition(edition, origUSK, container, context, false, 
(short)-1, null, false, false);
+               return true;
+       }
+
+       public synchronized boolean isEmpty() {
+               return cancelled || completed;
+       }
+
+       public boolean isSSK() {
+               return true;
+       }
+
+       public void onRemove() {
+               // Ignore
+       }
+
+       public boolean persistent() {
+               return false;
+       }
+
+       public synchronized boolean probablyWantKey(Key key, byte[] saltedKey) {
+               for(ClientSSK ssk : keysWatching)
+                       if(ssk.getNodeKey().equals(key)) return true;
+               return false;
+       }
+
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to