Author: toad
Date: 2007-06-22 18:40:46 +0000 (Fri, 22 Jun 2007)
New Revision: 13711
Modified:
trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
trunk/freenet/src/freenet/client/async/USKChecker.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/SendableGet.java
Log:
Back-door coalescing: When a key is written to the datastore, notify all
clients interested in it.
Modified: trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
2007-06-22 18:40:46 UTC (rev 13711)
@@ -6,6 +6,9 @@
import freenet.client.FetchContext;
import freenet.keys.ClientKey;
import freenet.keys.ClientSSK;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.KeyVerifyException;
import freenet.node.SendableGet;
import freenet.support.Logger;
import freenet.support.RandomGrabArray;
@@ -101,4 +104,20 @@
return true;
}
+ public void onGotKey(Key key, KeyBlock block) {
+ synchronized(this) {
+ if(isCancelled()) return;
+ if(!key.equals(this.key.getNodeKey())) {
+ Logger.normal(this, "Got sent key "+key+" but
want "+this.key+" for "+this);
+ return;
+ }
+ }
+ try {
+ onSuccess(Key.createKeyBlock(this.key, block), false,
0);
+ } catch (KeyVerifyException e) {
+ Logger.error(this, "onGotKey("+key+","+block+") got
"+e+" for "+this, e);
+ // FIXME if we get rid of the direct route this must
call onFailure()
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
2007-06-22 18:40:46 UTC (rev 13711)
@@ -14,6 +14,8 @@
import freenet.crypt.RandomSource;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.keys.KeyVerifyException;
import freenet.node.LowLevelGetException;
import freenet.node.Node;
@@ -24,8 +26,8 @@
import freenet.node.SendableRequest;
import freenet.support.Logger;
import freenet.support.RandomGrabArray;
+import freenet.support.SectoredRandomGrabArrayWithInt;
import freenet.support.SectoredRandomGrabArrayWithObject;
-import freenet.support.SectoredRandomGrabArrayWithInt;
import freenet.support.SortedVectorByNumber;
import freenet.support.api.StringCallback;
@@ -94,6 +96,11 @@
public final String name;
private final LinkedList /* <WeakReference <RandomGrabArray> > */
recentSuccesses = new LinkedList();
+ /** All pending gets by key. Used to automatically satisfy pending
requests when either the key is fetched by
+ * an overlapping request, or it is fetched by a request from another
node. Operations on this are synchronized on
+ * itself. */
+ private final HashMap /* <Key, SendableGet[]> */ pendingKeys;
+
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
public static final String PRIORITY_HARD = "HARD";
@@ -159,6 +166,10 @@
this.isSSKScheduler = forSSKs;
priorities = new
SortedVectorByNumber[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
allRequestsByClientRequest = new HashMap();
+ if(forInserts)
+ pendingKeys = null;
+ else
+ pendingKeys = new HashMap();
this.name = name;
sc.register(name+"_priority_policy", PRIORITY_HARD,
name.hashCode(), true, false,
@@ -202,6 +213,29 @@
block =
getter.getContext().blocks.get(key);
if(block == null)
block =
node.fetchKey(key, getter.dontCache());
+ if(block == null) {
+ Key nodeKey =
key.getNodeKey();
+
synchronized(pendingKeys) {
+
SendableGet[] gets = (SendableGet[]) pendingKeys.get(nodeKey);
+ if(gets
== null) {
+
pendingKeys.put(nodeKey, new SendableGet[] { getter });
+ } else {
+
boolean found = false;
+
for(int j=0;j<gets.length;j++) {
+
if(gets[j] == getter) {
+
found = true;
+
break;
+
}
+
}
+
if(!found) {
+
SendableGet[] newGets = new SendableGet[gets.length+1];
+
System.arraycopy(gets, 0, newGets, 0, gets.length);
+
newGets[gets.length] = getter;
+
pendingKeys.put(nodeKey, newGets);
+
}
+ }
+ }
+ }
}
} catch (KeyVerifyException e) {
// Verify exception, probably
bogus at source;
@@ -388,6 +422,9 @@
allRequestsByClientRequest.remove(cr);
if(logMINOR) Logger.minor(this,
"Removed from HashSet for "+cr+" which now has "+v.size()+" elements");
}
+ if(!isInsertScheduler) {
+ removePendingKeys((SendableGet)
req, true);
+ }
}
if(logMINOR) Logger.minor(this, "removeFirst()
returning "+req);
return req;
@@ -397,6 +434,53 @@
return null;
}
+ /**
+ * Remove a SendableGet from the list of getters we maintain for each
key, indicating that we are no longer interested
+ * in that key.
+ * @param getter
+ * @param complain
+ */
+ public void removePendingKeys(SendableGet getter, boolean complain) {
+ int[] keyTokens = getter.allKeys();
+ for(int i=0;i<keyTokens.length;i++) {
+ int tok = keyTokens[i];
+ Key key = getter.getKey(tok).getNodeKey();
+ synchronized(pendingKeys) {
+ SendableGet[] gets = (SendableGet[])
pendingKeys.get(key);
+ if(gets == null) {
+ if(complain)
+ Logger.error(this, "Not found:
"+getter+" for "+key+" removing (no such key)");
+ } else if(gets.length > 1) {
+ SendableGet[] newGets = new
SendableGet[gets.length-1];
+ boolean found = false;
+ int x = 0;
+ for(int j=0;j<gets.length;j++) {
+ if(j >= gets.length) {
+ if(!found) {
+ if(complain)
+
Logger.error(this, "Not found: "+getter+" for "+key+" removing ("+gets.length+"
getters)");
+ return; // not
here
+ }
+ if(gets[j] == getter ||
gets[j] == null || gets[j].isCancelled()) continue;
+ newGets[x++] = gets[j];
+ }
+ }
+ if(x != gets.length-1) {
+ SendableGet[] newNewGets = new
SendableGet[x];
+ System.arraycopy(newGets, 0,
newNewGets, 0, x);
+ newGets = newNewGets;
+ }
+ pendingKeys.put(key, newGets);
+ } else if(gets.length == 1 && gets[0] ==
getter) {
+ pendingKeys.remove(key);
+ } else if(gets.length == 1 && gets[0] !=
getter) {
+ if(complain)
+ Logger.error(this, "Not found:
"+getter+" for "+key+" removing (1 getter)");
+ }
+ }
+ }
+ }
+
public void reregisterAll(ClientRequester request) {
SendableRequest[] reqs;
synchronized(this) {
@@ -429,4 +513,20 @@
recentSuccesses.removeLast();
}
}
+
+ public void tripPendingKey(final KeyBlock block) {
+ final Key key = block.getKey();
+ final SendableGet[] gets;
+ synchronized(pendingKeys) {
+ gets = (SendableGet[]) pendingKeys.get(key);
+ }
+ Runnable r = new Runnable() {
+ public void run() {
+ for(int i=0;i<gets.length;i++) {
+ gets[i].onGotKey(key, block);
+ }
+ }
+ };
+ node.getTicker().queueTimedJob(r, 0); // FIXME ideally these
would be completed on a single thread; when we have 1.5, use a dedicated
non-parallel Executor
+ }
}
Modified: trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
2007-06-22 18:40:46 UTC (rev 13711)
@@ -95,6 +95,7 @@
}
}
// :(
+ getScheduler().removePendingKeys(this, false);
if(e.isFatal() || forceFatal)
parent.fatallyFailedBlock();
else
@@ -109,6 +110,7 @@
onFailure(new FetchException(FetchException.CANCELLED));
return;
}
+ getScheduler().removePendingKeys(this, false);
rcb.onSuccess(data, this);
}
Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
2007-06-22 18:40:46 UTC (rev 13711)
@@ -177,6 +177,7 @@
}
protected void onSuccess(FetchResult result) {
+ getScheduler().removePendingKeys(this, false);
if(parent.isCancelled()) {
if(logMINOR)
Logger.minor(this, "Parent is cancelled");
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2007-06-22 18:40:46 UTC (rev 13711)
@@ -7,7 +7,10 @@
import freenet.client.FetchException;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.keys.KeyDecodeException;
+import freenet.keys.KeyVerifyException;
import freenet.keys.TooBigException;
import freenet.node.LowLevelGetException;
import freenet.node.SendableGet;
@@ -253,9 +256,36 @@
return
super.toString()+":"+retryCount+"/"+segment+'('+blockNums.size()+')';
}
- public synchronized void possiblyRemoveFromParent() {
- if(blockNums.isEmpty())
- segment.removeSeg(this);
+ public void possiblyRemoveFromParent() {
+ synchronized(this) {
+ if(blockNums.isEmpty())
+ segment.removeSeg(this);
+ }
+ getScheduler().removePendingKeys(this, false);
}
+
+ public void onGotKey(Key key, KeyBlock block) {
+ int blockNum = -1;
+ ClientKey ckey = null;
+ synchronized(this) {
+ for(int i=0;i<blockNums.size();i++) {
+ int num =
((Integer)blockNums.get(i)).intValue();
+ ckey = segment.getBlockKey(num);
+ Key k = ckey.getNodeKey();
+ if(k.equals(key)) {
+ blockNum = num;
+ blockNums.remove(i);
+ break;
+ }
+ }
+ }
+ if(blockNum == -1) return;
+ try {
+ onSuccess(Key.createKeyBlock(ckey, block), false,
blockNum);
+ } catch (KeyVerifyException e) {
+ // FIXME if we ever abolish the direct route, this must
be turned into an onFailure().
+ Logger.error(this, "Failed to parse in
onGotKey("+key+","+block+") - believed to be "+ckey+" (block #"+blockNum+")");
+ }
+ }
}
Modified: trunk/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKChecker.java 2007-06-22
17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/USKChecker.java 2007-06-22
18:40:46 UTC (rev 13711)
@@ -26,6 +26,7 @@
}
public void onSuccess(ClientKeyBlock block, boolean fromStore, int
token) {
+ getScheduler().removePendingKeys(this, false);
cb.onSuccess((ClientSSKBlock)block);
}
@@ -61,7 +62,7 @@
if(canRetry && retry()) return;
// Ran out of retries.
-
+ getScheduler().removePendingKeys(this, false);
if(e.code == LowLevelGetException.CANCELLED){
cb.onCancelled();
return;
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2007-06-22 17:16:57 UTC (rev
13710)
+++ trunk/freenet/src/freenet/node/Node.java 2007-06-22 18:40:46 UTC (rev
13711)
@@ -2073,6 +2073,8 @@
chkDatastore.put(block);
}
chkDatacache.put(block);
+ if(clientCore != null && clientCore.requestStarters !=
null)
+
clientCore.requestStarters.chkFetchScheduler.tripPendingKey(block);
} catch (IOException e) {
Logger.error(this, "Cannot store data: "+e, e);
}
@@ -2096,6 +2098,9 @@
}
sskDatacache.put(block, false);
cacheKey(((NodeSSK)block.getKey()).getPubKeyHash(),
((NodeSSK)block.getKey()).getPubKey(), deep);
+ if(clientCore != null && clientCore.requestStarters !=
null)
+
clientCore.requestStarters.sskFetchScheduler.tripPendingKey(block);
+
} catch (IOException e) {
Logger.error(this, "Cannot store data: "+e, e);
}
Modified: trunk/freenet/src/freenet/node/SendableGet.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableGet.java 2007-06-22 17:16:57 UTC
(rev 13710)
+++ trunk/freenet/src/freenet/node/SendableGet.java 2007-06-22 18:40:46 UTC
(rev 13711)
@@ -4,9 +4,12 @@
package freenet.node;
import freenet.client.FetchContext;
+import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.ClientRequester;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
import freenet.support.Logger;
/**
@@ -102,10 +105,16 @@
public void schedule() {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Scheduling "+this);
+ getScheduler().register(this);
+ }
+
+ public ClientRequestScheduler getScheduler() {
if(isSSK())
- parent.sskScheduler.register(this);
+ return parent.sskScheduler;
else
- parent.chkScheduler.register(this);
+ return parent.chkScheduler;
}
+ public abstract void onGotKey(Key key, KeyBlock block);
+
}