Author: toad
Date: 2009-04-18 20:05:16 +0000 (Sat, 18 Apr 2009)
New Revision: 27020
Modified:
trunk/freenet/src/freenet/client/async/USKFetcher.java
Log:
Subscribe to the next 500 keys via KeyListener
Modified: trunk/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKFetcher.java 2009-04-18
19:54:46 UTC (rev 27019)
+++ trunk/freenet/src/freenet/client/async/USKFetcher.java 2009-04-18
20:05:16 UTC (rev 27020)
@@ -4,6 +4,7 @@
package freenet.client.async;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -12,11 +13,16 @@
import com.db4o.ObjectContainer;
import freenet.client.FetchContext;
+import freenet.keys.ClientSSK;
import freenet.keys.ClientSSKBlock;
import freenet.keys.FreenetURI;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.keys.KeyDecodeException;
+import freenet.keys.SSKBlock;
import freenet.keys.USK;
import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
import freenet.support.LogThresholdCallback;
import freenet.support.Logger;
import freenet.support.api.Bucket;
@@ -64,7 +70,7 @@
* - TUKs (when we have TUKs).
* - Passive requests (when we have passive requests).
*/
-public class USKFetcher implements ClientGetState, USKCallback {
+public class USKFetcher implements ClientGetState, USKCallback,
HasKeyListener, KeyListener {
private static volatile boolean logMINOR;
private static volatile boolean logDEBUG;
@@ -107,6 +113,14 @@
private short lastCompressionCodec;
private boolean lastWasMetadata;
+ /** Edition number of the first key in keysWatching */
+ private long firstKeyWatching = -1;
+ /** A list of keys which we are interested in. This is a sequence of SSKs
+ * starting at the last known slot. */
+ private final ArrayList<ClientSSK> keysWatching;
+
+ private static final int WATCH_KEYS = 500;
+
/**
* Callbacks are told when the USKFetcher finishes, and unless
background poll is
* enabled, they are only sent onFoundEdition *once*, on completion.
@@ -254,6 +268,7 @@
this.ctx = ctx;
this.backgroundPoll = pollForever;
this.keepLastData = keepLastData;
+ keysWatching = new ArrayList<ClientSSK>();
}
void onDNF(USKAttempt att, ClientContext context) {
@@ -310,6 +325,7 @@
} else {
uskManager.unsubscribe(origUSK, this);
uskManager.onFinished(this);
+
context.getSskFetchScheduler().schedTransient.removePendingKeys((KeyListener)this);
long ed = uskManager.lookupLatestSlot(origUSK);
USKFetcherCallback[] cb;
synchronized(this) {
@@ -348,6 +364,7 @@
if(completed || cancelled) return;
decode = curLatest >= lastEd && !(dontUpdate && block
== null);
curLatest = Math.max(lastEd, curLatest);
+ fillKeysWatching(curLatest);
if(logMINOR) Logger.minor(this, "Latest: "+curLatest);
long addTo = curLatest + minFailures;
long addFrom = Math.max(lastAddedEdition + 1, curLatest
+ 1);
@@ -510,12 +527,14 @@
}
public void schedule(ObjectContainer container, ClientContext context) {
+
context.getSskFetchScheduler().schedTransient.addPendingKeys(this);
updatePriorities();
uskManager.subscribe(origUSK, this, false, parent.getClient());
USKAttempt[] attempts;
long lookedUp = uskManager.lookupLatestSlot(origUSK);
synchronized(this) {
valueAtSchedule = Math.max(lookedUp, valueAtSchedule);
+ fillKeysWatching(valueAtSchedule);
if(cancelled) return;
long startPoint = Math.max(origUSK.suggestedEdition,
valueAtSchedule);
for(long i=startPoint;i<startPoint+minFailures;i++)
@@ -542,6 +561,7 @@
public void cancel(ObjectContainer container, ClientContext context) {
uskManager.unsubscribe(origUSK, this);
+
context.getSskFetchScheduler().schedTransient.removePendingKeys((KeyListener)this);
assert(container == null);
USKAttempt[] attempts;
uskManager.onFinished(this);
@@ -682,6 +702,7 @@
if(completed || cancelled) return;
decode = lastEd >= ed && data != null;
ed = Math.max(lastEd, ed);
+ fillKeysWatching(ed);
if(logMINOR) Logger.minor(this, "Latest: "+ed);
long addTo = ed + minFailures;
long addFrom = Math.max(lastAddedEdition + 1, ed + 1);
@@ -736,4 +757,115 @@
}
}
+ /**
+  * Point the window of watched keys at the WATCH_KEYS consecutive editions
+  * starting at ed, and record ed as the edition of the first entry.
+  *
+  * If the window already starts at ed and is fully populated, nothing is
+  * done: rebuilding would only repeat WATCH_KEYS calls to getSSK() and
+  * (assuming getSSK() is deterministic for a given edition) yield the same
+  * list. Otherwise the list is rebuilt from scratch, which is correct
+  * whether ed moved forwards or backwards.
+  *
+  * @param ed Edition number of the first key to watch.
+  */
+ private synchronized void fillKeysWatching(long ed) {
+     if(firstKeyWatching == ed && keysWatching.size() == WATCH_KEYS)
+         return; // Window unchanged; keep the existing keys.
+     keysWatching.clear();
+     for(int i=0;i<WATCH_KEYS;i++)
+         keysWatching.add(origUSK.getSSK(ed + i));
+     // handleBlock() derives the edition as firstKeyWatching + list index.
+     firstKeyWatching = ed;
+ }
+
+ /**
+  * @param container Not used by this implementation.
+  * @return True if this fetcher has either completed or been cancelled.
+  */
+ public synchronized boolean isCancelled(ObjectContainer container) {
+     if(completed) return true;
+     return cancelled;
+ }
+
+ /**
+  * The fetcher is its own KeyListener, so no separate object is built.
+  *
+  * @return This object.
+  * @throws KeyListenerConstructionException Declared by the signature;
+  * never thrown by this implementation.
+  */
+ public KeyListener makeKeyListener(ObjectContainer container, ClientContext context) throws KeyListenerConstructionException {
+     final KeyListener self = this;
+     return self;
+ }
+
+ public void onFailed(KeyListenerConstructionException e,
ObjectContainer container, ClientContext context) {
+ Logger.error(this, "Failed to construct KeyListener on
USKFetcher: "+e, e);
+ }
+
+ /**
+  * @return The number of keys currently held in keysWatching.
+  */
+ public synchronized long countKeys() {
+     final long watched = keysWatching.size();
+     return watched;
+ }
+
+ public synchronized short definitelyWantKey(Key key, byte[] saltedKey,
ObjectContainer container, ClientContext context) {
+ for(ClientSSK ssk : keysWatching)
+ if(ssk.getNodeKey().equals(key)) return
progressPollPriority;
+ return -1;
+ }
+
+ /**
+  * @return The negation of the fetch context's cacheLocalRequests flag.
+  */
+ public boolean dontCache() {
+     final boolean cacheLocal = ctx.cacheLocalRequests;
+     return !cacheLocal;
+ }
+
+ /**
+  * The fetcher is both the KeyListener and its HasKeyListener.
+  *
+  * @return This object.
+  */
+ public HasKeyListener getHasKeyListener() {
+     final HasKeyListener self = this;
+     return self;
+ }
+
+ /**
+  * @param container Not used by this implementation.
+  * @return The current value of progressPollPriority.
+  */
+ public short getPriorityClass(ObjectContainer container) {
+     final short priority = progressPollPriority;
+     return priority;
+ }
+
+ /**
+  * This listener never reports any requests for a key.
+  *
+  * @return Always a new, empty array.
+  */
+ public SendableGet[] getRequestsForKey(Key key, byte[] saltedKey, ObjectContainer container, ClientContext context) {
+     return new SendableGet[] {};
+ }
+
+ /**
+  * Called with a block that was found; if it corresponds to one of the
+  * watched keys, report the matching edition via onFoundEdition().
+  *
+  * The edition is computed as firstKeyWatching plus the position of the
+  * matching key in keysWatching. The lookup happens under the lock on this
+  * object; onFoundEdition() is invoked after the lock has been released.
+  *
+  * @param key Node-level key of the block, compared against getNodeKey()
+  * of each watched ClientSSK.
+  * @param saltedKey Not used by this implementation.
+  * @param found The block; anything other than an SSKBlock is rejected.
+  * @return True if the key was one we were watching and the edition was
+  * reported, false otherwise.
+  */
+ public boolean handleBlock(Key key, byte[] saltedKey, KeyBlock found, ObjectContainer container, ClientContext context) {
+     if(!(found instanceof SSKBlock)) return false;
+     ClientSSK matched = null;
+     long foundEdition = -1;
+     synchronized(this) {
+         int index = 0;
+         for(ClientSSK candidate : keysWatching) {
+             if(candidate.getNodeKey().equals(key)) {
+                 matched = candidate;
+                 foundEdition = firstKeyWatching + index;
+                 break;
+             }
+             index++;
+         }
+     }
+     if(matched == null) return false;
+     // FIXME remove
+     assert(foundEdition == matched.getURI().uskForSSK().getSuggestedEdition());
+     onFoundEdition(foundEdition, origUSK, container, context, false, (short)-1, null, false, false);
+     return true;
+ }
+
+ /**
+  * @return True once this fetcher has been cancelled or has completed.
+  */
+ public synchronized boolean isEmpty() {
+     if(cancelled) return true;
+     return completed;
+ }
+
+ /**
+  * @return Always true; every key this listener watches is a ClientSSK.
+  */
+ public boolean isSSK() {
+     return true;
+ }
+
+ /**
+  * Removal notification. Deliberately a no-op.
+  */
+ public void onRemove() {
+     // Nothing to do.
+ }
+
+ /**
+  * @return Always false.
+  */
+ public boolean persistent() {
+     return false;
+ }
+
+ /**
+  * Check whether the given node-level key is one of the keys being watched.
+  *
+  * @param key The key to look for; compared against getNodeKey() of each
+  * watched ClientSSK.
+  * @param saltedKey Not used by this implementation.
+  * @return True if the key matches a watched key, false otherwise.
+  */
+ public synchronized boolean probablyWantKey(Key key, byte[] saltedKey) {
+     final int count = keysWatching.size();
+     for(int i=0;i<count;i++) {
+         final ClientSSK watched = keysWatching.get(i);
+         if(watched.getNodeKey().equals(key)) return true;
+     }
+     return false;
+ }
+
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs