Author: toad
Date: 2007-06-22 18:40:46 +0000 (Fri, 22 Jun 2007)
New Revision: 13711

Modified:
   trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
   trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
   trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
   trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
   trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
   trunk/freenet/src/freenet/client/async/USKChecker.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/SendableGet.java
Log:
Back-door coalescing: When a key is written to the datastore, notify all 
clients interested in it.

Modified: trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2007-06-22 18:40:46 UTC (rev 13711)
@@ -6,6 +6,9 @@
 import freenet.client.FetchContext;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientSSK;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.KeyVerifyException;
 import freenet.node.SendableGet;
 import freenet.support.Logger;
 import freenet.support.RandomGrabArray;
@@ -101,4 +104,20 @@
                return true;
        }

+       public void onGotKey(Key key, KeyBlock block) {
+               synchronized(this) {
+                       if(isCancelled()) return;
+                       if(!key.equals(this.key.getNodeKey())) {
+                               Logger.normal(this, "Got sent key "+key+" but 
want "+this.key+" for "+this);
+                               return;
+                       }
+               }
+               try {
+                       onSuccess(Key.createKeyBlock(this.key, block), false, 
0);
+               } catch (KeyVerifyException e) {
+                       Logger.error(this, "onGotKey("+key+","+block+") got 
"+e+" for "+this, e);
+                       // FIXME if we get rid of the direct route this must 
call onFailure()
+               }
+       }
+       
 }

Modified: trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2007-06-22 18:40:46 UTC (rev 13711)
@@ -14,6 +14,8 @@
 import freenet.crypt.RandomSource;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
 import freenet.keys.KeyVerifyException;
 import freenet.node.LowLevelGetException;
 import freenet.node.Node;
@@ -24,8 +26,8 @@
 import freenet.node.SendableRequest;
 import freenet.support.Logger;
 import freenet.support.RandomGrabArray;
+import freenet.support.SectoredRandomGrabArrayWithInt;
 import freenet.support.SectoredRandomGrabArrayWithObject;
-import freenet.support.SectoredRandomGrabArrayWithInt;
 import freenet.support.SortedVectorByNumber;
 import freenet.support.api.StringCallback;

@@ -94,6 +96,11 @@
        public final String name;
        private final LinkedList /* <WeakReference <RandomGrabArray> > */ 
recentSuccesses = new LinkedList();

+       /** All pending gets by key. Used to automatically satisfy pending 
requests when either the key is fetched by
+        * an overlapping request, or it is fetched by a request from another 
node. Operations on this are synchronized on
+        * itself. */
+       private final HashMap /* <Key, SendableGet[]> */ pendingKeys;
+       
        public static final String PRIORITY_NONE = "NONE";
        public static final String PRIORITY_SOFT = "SOFT";
        public static final String PRIORITY_HARD = "HARD";
@@ -159,6 +166,10 @@
                this.isSSKScheduler = forSSKs;
                priorities = new 
SortedVectorByNumber[RequestStarter.NUMBER_OF_PRIORITY_CLASSES];
                allRequestsByClientRequest = new HashMap();
+               if(forInserts)
+                       pendingKeys = null;
+               else
+                       pendingKeys = new HashMap();

                this.name = name;
                sc.register(name+"_priority_policy", PRIORITY_HARD, 
name.hashCode(), true, false,
@@ -202,6 +213,29 @@
                                                                block = 
getter.getContext().blocks.get(key);
                                                        if(block == null)
                                                                block = 
node.fetchKey(key, getter.dontCache());
+                                                       if(block == null) {
+                                                               Key nodeKey = 
key.getNodeKey();
+                                                               
synchronized(pendingKeys) {
+                                                                       
SendableGet[] gets = (SendableGet[]) pendingKeys.get(nodeKey);
+                                                                       if(gets 
== null) {
+                                                                               
pendingKeys.put(nodeKey, new SendableGet[] { getter });
+                                                                       } else {
+                                                                               
boolean found = false;
+                                                                               
for(int j=0;j<gets.length;j++) {
+                                                                               
        if(gets[j] == getter) {
+                                                                               
                found = true;
+                                                                               
                break;
+                                                                               
        }
+                                                                               
}
+                                                                               
if(!found) {
+                                                                               
        SendableGet[] newGets = new SendableGet[gets.length+1];
+                                                                               
        System.arraycopy(gets, 0, newGets, 0, gets.length);
+                                                                               
        newGets[gets.length] = getter;
+                                                                               
        pendingKeys.put(nodeKey, newGets);
+                                                                               
}
+                                                                       }
+                                                               }
+                                                       }
                                                }
                                        } catch (KeyVerifyException e) {
                                                // Verify exception, probably 
bogus at source;
@@ -388,6 +422,9 @@
                                                        
allRequestsByClientRequest.remove(cr);
                                                if(logMINOR) Logger.minor(this, 
"Removed from HashSet for "+cr+" which now has "+v.size()+" elements");
                                        }
+                                       if(!isInsertScheduler) {
+                                               removePendingKeys((SendableGet) 
req, true);
+                                       }
                                }
                                if(logMINOR) Logger.minor(this, "removeFirst() 
returning "+req);
                                return req;
@@ -397,6 +434,53 @@
                return null;
        }

+       /**
+        * Remove a SendableGet from the list of getters we maintain for each 
key, indicating that we are no longer interested
+        * in that key.
+        * @param getter
+        * @param complain
+        */
+       public void removePendingKeys(SendableGet getter, boolean complain) {
+               int[] keyTokens = getter.allKeys();
+               for(int i=0;i<keyTokens.length;i++) {
+                       int tok = keyTokens[i];
+                       Key key = getter.getKey(tok).getNodeKey();
+                       synchronized(pendingKeys) {
+                               SendableGet[] gets = (SendableGet[]) 
pendingKeys.get(key);
+                               if(gets == null) {
+                                       if(complain)
+                                               Logger.error(this, "Not found: 
"+getter+" for "+key+" removing (no such key)");
+                               } else if(gets.length > 1) {
+                                       SendableGet[] newGets = new 
SendableGet[gets.length-1];
+                                       boolean found = false;
+                                       int x = 0;
+                                       for(int j=0;j<gets.length;j++) {
+                                               if(j >= gets.length) {
+                                                       if(!found) {
+                                                               if(complain)
+                                                                       
Logger.error(this, "Not found: "+getter+" for "+key+" removing ("+gets.length+" 
getters)");
+                                                               return; // not 
here
+                                                       }
+                                                       if(gets[j] == getter || 
gets[j] == null || gets[j].isCancelled()) continue;
+                                                       newGets[x++] = gets[j];
+                                               }
+                                       }
+                                       if(x != gets.length-1) {
+                                               SendableGet[] newNewGets = new 
SendableGet[x];
+                                               System.arraycopy(newGets, 0, 
newNewGets, 0, x);
+                                               newGets = newNewGets;
+                                       }
+                                       pendingKeys.put(key, newGets);
+                               } else if(gets.length == 1 && gets[0] == 
getter) {
+                                       pendingKeys.remove(key);
+                               } else if(gets.length == 1 && gets[0] != 
getter) {
+                                       if(complain)
+                                               Logger.error(this, "Not found: 
"+getter+" for "+key+" removing (1 getter)");
+                               }
+                       }
+               }
+       }
+
        public void reregisterAll(ClientRequester request) {
                SendableRequest[] reqs;
                synchronized(this) {
@@ -429,4 +513,20 @@
                                recentSuccesses.removeLast();
                }
        }
+
+       public void tripPendingKey(final KeyBlock block) {
+               final Key key = block.getKey();
+               final SendableGet[] gets;
+               synchronized(pendingKeys) {
+                       gets = (SendableGet[]) pendingKeys.get(key);
+               }
+               Runnable r = new Runnable() {
+                       public void run() {
+                               for(int i=0;i<gets.length;i++) {
+                                       gets[i].onGotKey(key, block);
+                               }
+                       }
+               };
+               node.getTicker().queueTimedJob(r, 0); // FIXME ideally these 
would be completed on a single thread; when we have 1.5, use a dedicated 
non-parallel Executor
+       }
 }

Modified: trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 
2007-06-22 18:40:46 UTC (rev 13711)
@@ -95,6 +95,7 @@
                        }
                }
                // :(
+               getScheduler().removePendingKeys(this, false);
                if(e.isFatal() || forceFatal)
                        parent.fatallyFailedBlock();
                else
@@ -109,6 +110,7 @@
                        onFailure(new FetchException(FetchException.CANCELLED));
                        return;
                }
+               getScheduler().removePendingKeys(this, false);
                rcb.onSuccess(data, this);
        }


Modified: trunk/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleFileFetcher.java       
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SingleFileFetcher.java       
2007-06-22 18:40:46 UTC (rev 13711)
@@ -177,6 +177,7 @@
        }

        protected void onSuccess(FetchResult result) {
+               getScheduler().removePendingKeys(this, false);
                if(parent.isCancelled()) {
                        if(logMINOR)
                                Logger.minor(this, "Parent is cancelled");

Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java      
2007-06-22 17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java      
2007-06-22 18:40:46 UTC (rev 13711)
@@ -7,7 +7,10 @@
 import freenet.client.FetchException;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
 import freenet.keys.KeyDecodeException;
+import freenet.keys.KeyVerifyException;
 import freenet.keys.TooBigException;
 import freenet.node.LowLevelGetException;
 import freenet.node.SendableGet;
@@ -253,9 +256,36 @@
                return 
super.toString()+":"+retryCount+"/"+segment+'('+blockNums.size()+')';
        }

-       public synchronized void possiblyRemoveFromParent() {
-               if(blockNums.isEmpty())
-                       segment.removeSeg(this);
+       public void possiblyRemoveFromParent() {
+               synchronized(this) {
+                       if(blockNums.isEmpty())
+                               segment.removeSeg(this);
+               }
+               getScheduler().removePendingKeys(this, false);
        }
+
+       public void onGotKey(Key key, KeyBlock block) {
+               int blockNum = -1;
+               ClientKey ckey = null;
+               synchronized(this) {
+                       for(int i=0;i<blockNums.size();i++) {
+                               int num = 
((Integer)blockNums.get(i)).intValue();
+                               ckey = segment.getBlockKey(num);
+                               Key k = ckey.getNodeKey();
+                               if(k.equals(key)) {
+                                       blockNum = num;
+                                       blockNums.remove(i);
+                                       break;
+                               }
+                       }
+               }
+               if(blockNum == -1) return;
+               try {
+                       onSuccess(Key.createKeyBlock(ckey, block), false, 
blockNum);
+               } catch (KeyVerifyException e) {
+                       // FIXME if we ever abolish the direct route, this must 
be turned into an onFailure().
+                       Logger.error(this, "Failed to parse in 
onGotKey("+key+","+block+") - believed to be "+ckey+" (block #"+blockNum+")");
+               }
+       }

 }

Modified: trunk/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKChecker.java      2007-06-22 
17:16:57 UTC (rev 13710)
+++ trunk/freenet/src/freenet/client/async/USKChecker.java      2007-06-22 
18:40:46 UTC (rev 13711)
@@ -26,6 +26,7 @@
        }

        public void onSuccess(ClientKeyBlock block, boolean fromStore, int 
token) {
+               getScheduler().removePendingKeys(this, false);
                cb.onSuccess((ClientSSKBlock)block);
        }

@@ -61,7 +62,7 @@
                if(canRetry && retry()) return;

                // Ran out of retries.
-               
+               getScheduler().removePendingKeys(this, false);          
                if(e.code == LowLevelGetException.CANCELLED){
                        cb.onCancelled();
                        return;

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2007-06-22 17:16:57 UTC (rev 
13710)
+++ trunk/freenet/src/freenet/node/Node.java    2007-06-22 18:40:46 UTC (rev 
13711)
@@ -2073,6 +2073,8 @@
                                chkDatastore.put(block);
                        }
                        chkDatacache.put(block);
+                       if(clientCore != null && clientCore.requestStarters != 
null)
+                               
clientCore.requestStarters.chkFetchScheduler.tripPendingKey(block);
                } catch (IOException e) {
                        Logger.error(this, "Cannot store data: "+e, e);
                }
@@ -2096,6 +2098,9 @@
                        }
                        sskDatacache.put(block, false);
                        cacheKey(((NodeSSK)block.getKey()).getPubKeyHash(), 
((NodeSSK)block.getKey()).getPubKey(), deep);
+                       if(clientCore != null && clientCore.requestStarters != 
null)
+                               
clientCore.requestStarters.sskFetchScheduler.tripPendingKey(block);
+                       
                } catch (IOException e) {
                        Logger.error(this, "Cannot store data: "+e, e);
                }

Modified: trunk/freenet/src/freenet/node/SendableGet.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableGet.java     2007-06-22 17:16:57 UTC 
(rev 13710)
+++ trunk/freenet/src/freenet/node/SendableGet.java     2007-06-22 18:40:46 UTC 
(rev 13711)
@@ -4,9 +4,12 @@
 package freenet.node;

 import freenet.client.FetchContext;
+import freenet.client.async.ClientRequestScheduler;
 import freenet.client.async.ClientRequester;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
 import freenet.support.Logger;

 /**
@@ -102,10 +105,16 @@
        public void schedule() {
                if(Logger.shouldLog(Logger.MINOR, this))
                        Logger.minor(this, "Scheduling "+this);
+               getScheduler().register(this);
+       }
+       
+       public ClientRequestScheduler getScheduler() {
                if(isSSK())
-                       parent.sskScheduler.register(this);
+                       return parent.sskScheduler;
                else
-                       parent.chkScheduler.register(this);
+                       return parent.chkScheduler;
        }

+       public abstract void onGotKey(Key key, KeyBlock block);
+
 }


Reply via email to