Author: toad
Date: 2008-07-10 19:28:28 +0000 (Thu, 10 Jul 2008)
New Revision: 21032

Added:
   branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
   branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
   branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
   branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
   branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
   branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
   branches/db4o/freenet/src/freenet/client/async/USKChecker.java
   branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
   branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
   branches/db4o/freenet/src/freenet/client/async/USKInserter.java
   branches/db4o/freenet/src/freenet/client/async/USKManager.java
   branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
   branches/db4o/freenet/src/freenet/node/SendableGet.java
   branches/db4o/freenet/src/freenet/node/SendableRequest.java
   branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
Major refactoring.
Separate onGotKey() etc out from SendableGet into new interface GotKeyListener.
Saves a lot of effort unregistering and reregistering 
SplitFileFetcherSubSegment's on pendingKeys, since we now just register the 
SplitFileFetcherSegment once and don't need to worry about it on a typical 
nonfatal failure retry.
Also simplifies some code.

Modified: 
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2008-07-10 19:28:28 UTC (rev 21032)
@@ -17,7 +17,7 @@
 import freenet.node.SendableGet;
 import freenet.support.Logger;

-public abstract class BaseSingleFileFetcher extends SendableGet {
+public abstract class BaseSingleFileFetcher extends SendableGet implements 
GotKeyListener {

        final ClientKey key;
        protected boolean cancelled;
@@ -102,7 +102,7 @@
                                }
                                return true; // We will retry, just not yet. 
See requeueAfterCooldown(Key).
                        } else {
-                               schedule(container, context, false, true);
+                               schedule(container, context, false);
                        }
                        return true;
                }
@@ -131,10 +131,22 @@
                synchronized(this) {
                        cancelled = true;
                }
-               if(persistent)
+               if(persistent) {
                        container.set(this);
-               super.unregister(false, container);
+                       container.activate(key, 5);
+               }
+               
+               unregisterAll(container, context);
        }
+       
+       /**
+        * Remove the pendingKeys item and then remove from the queue as well.
+        * Call unregister(container) if you only want to remove from the queue.
+        */
+       public void unregisterAll(ObjectContainer container, ClientContext 
context) {
+               getScheduler(context).removePendingKey(this, false, 
key.getNodeKey(), container);
+               super.unregister(container);
+       }

        public synchronized boolean isCancelled(ObjectContainer container) {
                return cancelled;
@@ -148,6 +160,10 @@
                return parent.getClient();
        }

+       public boolean dontCache(ObjectContainer container) {
+               return !ctx.cacheLocalRequests;
+       }
+       
        public boolean dontCache() {
                return !ctx.cacheLocalRequests;
        }
@@ -207,7 +223,25 @@
                }
                if(Logger.shouldLog(Logger.MINOR, this))
                        Logger.minor(this, "Requeueing after cooldown "+key+" 
for "+this);
-               schedule(container, context, false, true);
+               schedule(container, context, false);
        }
+
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean delayed) {
+               getScheduler(context).register(this, new SendableGet[] { this 
}, delayed, persistent, true, ctx.blocks);
+       }

+       public SendableGet getRequest(Key key, ObjectContainer container) {
+               return this;
+       }
+
+       public Key[] listKeys(ObjectContainer container) {
+               if(cancelled || finished)
+                       return new Key[0];
+               else {
+                       if(persistent)
+                               container.activate(key, 5);
+                       return new Key[] { key.getNodeKey() };
+               }
+       }
+
 }

Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetState.java  
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetState.java  
2008-07-10 19:28:28 UTC (rev 21032)
@@ -11,7 +11,7 @@
  */
 public interface ClientGetState {

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean delayedRegister, boolean probablyNotInStore);
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean delayedRegister);

        public void cancel(ObjectContainer container, ClientContext context);


Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-07-10 19:28:28 UTC (rev 21032)
@@ -113,7 +113,7 @@
                                                return false;
                                        }
                                }
-                               currentState.schedule(container, context, 
false, false);
+                               currentState.schedule(container, context, 
false);
                        }
                        if(cancelled) cancel();
                } catch (MalformedURLException e) {

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-07-10 19:28:28 UTC (rev 21032)
@@ -155,232 +155,157 @@
                choosenPriorityScheduler = val;
        }

-       public void register(final SendableRequest req, boolean regmeOnly, 
boolean probablyNotInStore) {
-               register(req, databaseExecutor.onThread(), regmeOnly, null, 
probablyNotInStore);
+       public void registerInsert(final SendableRequest req, boolean 
persistent) {
+               registerInsert(req, persistent, databaseExecutor.onThread());
        }

+       public void registerInsert(final SendableRequest req, boolean 
persistent, boolean onDatabaseThread) {
+               if(persistent) {
+                       if(onDatabaseThread) {
+                               schedCore.innerRegister(req, random, 
selectorContainer);
+                       } else {
+                               jobRunner.queue(new DBJob() {
+
+                                       public void run(ObjectContainer 
container, ClientContext context) {
+                                               schedCore.innerRegister(req, 
random, selectorContainer);
+                                       }
+                                       
+                               }, NativeThread.NORM_PRIORITY, false);
+                       }
+               } else {
+                       schedTransient.innerRegister(req, random, null);
+               }
+       }
+       
        /**
-        * Register and then delete the RegisterMe which is passed in to avoid 
querying.
+        * Register a group of requests (not inserts): a GotKeyListener and/or 
one 
+        * or more SendableGet's.
+        * @param listener Listeners for specific keys. Can be null if the 
listener
+        * is already registered e.g. most of the time with SplitFileFetcher*.
+        * @param getters The actual requests to register to the request sender 
queue.
+        * @param registerOffThread If true, create and store a RegisterMe to 
ensure
+        * that the request is registered, but then schedule a job to complete 
it
+        * after this job completes. Reduces the latency impact of scheduling a 
big
+        * splitfile dramatically.
+        * @param persistent True if the request is persistent.
+        * @param onDatabaseThread True if we are running on the database 
thread.
+        * NOTE: delayedStoreCheck/probablyNotInStore is unnecessary because we 
only
+        * register the listener once.
         */
-       public void register(final SendableRequest req, boolean 
onDatabaseThread, final boolean regmeOnly, RegisterMe reg, final boolean 
probablyNotInStore) {
+       public void register(final GotKeyListener listener, final SendableGet[] 
getters, boolean registerOffThread, final boolean persistent, boolean 
onDatabaseThread, final BlockSet blocks) {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               if(logMINOR) Logger.minor(this, "Registering "+req, new 
Exception("debug"));
-               final boolean persistent = req.persistent();
-               if(isInsertScheduler != (req instanceof SendableInsert))
-                       throw new IllegalArgumentException("Expected a 
SendableInsert: "+req);
-               if(req instanceof SendableGet) {
-                       final SendableGet getter = (SendableGet)req;
-                       
-                       if(persistent && onDatabaseThread) {
-                               if(req.isEmpty(selectorContainer) || 
req.isCancelled(selectorContainer)) {
-                                       Logger.error(this, "In register(): 
Request is empty/cancelled: "+req, new Exception("debug"));
-                               }
-                               if(regmeOnly) {
-                                       assert(reg == null);
-                                       reg = schedCore.queueRegister(getter, 
databaseExecutor, selectorContainer);
-                                       final RegisterMe regme = reg;
-                                       clientContext.jobRunner.queue(new 
DBJob() {
+               if(logMINOR)
+                       Logger.minor(this, 
"register("+persistent+","+listener+","+getters+","+registerOffThread);
+               if(persistent) {
+                       if(onDatabaseThread) {
+                               innerRegister(listener, getters, 
registerOffThread, persistent, blocks);
+                       } else {
+                               jobRunner.queue(new DBJob() {

-                                               public void run(ObjectContainer 
container, ClientContext context) {
-                                                       register(req, true, 
false, regme, probablyNotInStore);
-                                               }
-                                       // NORM_PRIORITY so the completion 
(finishRegister()) runs before the next block does addPendingKeys().
-                                       }, NativeThread.NORM_PRIORITY, false);
-                                       return;
-                               }
-                               schedCore.addPendingKeys(getter, 
selectorContainer);
-                               final Object[] keyTokens = 
getter.sendableKeys(selectorContainer);
-                               final ClientKey[] keys = new 
ClientKey[keyTokens.length];
-                               
-                               if(probablyNotInStore) {
-                                       // Complete the registration *before* 
checking the store.
-                                       // Check the store anyway though!
-                                       finishRegister(req, persistent, true, 
true, reg);
-                                       // RegisterMe has been deleted or was 
null in the first place.
-                                       reg = null;
-                               } else {
-                                       if(reg == null)
-                                               reg = 
schedCore.queueRegister(getter, databaseExecutor, selectorContainer);
-                               }
-                               final RegisterMe regme = reg;
-                               
-                               for(int i=0;i<keyTokens.length;i++) {
-                                       keys[i] = getter.getKey(keyTokens[i], 
selectorContainer);
-                                       selectorContainer.activate(keys[i], 5);
-                               }
-                               final BlockSet blocks = 
getter.getContext().blocks;
-                               final boolean dontCache = getter.dontCache();
+                                       public void run(ObjectContainer 
container, ClientContext context) {
+                                               // registerOffThread would be 
pointless because this is a separate job.
+                                               innerRegister(listener, 
getters, false, persistent, blocks);
+                                       }
+                                       
+                               }, NativeThread.NORM_PRIORITY, false);
+                       }
+               } else {
+                       if(listener != null) {
+                               schedTransient.addPendingKeys(listener, null);
+                               short prio = 
listener.getPriorityClass(selectorContainer);
+                               final Key[] keys = 
listener.listKeys(selectorContainer);
+                               final boolean dontCache = 
listener.dontCache(null);
                                datastoreCheckerExecutor.execute(new Runnable() 
{

                                        public void run() {
-                                               registerCheckStore(getter, 
true, keyTokens, keys, regme, blocks, dontCache);
+                                               // Check the store, then queue 
the requests to the main queue.
+                                               registerCheckStore(getters, 
false, keys, null, blocks, dontCache);
                                        }

-                               }, getter.getPriorityClass(selectorContainer), 
"Checking datastore");
-                       } else if(persistent) {
-                               final RegisterMe regme = reg;
+                               }, prio, "Checking datastore");
+                       } else {
+                               this.finishRegister(getters, persistent, false, 
true, null);
+                       }
+               }
+       }
+       
+       
+       private void innerRegister(final GotKeyListener listener, final 
SendableGet[] getters, boolean registerOffThread, boolean persistent, final 
BlockSet blocks) {
+               if(listener != null) {
+                       if(registerOffThread) {
+                               short prio = 
listener.getPriorityClass(selectorContainer);
+                               RegisterMe regme = new RegisterMe(listener, 
getters, prio, schedCore, blocks);
+                               selectorContainer.set(regme);
+                               if(logMINOR) Logger.minor(this, "Added regme: 
"+regme);
                                jobRunner.queue(new DBJob() {

                                        public void run(ObjectContainer 
container, ClientContext context) {
-                                               container.activate(getter, 1);
-                                               
schedCore.addPendingKeys(getter, container);
-                                               RegisterMe reg = regme;
-                                               if(probablyNotInStore) {
-                                                       // Complete the 
registration *before* checking the store.
-                                                       // Check the store 
anyway though!
-                                                       finishRegister(req, 
persistent, true, true, reg);
-                                                       // RegisterMe has been 
deleted or was null in the first place.
-                                                       reg = null;
-                                               } else {
-                                                       if(reg == null)
-                                                               reg = 
schedCore.queueRegister(getter, databaseExecutor, container);
-                                               }
-                                               final RegisterMe regInner = reg;
-                                               
-                                               final Object[] keyTokens = 
getter.sendableKeys(container);
-                                               final ClientKey[] keys = new 
ClientKey[keyTokens.length];
-                                               for(int 
i=0;i<keyTokens.length;i++) {
-                                                       keys[i] = 
getter.getKey(keyTokens[i], selectorContainer);
-                                                       
container.activate(keys[i], 5);
-                                               }
-                                               final BlockSet blocks = 
getter.getContext().blocks;
-                                               final boolean dontCache = 
getter.dontCache();
-                                               
datastoreCheckerExecutor.execute(new Runnable() {
-
-                                                       public void run() {
-                                                               
registerCheckStore(getter, true, keyTokens, keys, regInner, blocks, dontCache);
-                                                       }
-                                                       
-                                               }, 
getter.getPriorityClass(container), "Checking datastore");
+                                               register(listener, getters, 
false, true, true, blocks);
                                        }

                                }, NativeThread.NORM_PRIORITY, false);
+                               return;
                        } else {
-                               // Not persistent
-                               schedTransient.addPendingKeys(getter, null);
-                               // Check the store off-thread anyway.
-                               final Object[] keyTokens = 
getter.sendableKeys(null);
-                               final ClientKey[] keys = new 
ClientKey[keyTokens.length];
-                               for(int i=0;i<keyTokens.length;i++)
-                                       keys[i] = getter.getKey(keyTokens[i], 
null);
+                               short prio = 
listener.getPriorityClass(selectorContainer);
+                               schedCore.addPendingKeys(listener, 
selectorContainer);
+                               final RegisterMe regme;
+                               if(getters != null) {
+                                       regme = new RegisterMe(null, getters, 
prio, schedCore, blocks);
+                                       selectorContainer.set(regme);
+                                       if(logMINOR) Logger.minor(this, "Added 
regme: "+regme);
+                               } else regme = null; // Nothing to finish 
registering.
+                               // Check the datastore before proceding.
+                               final Key[] keys = 
listener.listKeys(selectorContainer);
+                               final boolean dontCache = 
listener.dontCache(selectorContainer);
                                datastoreCheckerExecutor.execute(new Runnable() 
{

                                        public void run() {
-                                               registerCheckStore(getter, 
false, keyTokens, keys, null, getter.getContext().blocks, getter.dontCache());
+                                               // Check the store, then queue 
the requests to the main queue.
+                                               registerCheckStore(getters, 
true, keys, regme, blocks, dontCache);
                                        }

-                               }, getter.getPriorityClass(null), "Checking 
datastore");
+                               }, prio, "Checking datastore");
+
                        }
                } else {
-                       if(persistent) {
-                               if(onDatabaseThread) {
-                                       schedCore.queueRegister(req, 
databaseExecutor, selectorContainer);
-                                       // Pretend to not be on the database 
thread.
-                                       // In some places (e.g. 
SplitFileInserter.start(), we call register() *many* times within a single 
transaction.
-                                       // We can greatly improve 
responsiveness at the cost of some throughput and RAM by only adding the tags 
at this point.
-                                       finishRegister(req, persistent, false, 
true, reg);
-                               } else {
-                                       final RegisterMe regme = reg;
-                                       jobRunner.queue(new DBJob() {
-
-                                               public void run(ObjectContainer 
container, ClientContext context) {
-                                                       container.activate(req, 
1);
-                                                       RegisterMe reg = regme;
-                                                       if(reg == null)
-                                                               reg = 
schedCore.queueRegister(req, databaseExecutor, selectorContainer);
-                                                       // Self-contained job, 
will complete quickly enough.
-                                                       finishRegister(req, 
persistent, true, true, reg);
-                                               }
-                                               
-                                       }, NativeThread.NORM_PRIORITY, false);
-                               }
-                       } else {
-                               finishRegister(req, persistent, 
onDatabaseThread, true, reg);
+                       // The listener is already registered.
+                       // Ignore registerOffThread for now.
+                       short prio = RequestStarter.MINIMUM_PRIORITY_CLASS;
+                       for(int i=0;i<getters.length;i++) {
+                               short p = 
getters[i].getPriorityClass(selectorContainer);
+                               if(p < prio) prio = p;
                        }
+                       this.finishRegister(getters, persistent, true, true, 
null);
                }
        }

-       /**
-        * Check the store for all the keys on the SendableGet. By now the 
pendingKeys will have
-        * been set up, and this is run on the datastore checker thread. Once 
completed, this should
-        * (for a persistent request) queue a job on the databaseExecutor and 
(for a transient 
-        * request) finish registering the request immediately.
-        * @param getter The SendableGet. NOTE: If persistent, DO NOT USE THIS 
INLINE, because it won't
-        * be activated. This is why we pass in extraBlocks and dontCache.
-        * @param reg 
-        */
-       protected void registerCheckStore(SendableGet getter, boolean 
persistent, Object[] keyTokens, ClientKey[] keys, RegisterMe reg, BlockSet 
extraBlocks, boolean dontCache) {
+       protected void registerCheckStore(SendableGet[] getters, boolean 
persistent, 
+                       Key[] keys, RegisterMe regme, BlockSet extraBlocks, 
boolean dontCache) {
                boolean anyValid = false;
-               for(int i=0;i<keyTokens.length;i++) {
-                       Object tok = keyTokens[i];
-                       ClientKeyBlock block = null;
-                       try {
-                               ClientKey key = keys[i];
-                               if(key == null) {
+               for(int i=0;i<keys.length;i++) {
+                       Key key = keys[i];
+                       KeyBlock block = null;
+                       if(key == null) {
+                               if(logMINOR) Logger.minor(this, "No key at "+i);
+                               continue;
+                       } else {
+                               if(extraBlocks != null)
+                                       block = extraBlocks.get(key);
+                               if(block == null)
+                                       block = node.fetch(key, dontCache);
+                               if(block != null) {
                                        if(logMINOR)
-                                               Logger.minor(this, "No key for 
"+tok+" for "+getter+" - already finished?");
-                                       continue;
-                               } else {
-                                       if(extraBlocks != null)
-                                               block = extraBlocks.get(key);
-                                       if(block == null)
-                                               block = node.fetchKey(key, 
dontCache);
-                                       if(block == null) {
-                                               if(!persistent) {
-                                                       
schedTransient.addPendingKey(key.getNodeKey(), getter, null);
-                                               } // If persistent, when it is 
registered (in a later job) the keys will be added first.
-                                       } else {
-                                               if(logMINOR)
-                                                       Logger.minor(this, "Got 
"+block);
-                                       }
+                                               Logger.minor(this, "Got 
"+block);
                                }
-                       } catch (KeyVerifyException e) {
-                               // Verify exception, probably bogus at source;
-                               // verifies at low-level, but not at decode.
-                               if(logMINOR)
-                                       Logger.minor(this, "Decode failed: "+e, 
e);
-                               if(!persistent)
-                                       getter.onFailure(new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, null, 
clientContext);
-                               else {
-                                       final SendableGet g = getter;
-                                       final Object token = tok;
-                                       jobRunner.queue(new DBJob() {
-
-                                               public void run(ObjectContainer 
container, ClientContext context) {
-                                                       container.activate(g, 
1);
-                                                       g.onFailure(new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, container, 
context);
-                                               }
-                                               // NORM_PRIORITY+1 as must run 
before finishRegister()
-                                       }, NativeThread.NORM_PRIORITY+1, false);
-                               }
-                               continue; // other keys might be valid
                        }
                        if(block != null) {
-                               if(logMINOR) Logger.minor(this, "Can fulfill 
"+getter+" ("+tok+") immediately from store");
-                               if(!persistent)
-                                       getter.onSuccess(block, true, tok, 
null, clientContext);
-                               else {
-                                       final ClientKeyBlock b = block;
-                                       final Object t = tok;
-                                       final SendableGet g = getter;
-                                       if(persistent) {
-                                               jobRunner.queue(new DBJob() {
-                                                       
-                                                       public void 
run(ObjectContainer container, ClientContext context) {
-                                                               
container.activate(g, 1);
-                                                               g.onSuccess(b, 
true, t, container, context);
-                                                       }
-                                                       // NORM_PRIORITY+1 as 
must run before finishRegister()
-                                               }, 
NativeThread.NORM_PRIORITY+1, false);
-                                       } else {
-                                               g.onSuccess(b, true, t, null, 
clientContext);
-                                       }
-                               }
+                               if(logMINOR) Logger.minor(this, "Found key");
+                               tripPendingKey(block);
                        } else {
                                anyValid = true;
                        }
                }
-               finishRegister(getter, persistent, false, anyValid, reg);
+               finishRegister(getters, persistent, false, anyValid, regme);
        }

        /** If enabled, if the queue is less than 25% full, attempt to add 
newly 
@@ -388,7 +313,7 @@
         * bypassing registration on the queue. Risky optimisation. */
        static final boolean TRY_DIRECT = true;

-       private void finishRegister(final SendableRequest req, boolean 
persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe 
reg) {
+       private void finishRegister(final SendableGet[] getters, boolean 
persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe 
reg) {
                if(persistent) {
                        // Add to the persistent registration queue
                        if(onDatabaseThread) {
@@ -396,39 +321,49 @@
                                        throw new IllegalStateException("Not on 
database thread!");
                                }
                                if(persistent)
-                                       selectorContainer.activate(req, 1);
+                                       selectorContainer.activate(getters, 1);
                                boolean tryDirect = false;
                                if(anyValid && TRY_DIRECT) {
                                        synchronized(starterQueue) {
                                                tryDirect = starterQueue.size() 
< MAX_STARTER_QUEUE_SIZE * 1 / 4;
                                        }
                                        if(tryDirect) {
-                                               while(true) {
-                                                       PersistentChosenRequest 
cr = (PersistentChosenRequest) schedCore.maybeMakeChosenRequest(req, 
selectorContainer, clientContext);
-                                                       if(cr == null) {
-                                                               
if(!(req.isEmpty(selectorContainer) || req.isCancelled(selectorContainer)))
-                                                                       // 
Still needs to be registered
-                                                                       
tryDirect = false;
-                                                               break;
-                                                       }
-                                                       
synchronized(starterQueue) {
-                                                               
if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) {
+                                               for(int 
i=0;i<getters.length;i++) {
+                                                       SendableGet getter = 
getters[i];
+                                                       while(true) {
+                                                               
PersistentChosenRequest cr = (PersistentChosenRequest) 
schedCore.maybeMakeChosenRequest(getter, selectorContainer, clientContext);
+                                                               if(cr == null) {
+                                                                       
if(!(getter.isEmpty(selectorContainer) || 
getter.isCancelled(selectorContainer)))
+                                                                               
// Still needs to be registered
+                                                                               
tryDirect = false;
                                                                        break;
                                                                }
-                                                               
starterQueue.add(cr);
+                                                               
synchronized(starterQueue) {
+                                                                       
if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) {
+                                                                               
tryDirect = false;
+                                                                               
break;
+                                                                       }
+                                                                       
starterQueue.add(cr);
+                                                               }
                                                        }
                                                }
                                        }
                                }
                                if(logMINOR)
-                                       Logger.minor(this, "finishRegister() 
for "+req);
+                                       Logger.minor(this, "finishRegister() 
for "+getters);
                                if(anyValid) {
                                        if(!tryDirect) {
-                                               
if(req.isCancelled(selectorContainer) || req.isEmpty(selectorContainer)) {
-                                                       Logger.error(this, 
"Request is empty/cancelled: "+req);
-                                               } else {
-                                                       
schedCore.innerRegister(req, random, selectorContainer);
+                                               boolean wereAnyValid = false;
+                                               for(int 
i=0;i<getters.length;i++) {
+                                                       SendableGet getter = 
getters[i];
+                                                       
if(!(getter.isCancelled(selectorContainer) || 
getter.isEmpty(selectorContainer))) {
+                                                               wereAnyValid = 
true;
+                                                               
schedCore.innerRegister(getter, random, selectorContainer);
+                                                       }
                                                }
+                                               if(!wereAnyValid) {
+                                                       Logger.error(this, "No 
requests valid: "+getters);
+                                               }
                                        }
                                }
                                if(reg != null)
@@ -439,15 +374,20 @@
                                jobRunner.queue(new DBJob() {

                                        public void run(ObjectContainer 
container, ClientContext context) {
-                                               container.activate(req, 1);
+                                               container.activate(getters, 1);
                                                if(logMINOR)
-                                                       Logger.minor(this, 
"finishRegister() for "+req);
-                                               if(anyValid) {
-                                                       
if(req.isCancelled(container) || req.isEmpty(container)) {
-                                                               
Logger.error(this, "Request is empty/cancelled: "+req);
-                                                       } else
-                                                               
schedCore.innerRegister(req, random, container);
+                                                       Logger.minor(this, 
"finishRegister() for "+getters);
+                                               boolean wereAnyValid = false;
+                                               for(int 
i=0;i<getters.length;i++) {
+                                                       SendableGet getter = 
getters[i];
+                                                       
if(!(getter.isCancelled(selectorContainer) || 
getter.isEmpty(selectorContainer))) {
+                                                               wereAnyValid = 
true;
+                                                               
schedCore.innerRegister(getter, random, selectorContainer);
+                                                       }
                                                }
+                                               if(!wereAnyValid) {
+                                                       Logger.error(this, "No 
requests valid: "+getters);
+                                               }
                                                if(reg != null)
                                                        container.delete(reg);
                                                
maybeFillStarterQueue(container, context);
@@ -458,7 +398,8 @@
                        }
                } else {
                        // Register immediately.
-                       schedTransient.innerRegister(req, random, null);
+                       for(int i=0;i<getters.length;i++)
+                               schedTransient.innerRegister(getters[i], 
random, null);
                        starter.wakeUp();
                }
        }
@@ -471,7 +412,7 @@
                requestStarterQueueFiller.run(container, context);
        }

-       void addPendingKey(final ClientKey key, final SendableGet getter) {
+       void addPendingKey(final ClientKey key, final GotKeyListener getter) {
                if(getter.persistent()) {
                        if(!databaseExecutor.onThread()) {
                                throw new IllegalStateException("Not on 
database thread!");
@@ -606,28 +547,35 @@
                }
        };

-       public void removePendingKey(final SendableGet getter, final boolean 
complain, final Key key, ObjectContainer container) {
+       public void removePendingKey(final GotKeyListener getter, final boolean 
complain, final Key key, ObjectContainer container) {
                if(!getter.persistent()) {
                        boolean dropped = 
schedTransient.removePendingKey(getter, complain, key, container);
                        if(dropped && offeredKeys != null && 
!node.peersWantKey(key)) {
                                for(int i=0;i<offeredKeys.length;i++)
                                        offeredKeys[i].remove(key);
                        }
-                       if(transientCooldownQueue != null)
-                               transientCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, null), null);
+                       if(transientCooldownQueue != null) {
+                               SendableGet cooldownGetter = 
getter.getRequest(key, container);
+                               if(cooldownGetter != null)
+                                       transientCooldownQueue.removeKey(key, 
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, null), null);
+                       }
                } else if(container != null) {
                        // We are on the database thread already.
                        schedCore.removePendingKey(getter, complain, key, 
container);
-                       if(persistentCooldownQueue != null)
-                               persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, container), container);
+                       if(persistentCooldownQueue != null) {
+                               SendableGet cooldownGetter = 
getter.getRequest(key, container);
+                               persistentCooldownQueue.removeKey(key, 
cooldownGetter, cooldownGetter.getCooldownWakeupByKey(key, container), 
container);
+                       }
                } else {
                        jobRunner.queue(new DBJob() {

                                public void run(ObjectContainer container, 
ClientContext context) {
                                        container.activate(getter, 1);
                                        schedCore.removePendingKey(getter, 
complain, key, container);
-                                       if(persistentCooldownQueue != null)
-                                               
persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, container), container);
+                                       if(persistentCooldownQueue != null) {
+                                               SendableGet cooldownGetter = 
getter.getRequest(key, container);
+                                               
persistentCooldownQueue.removeKey(key, cooldownGetter, 
cooldownGetter.getCooldownWakeupByKey(key, container), container);
+                                       }
                                }

                        }, NativeThread.NORM_PRIORITY, false);
@@ -640,7 +588,7 @@
         * @param getter
         * @param complain
         */
-       public void removePendingKeys(SendableGet getter, boolean complain) {
+       public void removePendingKeys(GotKeyListener getter, boolean complain) {
                ObjectContainer container;
                if(getter.persistent()) {
                        container = selectorContainer;
@@ -650,16 +598,9 @@
                } else {
                        container = null;
                }
-               Object[] keyTokens = getter.allKeys(container);
-               for(int i=0;i<keyTokens.length;i++) {
-                       Object tok = keyTokens[i];
-                       ClientKey ckey = getter.getKey(tok, container);
-                       if(ckey == null) {
-                               if(complain)
-                                       Logger.error(this, "Key "+tok+" is null 
for "+getter, new Exception("debug"));
-                               continue;
-                       }
-                       removePendingKey(getter, complain, ckey.getNodeKey(), 
container);
+               Key[] keys = getter.listKeys(container);
+               for(int i=0;i<keys.length;i++) {
+                       removePendingKey(getter, complain, keys[i], container);
                }
        }

@@ -707,7 +648,7 @@
                        }
                }
                final Key key = block.getKey();
-               final SendableGet[] transientGets = 
schedTransient.removePendingKey(key, null);
+               final GotKeyListener[] transientGets = 
schedTransient.removePendingKey(key, null);
                if(transientGets != null && transientGets.length > 0) {
                        node.executor.execute(new Runnable() {
                                public void run() {
@@ -723,8 +664,12 @@
                                }
                        }, "Running off-thread callbacks for "+block.getKey());
                        if(transientCooldownQueue != null) {
-                               for(int i=0;i<transientGets.length;i++)
-                                       transientCooldownQueue.removeKey(key, 
transientGets[i], transientGets[i].getCooldownWakeupByKey(key, null), null);
+                               for(int i=0;i<transientGets.length;i++) {
+                                       GotKeyListener got = transientGets[i];
+                                       SendableGet req = got.getRequest(key, 
null);
+                                       if(req == null) continue;
+                                       transientCooldownQueue.removeKey(key, 
req, req.getCooldownWakeupByKey(key, null), null);
+                               }
                        }
                }

@@ -734,12 +679,15 @@

                        public void run(ObjectContainer container, 
ClientContext context) {
                                container.activate(key, 1);
-                               final SendableGet[] gets = 
schedCore.removePendingKey(key, container);
+                               final GotKeyListener[] gets = 
schedCore.removePendingKey(key, container);
                                if(gets == null) return;
                                if(persistentCooldownQueue != null) {
                                        for(int i=0;i<gets.length;i++) {
-                                               container.activate(gets[i], 1);
-                                               
persistentCooldownQueue.removeKey(key, gets[i], 
gets[i].getCooldownWakeupByKey(key, container), container);
+                                               GotKeyListener got = gets[i];
+                                               container.activate(got, 1);
+                                               SendableGet req = 
got.getRequest(key, null);
+                                               if(req == null) continue;
+                                               
persistentCooldownQueue.removeKey(key, req, req.getCooldownWakeupByKey(key, 
container), container);
                                        }
                                }
                                // Call the callbacks on the database executor 
thread, because the first thing
@@ -839,8 +787,8 @@
                        if(persistent)
                                container.activate(key, 5);
                        if(logMINOR) Logger.minor(this, "Restoring key: "+key);
-                       SendableGet[] gets = 
schedCore.getClientsForPendingKey(key, container);
-                       SendableGet[] transientGets = 
schedTransient.getClientsForPendingKey(key, null);
+                       GotKeyListener[] gets = 
schedCore.getClientsForPendingKey(key, container);
+                       GotKeyListener[] transientGets = 
schedTransient.getClientsForPendingKey(key, null);
                        if(gets == null && transientGets == null) {
                                // Not an error as this can happen due to race 
conditions etc.
                                if(logMINOR) Logger.minor(this, "Restoring key 
but no keys queued?? for "+key);
@@ -850,11 +798,22 @@
                                for(int i=0;i<gets.length;i++) {
                                        if(persistent)
                                                container.activate(gets[i], 1);
-                                       gets[i].requeueAfterCooldown(key, now, 
container, clientContext);
+                                       GotKeyListener got = gets[i];
+                                       SendableGet req = got.getRequest(key, 
container);
+                                       if(req == null) {
+                                               Logger.error(this, "No request 
for listener "+got+" while requeueing "+key);
+                                       }
+                                       req.requeueAfterCooldown(key, now, 
container, clientContext);
                                }
                                if(transientGets != null)
-                               for(int i=0;i<transientGets.length;i++)
-                                       
transientGets[i].requeueAfterCooldown(key, now, container, clientContext);
+                               for(int i=0;i<transientGets.length;i++) {
+                                       GotKeyListener got = transientGets[i];
+                                       SendableGet req = got.getRequest(key, 
null);
+                                       if(req == null) {
+                                               Logger.error(this, "No request 
for listener "+got+" while requeueing "+key);
+                                       }
+                                       req.requeueAfterCooldown(key, now, 
container, clientContext);
+                               }
                        }
                }
                return found;

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-07-10 15:37:53 UTC (rev 21031)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-07-10 19:28:28 UTC (rev 21032)
@@ -10,11 +10,9 @@
 import com.db4o.ObjectContainer;

 import freenet.crypt.RandomSource;
-import freenet.keys.ClientKey;
 import freenet.keys.Key;
 import freenet.node.BaseSendableGet;
 import freenet.node.RequestStarter;
-import freenet.node.SendableGet;
 import freenet.node.SendableRequest;
 import freenet.support.Logger;
 import freenet.support.SectoredRandomGrabArrayWithInt;
@@ -142,7 +140,7 @@
                for(int i=0;i<reqs.length;i++) {
                        SendableRequest req = reqs[i];
                        // Unregister from the RGA's, but keep the pendingKeys 
and cooldown queue data.
-                       req.unregister(true, container);
+                       req.unregister(container);
                        // Then can do innerRegister() (not register()).
                        innerRegister(req, random, container);
                }
@@ -175,31 +173,25 @@
                        }
        }

-       public void addPendingKeys(SendableGet getter, ObjectContainer 
container) {
-               Object[] keyTokens = getter.sendableKeys(container);
-               Object prevTok = null;
+       public void addPendingKeys(GotKeyListener getter, ObjectContainer 
container) {
+               if(persistent())
+                       container.activate(getter, 1);
+               Key[] keyTokens = getter.listKeys(container);
+               Key prevTok = null;
                for(int i=0;i<keyTokens.length;i++) {
-                       Object tok = keyTokens[i];
-                       if(i != 0 && prevTok == tok || (prevTok != null && tok 
!= null && prevTok.equals(tok))) {
+                       Key key = keyTokens[i];
+                       if(i != 0 && prevTok == key || (prevTok != null && key 
!= null && prevTok.equals(key))) {
                                Logger.error(this, "Ignoring duplicate token");
                                continue;
                        }
-                       prevTok = tok;
-                       ClientKey key = getter.getKey(tok, container);
                        if(getter.persistent())
                                container.activate(key, 5);
-                       if(key == null) {
-                               if(logMINOR)
-                                       Logger.minor(this, "No key for "+tok+" 
for "+getter+" - already finished?");
-                                       continue;
-                       } else {
-                               addPendingKey(key.getNodeKey(), getter, 
container);
-                       }
+                       addPendingKey(key, getter, container);
                }
        }

        public short getKeyPrio(Key key, short priority, ObjectContainer 
container) {
-               SendableGet[] getters = getClientsForPendingKey(key, container);
+               GotKeyListener[] getters = getClientsForPendingKey(key, 
container);
                if(getters == null) return priority;
                for(int i=0;i<getters.length;i++) {
                        if(persistent())
@@ -214,17 +206,17 @@

        public abstract long countQueuedRequests(ObjectContainer container);

-       protected abstract boolean inPendingKeys(SendableGet req, Key key, 
ObjectContainer container);
+       protected abstract boolean inPendingKeys(GotKeyListener req, Key key, 
ObjectContainer container);

-       public abstract SendableGet[] getClientsForPendingKey(Key key, 
ObjectContainer container);
+       public abstract GotKeyListener[] getClientsForPendingKey(Key key, 
ObjectContainer container);

        public abstract boolean anyWantKey(Key key, ObjectContainer container);

-       public abstract SendableGet[] removePendingKey(Key key, ObjectContainer 
container);
+       public abstract GotKeyListener[] removePendingKey(Key key, 
ObjectContainer container);

-       public abstract boolean removePendingKey(SendableGet getter, boolean 
complain, Key key, ObjectContainer container);
+       public abstract boolean removePendingKey(GotKeyListener getter, boolean 
complain, Key key, ObjectContainer container);

-       abstract void addPendingKey(Key key, SendableGet getter, 
ObjectContainer container);
+       abstract void addPendingKey(Key key, GotKeyListener getter, 
ObjectContainer container);

        protected abstract Set 
makeSetForAllRequestsByClientRequest(ObjectContainer container);


Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-07-10 15:37:53 UTC (rev 21031)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-07-10 19:28:28 UTC (rev 21032)
@@ -273,12 +273,6 @@
                                container.set(ret);
                                if(logMINOR)
                                        Logger.minor(this, "Storing "+ret+" for 
"+req);
-                               if((ctr++ & 15) == 0) {
-                                       // This check is quite expensive, don't 
do it all the time.
-                                       if((req instanceof SendableGet) && 
!inPendingKeys((SendableGet)req, key, container)) {
-                                               Logger.error(this, "Selected 
key not in pendingKeys: key "+key+" for "+req);
-                                       }
-                               }
                        } else {
                                ret = new ChosenRequest(req, token, key, ckey, 
req.getPriorityClass(container));
                        }
@@ -542,17 +536,31 @@
                                if(logMINOR)
                                        Logger.minor(this, "RegisterMe: next() 
took "+(endNext-startNext));
                                container.delete(reg);
-                               container.activate(reg.getter, 2);
-                               if(reg.getter.isCancelled(container)) continue;
+                               if(reg.getters != null) {
+                                       boolean allKilled = true;
+                                       for(int j=0;j<reg.getters.length;j++) {
+                                               
container.activate(reg.getters[j], 2);
+                                               
if(!reg.getters[i].isCancelled(container))
+                                                       allKilled = false;
+                                       }
+                                       if(allKilled) {
+                                               if(logMINOR)
+                                                       Logger.minor(this, "Not 
registering as all SendableGet's already cancelled");
+                                       }
+                               }
+                               
                                if(logMINOR)
-                                       Logger.minor(this, "Running RegisterMe 
for "+reg.getter+" : "+reg.key.addedTime+" : "+reg.key.priority);
+                                       Logger.minor(this, "Running RegisterMe 
for "+reg.listener+" and "+reg.getters+" : "+reg.key.addedTime+" : 
"+reg.key.priority);
                                // Don't need to activate, fields should exist? 
FIXME
                                try {
-                                       sched.register(reg.getter, true, false, 
reg, false);
+                                       sched.register(reg.listener, 
reg.getters, false, true, true, reg.blocks);
                                } catch (Throwable t) {
                                        Logger.error(this, "Caught "+t+" 
running RegisterMeRunner", t);
                                        // Cancel the request, and commit so it 
isn't tried again.
-                                       reg.getter.internalError(null, t, 
sched, container, context, true);
+                                       if(reg.getters != null) {
+                                               for(int 
k=0;k<reg.getters.length;k++)
+                                                       
reg.getters[k].internalError(null, t, sched, container, context, true);
+                                       }
                                }
                                if(System.currentTimeMillis() > deadline) break;
                        }
@@ -565,17 +573,6 @@
                }

        }
-       public RegisterMe queueRegister(SendableRequest req, 
PrioritizedSerialExecutor databaseExecutor, ObjectContainer container) {
-               if(!databaseExecutor.onThread()) {
-                       throw new IllegalStateException("Not on database 
thread!");
-               }
-               RegisterMe reg = new RegisterMe(req, 
req.getPriorityClass(container), this);
-               container.set(reg);
-               if(logMINOR)
-                       Logger.minor(this, "Queued RegisterMe for "+req+" : 
"+reg);
-               return reg;
-       }
-
        /**
         * @return True unless the key was already present.
         */
@@ -620,7 +617,7 @@
                return pending.size();
        }

-       protected boolean inPendingKeys(SendableGet req, final Key key, 
ObjectContainer container) {
+       protected boolean inPendingKeys(GotKeyListener req, final Key key, 
ObjectContainer container) {
                final String pks = HexUtil.bytesToHex(key.getFullKey());
                ObjectSet pending = container.query(new Predicate() {
                        public boolean match(PendingKeyItem item) {
@@ -656,7 +653,7 @@
                return false;
        }

-       public SendableGet[] getClientsForPendingKey(final Key key, 
ObjectContainer container) {
+       public GotKeyListener[] getClientsForPendingKey(final Key key, 
ObjectContainer container) {
                final String pks = HexUtil.bytesToHex(key.getFullKey());
                ObjectSet pending = container.query(new Predicate() {
                        public boolean match(PendingKeyItem item) {
@@ -686,7 +683,7 @@
                return pending.hasNext();
        }

-       public SendableGet[] removePendingKey(final Key key, ObjectContainer 
container) {
+       public GotKeyListener[] removePendingKey(final Key key, ObjectContainer 
container) {
                final String pks = HexUtil.bytesToHex(key.getFullKey());
                ObjectSet pending = container.query(new Predicate() {
                        public boolean match(PendingKeyItem item) {
@@ -698,14 +695,14 @@
                });
                if(pending.hasNext()) {
                        PendingKeyItem item = (PendingKeyItem) pending.next();
-                       SendableGet[] getters = item.getters();
+                       GotKeyListener[] getters = item.getters();
                        container.delete(item);
                        return getters;
                }
                return null;
        }

-       public boolean removePendingKey(SendableGet getter, boolean complain, 
final Key key, ObjectContainer container) {
+       public boolean removePendingKey(GotKeyListener getter, boolean 
complain, final Key key, ObjectContainer container) {
                final String pks = HexUtil.bytesToHex(key.getFullKey());
                ObjectSet pending = container.query(new Predicate() {
                        public boolean match(PendingKeyItem item) {
@@ -728,7 +725,7 @@
                return false;
        }

-       protected void addPendingKey(final Key key, SendableGet getter, 
ObjectContainer container) {
+       protected void addPendingKey(final Key key, GotKeyListener getter, 
ObjectContainer container) {
                if(logMINOR)
                        Logger.minor(this, "Adding pending key for "+key+" for 
"+getter);
                long startTime = System.currentTimeMillis();

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-07-10 15:37:53 UTC (rev 21031)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-07-10 19:28:28 UTC (rev 21032)
@@ -63,7 +63,7 @@
         * Register a pending key to an already-registered request. This is 
necessary if we've
         * already registered a SendableGet, but we later add some more keys to 
it.
         */
-       void addPendingKey(Key nodeKey, SendableGet getter, ObjectContainer 
container) {
+       void addPendingKey(Key nodeKey, GotKeyListener getter, ObjectContainer 
container) {
                logMINOR = Logger.shouldLog(Logger.MINOR, 
ClientRequestSchedulerBase.class);
                if(logMINOR)
                        Logger.minor(this, "Adding pending key "+nodeKey+" for 
"+getter);
@@ -71,13 +71,13 @@
                        Object o = pendingKeys.get(nodeKey);
                        if(o == null) {
                                pendingKeys.put(nodeKey, getter);
-                       } else if(o instanceof SendableGet) {
-                               SendableGet oldGet = (SendableGet) o;
+                       } else if(o instanceof GotKeyListener) {
+                               GotKeyListener oldGet = (GotKeyListener) o;
                                if(oldGet != getter) {
-                                       pendingKeys.put(nodeKey, new 
SendableGet[] { oldGet, getter });
+                                       pendingKeys.put(nodeKey, new 
GotKeyListener[] { oldGet, getter });
                                }
                        } else {
-                               SendableGet[] gets = (SendableGet[]) o;
+                               GotKeyListener[] gets = (GotKeyListener[]) o;
                                boolean found = false;
                                for(int j=0;j<gets.length;j++) {
                                        if(gets[j] == getter) {
@@ -86,7 +86,7 @@
                                        }
                                }
                                if(!found) {
-                                       SendableGet[] newGets = new 
SendableGet[gets.length+1];
+                                       GotKeyListener[] newGets = new 
GotKeyListener[gets.length+1];
                                        System.arraycopy(gets, 0, newGets, 0, 
gets.length);
                                        newGets[gets.length] = getter;
                                        pendingKeys.put(nodeKey, newGets);
@@ -95,7 +95,7 @@
                }
        }

-       public boolean removePendingKey(SendableGet getter, boolean complain, 
Key key, ObjectContainer container) {
+       public boolean removePendingKey(GotKeyListener getter, boolean 
complain, Key key, ObjectContainer container) {
                if(logMINOR)
                        Logger.minor(this, "Removing pending key: "+getter+" 
for "+key);
                boolean dropped = false;
@@ -110,8 +110,8 @@
                        if(o == null) {
                                if(complain)
                                        Logger.normal(this, "Not found: 
"+getter+" for "+key+" removing (no such key)");
-                       } else if(o instanceof SendableGet) {
-                               SendableGet oldGet = (SendableGet) o;
+                       } else if(o instanceof GotKeyListener) {
+                               GotKeyListener oldGet = (GotKeyListener) o;
                                if(oldGet != getter) {
                                        if(complain)
                                                Logger.normal(this, "Not found: 
"+getter+" for "+key+" removing (1 getter)");
@@ -122,9 +122,9 @@
                                                Logger.minor(this, "Removed 
only getter (1) for "+key, new Exception("debug"));
                                }
                        } else {
-                               SendableGet[] gets = (SendableGet[]) o;
+                               GotKeyListener[] gets = (GotKeyListener[]) o;
                                final int getsLength = gets.length;
-                               SendableGet[] newGets = new 
SendableGet[getsLength > 1 ? getsLength-1 : 0];
+                               GotKeyListener[] newGets = new 
GotKeyListener[getsLength > 1 ? getsLength-1 : 0];
                                boolean found = false;
                                int x = 0;
                                for(int j=0;j<getsLength;j++) {
@@ -152,7 +152,7 @@
                                        pendingKeys.put(key, newGets[0]);
                                } else {
                                        if(x != getsLength-1) {
-                                               SendableGet[] newNewGets = new 
SendableGet[x];
+                                               GotKeyListener[] newNewGets = 
new GotKeyListener[x];
                                                System.arraycopy(newGets, 0, 
newNewGets, 0, x);
                                                newGets = newNewGets;
                                        }
@@ -163,19 +163,19 @@
                return dropped;
        }

-       public SendableGet[] removePendingKey(Key key, ObjectContainer 
container) {
+       public GotKeyListener[] removePendingKey(Key key, ObjectContainer 
container) {
                Object o;
-               final SendableGet[] gets;
+               final GotKeyListener[] gets;
                synchronized(pendingKeys) {
                        o = pendingKeys.remove(key);
                }
                if(o == null) return null;
-               if(o instanceof SendableGet) {
-                       gets = new SendableGet[] { (SendableGet) o };
+               if(o instanceof GotKeyListener) {
+                       gets = new GotKeyListener[] { (GotKeyListener) o };
                        if(logMINOR)
                                Logger.minor(this, "Removing all pending keys 
for "+key+" (1)", new Exception("debug"));
                } else {
-                       gets = (SendableGet[]) o;
+                       gets = (GotKeyListener[]) o;
                        if(logMINOR)
                                Logger.minor(this, "Removing all pending keys 
for "+key+" ("+gets.length+")", new Exception("debug"));
                }
@@ -193,11 +193,11 @@
                        Object o = pendingKeys.get(key);
                        if(o == null) {
                                // Blah
-                       } else if(o instanceof SendableGet) {
-                               short p = 
((SendableGet)o).getPriorityClass(container);
+                       } else if(o instanceof GotKeyListener) {
+                               short p = 
((GotKeyListener)o).getPriorityClass(container);
                                if(p < priority) priority = p;
                        } else { // if(o instanceof SendableGet[]) {
-                               SendableGet[] gets = (SendableGet[]) o;
+                               GotKeyListener[] gets = (GotKeyListener[]) o;
                                for(int i=0;i<gets.length;i++) {
                                        short p = 
gets[i].getPriorityClass(container);
                                        if(p < priority) priority = p;
@@ -207,32 +207,32 @@
                return priority;
        }

-       public SendableGet[] getClientsForPendingKey(Key key, ObjectContainer 
container) {
+       public GotKeyListener[] getClientsForPendingKey(Key key, 
ObjectContainer container) {
                Object o;
                synchronized(pendingKeys) {
                        o = pendingKeys.get(key);
                }
                if(o == null) {
                        return null;
-               } else if(o instanceof SendableGet) {
-                       SendableGet get = (SendableGet) o;
-                       return new SendableGet[] { get };
+               } else if(o instanceof GotKeyListener) {
+                       GotKeyListener get = (GotKeyListener) o;
+                       return new GotKeyListener[] { get };
                } else {
-                       return (SendableGet[]) o;
+                       return (GotKeyListener[]) o;
                }
        }

-       protected boolean inPendingKeys(SendableGet req, Key key, 
ObjectContainer container) {
+       protected boolean inPendingKeys(GotKeyListener req, Key key, 
ObjectContainer container) {
                Object o;
                synchronized(pendingKeys) {
                        o = pendingKeys.get(key);
                }
                if(o == null) {
                        return false;
-               } else if(o instanceof SendableGet) {
+               } else if(o instanceof GotKeyListener) {
                        return o == req;
                } else {
-                       SendableGet[] gets = (SendableGet[]) o;
+                       GotKeyListener[] gets = (GotKeyListener[]) o;
                        for(int i=0;i<gets.length;i++)
                                if(gets[i] == req) return true;
                }

Added: branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java          
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/GotKeyListener.java  
2008-07-10 19:28:28 UTC (rev 21032)
@@ -0,0 +1,58 @@
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.node.SendableGet;
+
+public interface GotKeyListener {
+
+       /**
+        * Callback for when a block is found. Will be called on the database 
executor thread.
+        * @param key
+        * @param block
+        * @param sched
+        */
+       public abstract void onGotKey(Key key, KeyBlock block, ObjectContainer 
container, ClientContext context);
+       
+       /**
+        * What keys are we interested in?
+        * @param container Database handle.
+        */
+       Key[] listKeys(ObjectContainer container);
+
+       /**
+        * Is this related to a persistent request?
+        */
+       boolean persistent();
+
+       /**
+        * Priority of the associated request.
+        * @param container Database handle.
+        */
+       short getPriorityClass(ObjectContainer container);
+
+       /**
+        * Is the request cancelled/finished/invalid?
+        * @param container Database handle.
+        */
+       boolean isCancelled(ObjectContainer container);
+
+       /**
+        * Get the SendableGet for a specific key, if any.
+        * Used in requeueing requests after a cooldown has expired.
+        * @param key The key.
+        * @param container The database handle.
+        * @return Null if we don't want to register a request for the key,
+        * otherwise the SendableGet.
+        */
+       public abstract SendableGet getRequest(Key key, ObjectContainer 
container);
+
+       /**
+        * @return True if when checking the datastore on initial registration, 
we
+        * should not promote any blocks found.
+        */
+       public abstract boolean dontCache(ObjectContainer container);
+
+}

Modified: branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java  
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/PendingKeyItem.java  
2008-07-10 19:28:28 UTC (rev 21032)
@@ -3,8 +3,6 @@
 import com.db4o.ObjectContainer;

 import freenet.keys.Key;
-import freenet.node.SendableGet;
-import freenet.node.SendableRequest;
 import freenet.support.HexUtil;

 public class PendingKeyItem {
@@ -20,20 +18,20 @@
         * it... whereas doing so results in a fast index lookup.
         */
        final String fullKeyAsBytes;
-       private SendableGet[] getters;
+       private GotKeyListener[] getters;

-       PendingKeyItem(Key key, SendableGet getter, long nodeDBHandle) {
+       PendingKeyItem(Key key, GotKeyListener getter, long nodeDBHandle) {
                this.key = key;
-               this.getters = new SendableGet[] { getter };
+               this.getters = new GotKeyListener[] { getter };
                this.nodeDBHandle = nodeDBHandle;
                this.fullKeyAsBytes = HexUtil.bytesToHex(key.getFullKey());
        }

-       public void addGetter(SendableGet getter) {
+       public void addGetter(GotKeyListener getter) {
                for(int i=0;i<getters.length;i++) {
                        if(getters[i] == getter) return;
                }
-               SendableGet[] newGetters = new SendableGet[getters.length+1];
+               GotKeyListener[] newGetters = new 
GotKeyListener[getters.length+1];
                System.arraycopy(getters, 0, newGetters, 0, getters.length);
                newGetters[getters.length] = getter;
                getters = newGetters;
@@ -43,16 +41,16 @@
         * @param getter
         * @return True if the getter was removed. Caller should check 
isEmpty() afterwards.
         */
-       public boolean removeGetter(SendableGet getter) {
+       public boolean removeGetter(GotKeyListener getter) {
                int found = 0;
                for(int i=0;i<getters.length;i++) {
                        if(getters[i] == getter) found++;
                }
                if(found == 0) return false;
                if(found == getters.length)
-                       getters = new SendableGet[0];
+                       getters = new GotKeyListener[0];
                else {
-                       SendableGet[] newGetters = new 
SendableGet[getters.length - found];
+                       GotKeyListener[] newGetters = new 
GotKeyListener[getters.length - found];
                        int x = 0;
                        for(int i=0;i<getters.length;i++) {
                                if(getters[i] == getter) continue;
@@ -67,13 +65,13 @@
                return getters.length == 0;
        }

-       public boolean hasGetter(SendableRequest req) {
+       public boolean hasGetter(GotKeyListener req) {
                for(int i=0;i<getters.length;i++)
                        if(getters[i] == req) return true;
                return false;
        }

-       public SendableGet[] getters() {
+       public GotKeyListener[] getters() {
                return getters;
        }


Modified: branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RegisterMe.java      
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/RegisterMe.java      
2008-07-10 19:28:28 UTC (rev 21032)
@@ -2,7 +2,7 @@

 import com.db4o.ObjectContainer;

-import freenet.node.SendableRequest;
+import freenet.node.SendableGet;

 /**
  * These must be deleted once the request has been registered.
@@ -10,16 +10,28 @@
  * @author toad
  */
 public class RegisterMe {
-       final SendableRequest getter;
+       final GotKeyListener listener;
+       final SendableGet[] getters;
        final ClientRequestSchedulerCore core;
        final RegisterMeSortKey key;
        private final int hashCode;
+       public final BlockSet blocks;

-       RegisterMe(SendableRequest getter, short prio, 
ClientRequestSchedulerCore core) {
-               hashCode = (getter.hashCode() * prio) ^ core.hashCode();
-               this.getter = getter;
+       RegisterMe(GotKeyListener listener, SendableGet[] getters, short prio, 
ClientRequestSchedulerCore core, BlockSet blocks) {
+               this.listener = listener;
+               this.getters = getters;
                this.core = core;
                this.key = new RegisterMeSortKey(prio);
+               this.blocks = blocks;
+               int hash = core.hashCode();
+               if(listener != null)
+                       hash ^= listener.hashCode();
+               if(getters != null) {
+                       for(int i=0;i<getters.length;i++)
+                               hash ^= getters[i].hashCode();
+               }
+               hash *= prio;
+               hashCode = hash;
        }

        public void objectOnActivate(ObjectContainer container) {

Modified: 
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 
2008-07-10 19:28:28 UTC (rev 21032)
@@ -101,7 +101,7 @@
                        }
                }
                // :(
-               unregister(false, container);
+               unregisterAll(container, context);
                if(e.isFatal() || forceFatal)
                        parent.fatallyFailedBlock(container, context);
                else
@@ -115,7 +115,7 @@
                        container.activate(parent, 1);
                        container.activate(rcb, 1);
                }
-               unregister(false, container);
+               unregister(container); // pending key has already been removed
                if(parent.isCancelled()) {
                        data.asBucket().free();
                        onFailure(new FetchException(FetchException.CANCELLED), 
false, container, context);

Modified: 
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     
2008-07-10 19:28:28 UTC (rev 21032)
@@ -199,7 +199,7 @@
                }
                if(persistent)
                        container.set(this);
-               getScheduler(context).register(this, false, false);
+               getScheduler(context).registerInsert(this, persistent, true);
        }

        private void fail(InsertException e, ObjectContainer container, 
ClientContext context) {
@@ -263,7 +263,7 @@
                        if(persistent)
                                container.set(this);
                } else {
-                       getScheduler(context).register(this, false, false);
+                       getScheduler(context).registerInsert(this, persistent, 
true);
                }
        }

@@ -329,7 +329,7 @@
                        container.set(this);
                        container.activate(cb, 1);
                }
-               super.unregister(false, container);
+               super.unregister(container);
                cb.onFailure(new InsertException(InsertException.CANCELLED), 
this, container, context);
        }


Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java       
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java       
2008-07-10 19:28:28 UTC (rev 21032)
@@ -6,7 +6,6 @@
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;

 import com.db4o.ObjectContainer;
@@ -182,7 +181,7 @@
        }

        protected void onSuccess(FetchResult result, ObjectContainer container, 
ClientContext context) {
-               unregister(false, container);
+               unregister(container); // Key has already been removed from 
pendingKeys
                if(persistent) {
                        container.activate(decompressors, 1);
                        container.activate(parent, 1);
@@ -522,7 +521,7 @@
                                        f.addDecompressor(codec);
                                }
                                parent.onTransition(this, f, container);
-                               f.schedule(container, context, false, false);
+                               f.schedule(container, context, false);
                                if(persistent) {
                                        container.set(metaStrings);
                                        container.set(this);
@@ -599,7 +598,7 @@
                                SplitFileFetcher sf = new 
SplitFileFetcher(metadata, rcb, parent, ctx, 
                                                decompressors, clientMetadata, 
actx, recursionLevel, returnBucket, token, container);
                                parent.onTransition(this, sf, container);
-                               sf.schedule(container, context, false, false);
+                               sf.schedule(container, context, false);
                                rcb.onBlockSetFinished(this, container, 
context);
                                // Clear our own metadata, we won't need it any 
more.
                                // For multi-level metadata etc see above.
@@ -925,7 +924,7 @@
                                if(l == usk.suggestedEdition) {
                                        SingleFileFetcher sf = new 
SingleFileFetcher(parent, cb, clientMetadata, key, metaStrings, 
key.getURI().addMetaStrings(metaStrings),
                                                        0, ctx, actx, null, 
null, maxRetries, recursionLevel+1, dontTellClientGet, token, false, 
returnBucket, true, container, context);
-                                       sf.schedule(container, context, false, 
false);
+                                       sf.schedule(container, context, false);
                                } else {
                                        cb.onFailure(new 
FetchException(FetchException.PERMANENT_REDIRECT, 
newUSK.getURI().addMetaStrings(metaStrings)), null, container, context);
                                }

Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java        
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java        
2008-07-10 19:28:28 UTC (rev 21032)
@@ -336,14 +336,14 @@
                }
        }

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly, boolean probablyNotInStore) {
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly) {
                if(persistent)
                        container.activate(this, 1);
                if(segments.length > 1)
                        regmeOnly = true;
                if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this, 
"Scheduling "+this);
                for(int i=0;i<segments.length;i++) {
-                       segments[i].schedule(container, context, regmeOnly, 
probablyNotInStore);
+                       segments[i].schedule(container, context, regmeOnly);
                }
        }


Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-07-10 19:28:28 UTC (rev 21032)
@@ -22,13 +22,18 @@
 import freenet.client.SplitfileBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKEncodeException;
+import freenet.keys.CHKVerifyException;
 import freenet.keys.ClientCHK;
 import freenet.keys.ClientCHKBlock;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
 import freenet.keys.Key;
+import freenet.keys.KeyBlock;
+import freenet.keys.KeyDecodeException;
 import freenet.keys.NodeCHK;
+import freenet.keys.TooBigException;
 import freenet.node.RequestScheduler;
+import freenet.node.SendableGet;
 import freenet.support.Logger;
 import freenet.support.RandomGrabArray;
 import freenet.support.api.Bucket;
@@ -38,7 +43,7 @@
  * A single segment within a SplitFileFetcher.
  * This in turn controls a large number of SplitFileFetcherSubSegment's, which 
are registered on the ClientRequestScheduler.
  */
-public class SplitFileFetcherSegment implements FECCallback {
+public class SplitFileFetcherSegment implements FECCallback, GotKeyListener {

        private static volatile boolean logMINOR;
        final short splitfileType;
@@ -179,13 +184,13 @@
                return fatallyFailedBlocks;
        }

-       public void onSuccess(Bucket data, int blockNo, 
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer 
container, ClientContext context) {
+       public void onSuccess(Bucket data, int blockNo, ClientKeyBlock block, 
ObjectContainer container, ClientContext context) {
                if(persistent)
                        container.activate(this, 1);
                if(data == null) throw new NullPointerException();
                boolean decodeNow = false;
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on 
"+seg);
+               if(logMINOR) Logger.minor(this, "Fetched block "+blockNo);
                if(parentFetcher.parent instanceof ClientGetter)
                        
((ClientGetter)parentFetcher.parent).addKeyToBinaryBlob(block, container, 
context);
                // No need to unregister key, because it will be cleared in 
tripPendingKey().
@@ -249,8 +254,8 @@
                        container.activate(parentFetcher.parent, 1);
                }
                parentFetcher.parent.completedBlock(dontNotify, container, 
context);
-               seg.possiblyRemoveFromParent(container);
                if(decodeNow) {
+                       context.getChkFetchScheduler().removePendingKeys(this, 
true);
                        removeSubSegments(container);
                        decode(container, context);
                }
@@ -465,7 +470,9 @@
                boolean allFailed;
                // Since we can't keep the key, we need to unregister for it at 
this point to avoid a memory leak
                NodeCHK key = getBlockNodeKey(blockNo, container);
-               if(key != null) seg.unregisterKey(key, context, container);
+               if(key != null)
+                       // don't complain as may already have been removed e.g. 
if we have a decode error in onGotKey; don't NPE for same reason
+                       context.getChkFetchScheduler().removePendingKey(this, 
false, key, container);
                synchronized(this) {
                        if(isFinishing(container)) return; // this failure is 
now irrelevant, and cleanup will occur on the decoder thread
                        if(blockNo < dataKeys.length) {
@@ -497,7 +504,7 @@
                        container.set(this);
                if(allFailed)
                        fail(new FetchException(FetchException.SPLITFILE_ERROR, 
errors), container, context);
-               else
+               else if(seg != null)
                        seg.possiblyRemoveFromParent(container);
        }

@@ -524,7 +531,7 @@
                                tries = ++dataRetries[blockNo];
                                if(tries > maxTries && maxTries >= 0) failed = 
true;
                                else {
-                                       sub = getSubSegment(tries, container);
+                                       sub = getSubSegment(tries, container, 
false);
                                        if(tries % 
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
                                                long now = 
System.currentTimeMillis();
                                                if(dataCooldownTimes[blockNo] > 
now)
@@ -542,7 +549,7 @@
                                tries = ++checkRetries[checkNo];
                                if(tries > maxTries && maxTries >= 0) failed = 
true;
                                else {
-                                       sub = getSubSegment(tries, container);
+                                       sub = getSubSegment(tries, container, 
false);
                                        if(tries % 
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
                                                long now = 
System.currentTimeMillis();
                                                if(checkCooldownTimes[checkNo] 
> now)
@@ -567,20 +574,16 @@
                }
                if(cooldown) {
                        // Register to the next sub-segment before removing 
from the old one.
-                       sub.getScheduler(context).addPendingKey(key, sub);
-                       seg.unregisterKey(key.getNodeKey(), context, container);
                        if(logMINOR)
                                Logger.minor(this, "Adding to cooldown queue: 
"+key+" for "+this+" was on segment "+seg+" now registered to "+sub);
                } else {
                        // If we are here we are going to retry
                        if(logMINOR)
                                Logger.minor(this, "Retrying block "+blockNo+" 
on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
-                       sub.add(blockNo, false, container, context, false);
-                       seg.unregisterKey(key.getNodeKey(), context, container);
                }
        }

-       private SplitFileFetcherSubSegment getSubSegment(int retryCount, 
ObjectContainer container) {
+       private SplitFileFetcherSubSegment getSubSegment(int retryCount, 
ObjectContainer container, boolean noCreate) {
                SplitFileFetcherSubSegment sub;
                if(persistent)
                        container.activate(subSegments, 1);
@@ -589,6 +592,7 @@
                                sub = (SplitFileFetcherSubSegment) 
subSegments.get(i);
                                if(sub.retryCount == retryCount) return sub;
                        }
+                       if(noCreate) return null;
                        sub = new SplitFileFetcherSubSegment(this, retryCount);
                        subSegments.add(sub);
                }
@@ -627,26 +631,27 @@
                                checkBuckets[i] = null;
                        }
                }
+               context.getChkFetchScheduler().removePendingKeys(this, true);
                removeSubSegments(container);
                if(persistent)
                        container.set(this);
                parentFetcher.segmentFinished(this, container, context);
        }

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly, boolean probablyNotInStore) {
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly) {
                if(persistent) {
                        container.activate(this, 1);
                        container.activate(parentFetcher, 1);
                        container.activate(parentFetcher.parent, 1);
                }
                try {
-                       SplitFileFetcherSubSegment seg = getSubSegment(0, 
container);
+                       SplitFileFetcherSubSegment seg = getSubSegment(0, 
container, false);
                        if(persistent)
                                container.activate(seg, 1);
                        for(int 
i=0;i<dataRetries.length+checkRetries.length;i++)
                                seg.add(i, true, container, context, false);

-                       seg.schedule(container, context, regmeOnly, 
probablyNotInStore);
+                       seg.schedule(container, context, true, regmeOnly);
                        synchronized(this) {
                                scheduled = true;
                        }
@@ -760,9 +765,9 @@
        }

        /**
-        * @return True if the key was wanted and the scheduled segment was the 
one that called, false otherwise. 
+        * @return True if the key was wanted, false otherwise. 
         */
-       public boolean requeueAfterCooldown(Key key, long time, ObjectContainer 
container, ClientContext context, SplitFileFetcherSubSegment segment) {
+       public boolean requeueAfterCooldown(Key key, long time, ObjectContainer 
container, ClientContext context) {
                if(persistent)
                        container.activate(this, 1);
                Vector v = null;
@@ -782,7 +787,7 @@
                                        return false;
                                }
                                int tries = dataRetries[i];
-                               SplitFileFetcherSubSegment sub = 
getSubSegment(tries, container);
+                               SplitFileFetcherSubSegment sub = 
getSubSegment(tries, container, false);
                                if(logMINOR)
                                        Logger.minor(this, "Retrying after 
cooldown on "+this+": data block "+i+" on "+this+" : 
tries="+tries+"/"+maxTries+" : "+sub);
                                if(v == null) v = new Vector();
@@ -803,7 +808,7 @@
                                        return false;
                                }
                                int tries = checkRetries[i];
-                               SplitFileFetcherSubSegment sub = 
getSubSegment(tries, container);
+                               SplitFileFetcherSubSegment sub = 
getSubSegment(tries, container, false);
                                if(logMINOR)
                                        Logger.minor(this, "Retrying after 
cooldown on "+this+": check block "+i+" on "+this+" : 
tries="+tries+"/"+maxTries+" : "+sub);
                                if(v == null) v = new Vector();
@@ -816,27 +821,25 @@
                if(notFound) {
                        Logger.error(this, "requeueAfterCooldown: Key not 
found!: "+key+" on "+this);
                }
-               boolean foundCaller = false;
                if(v != null) {
                        for(int i=0;i<v.size();i++) {
-                               if(v.get(i) == segment) foundCaller = true;
                                SplitFileFetcherSubSegment sub = 
(SplitFileFetcherSubSegment) v.get(i);
                                RandomGrabArray rga = sub.getParentGrabArray();
                                if(sub.getParentGrabArray() == null) {
-                                       sub.schedule(container, context, false, 
true);
+                                       sub.schedule(container, context, false, 
false);
                                } else {
 //                                     if(logMINOR) {
                                                container.activate(rga, 1);
                                                if(!rga.contains(sub, 
container)) {
                                                        Logger.error(this, 
"Sub-segment has RGA but isn't registered to it!!: "+sub+" for "+rga);
-                                                       sub.schedule(container, 
context, false, true);
+                                                       sub.schedule(container, 
context, false, false);
                                                }
                                                container.deactivate(rga, 1);
 //                                     }
                                }
                        }
                }
-               return foundCaller;
+               return true;
        }

        public synchronized long getCooldownWakeupByKey(Key key, 
ObjectContainer container) {
@@ -929,4 +932,107 @@
                        return checkBuckets[blockNo].hasData();
                }
        }
+
+       public boolean dontCache(ObjectContainer container) {
+               return !blockFetchContext.cacheLocalRequests;
+       }
+
+       public short getPriorityClass(ObjectContainer container) {
+               container.activate(parent, 1);
+               return parent.priorityClass;
+       }
+
+       public SendableGet getRequest(Key key, ObjectContainer container) {
+               int blockNum = this.getBlockNumber(key, container);
+               if(blockNum < 0) return null;
+               int retryCount = getBlockRetryCount(blockNum);
+               return getSubSegment(retryCount, container, false);
+       }
+
+       public boolean isCancelled(ObjectContainer container) {
+               return isFinishing(container);
+       }
+
+       public Key[] listKeys(ObjectContainer container) {
+               Vector v = new Vector();
+               synchronized(this) {
+                       for(int i=0;i<dataKeys.length;i++) {
+                               if(dataKeys[i] != null) {
+                                       if(persistent)
+                                               container.activate(dataKeys[i], 
5);
+                                       v.add(dataKeys[i].getNodeKey());
+                               }
+                       }
+                       for(int i=0;i<checkKeys.length;i++) {
+                               if(checkKeys[i] != null) {
+                                       if(persistent)
+                                               
container.activate(checkKeys[i], 5);
+                                       v.add(checkKeys[i].getNodeKey());
+                               }
+                       }
+               }
+               return (Key[]) v.toArray(new Key[v.size()]);
+       }
+
+       public void onGotKey(Key key, KeyBlock block, ObjectContainer 
container, ClientContext context) {
+               int blockNum = this.getBlockNumber(key, container);
+               if(blockNum < 0) return;
+               ClientCHK ckey = this.getBlockKey(blockNum, container);
+               ClientCHKBlock cb;
+               int retryCount = getBlockRetryCount(blockNum);
+               SplitFileFetcherSubSegment seg = this.getSubSegment(retryCount, 
container, true);
+               seg.removeBlockNum(blockNum);
+               seg.possiblyRemoveFromParent(container);
+               try {
+                       cb = new ClientCHKBlock((CHKBlock)block, ckey);
+               } catch (CHKVerifyException e) {
+                       this.onFatalFailure(new 
FetchException(FetchException.BLOCK_DECODE_ERROR, e), blockNum, null, 
container, context);
+                       return;
+               }
+               Bucket data = extract(cb, blockNum, container, context);
+               if(data == null) return;
+               
+               if(!cb.isMetadata()) {
+                       this.onSuccess(data, blockNum, cb, container, context);
+               } else {
+                       this.onFatalFailure(new 
FetchException(FetchException.INVALID_METADATA, "Metadata where expected 
data"), blockNum, null, container, context);
+               }
+       }
+       
+       private int getBlockRetryCount(int blockNum) {
+               if(blockNum < dataRetries.length)
+                       return dataRetries[blockNum];
+               blockNum -= dataRetries.length;
+               return checkRetries[blockNum];
+       }
+
+       /** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it 
via onFailure
+        * and return null.
+        */
+       protected Bucket extract(ClientKeyBlock block, int blockNum, 
ObjectContainer container, ClientContext context) {
+               Bucket data;
+               try {
+                       data = 
block.decode(context.getBucketFactory(persistent), 
(int)(Math.min(this.blockFetchContext.maxOutputLength, Integer.MAX_VALUE)), 
false);
+               } catch (KeyDecodeException e1) {
+                       if(Logger.shouldLog(Logger.MINOR, this))
+                               Logger.minor(this, "Decode failure: "+e1, e1);
+                       this.onFatalFailure(new 
FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), blockNum, 
null, container, context);
+                       return null;
+               } catch (TooBigException e) {
+                       this.onFatalFailure(new 
FetchException(FetchException.TOO_BIG, e.getMessage()), blockNum, null, 
container, context);
+                       return null;
+               } catch (IOException e) {
+                       Logger.error(this, "Could not capture data - disk 
full?: "+e, e);
+                       this.onFatalFailure(new 
FetchException(FetchException.BUCKET_ERROR, e), blockNum, null, container, 
context);
+                       return null;
+               }
+               if(Logger.shouldLog(Logger.MINOR, this))
+                       Logger.minor(this, data == null ? "Could not decode: 
null" : ("Decoded "+data.size()+" bytes"));
+               return data;
+       }
+
+
+       public boolean persistent() {
+               return persistent;
+       }
 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-07-10 15:37:53 UTC (rev 21031)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-07-10 19:28:28 UTC (rev 21032)
@@ -329,7 +329,7 @@
                        onFailure(new FetchException(FetchException.CANCELLED), 
token, container, context);
                        return;
                }
-               segment.onSuccess(data, blockNo, this, block, container, 
context);
+               segment.onSuccess(data, blockNo, block, container, context);
        }

        /** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it 
via onFailure
@@ -456,10 +456,8 @@
                }
                if(persistent)
                        container.set(blockNums);
-               if(schedule) schedule(container, context, false, true); // 
Retrying so not in store
-               else if(!dontSchedule)
-                       // Already scheduled, however this key may not be 
registered.
-                       
getScheduler(context).addPendingKey(segment.getBlockKey(blockNo, container), 
this);
+               if(schedule)
+                       context.getChkFetchScheduler().register(null, new 
SendableGet[] { this }, false, persistent, true, null);
        }

        public String toString() {
@@ -480,7 +478,7 @@
                                Logger.minor(this, "Definitely removing from 
parent: "+this);
                        if(!segment.maybeRemoveSeg(this, container)) return;
                }
-               unregister(false, container);
+               unregister(container);
        }

        public void onGotKey(Key key, KeyBlock block, ObjectContainer 
container, ClientContext context) {
@@ -542,7 +540,7 @@
                if(logMINOR)
                        Logger.minor(this, "Killing "+this);
                // Do unregister() first so can get and unregister each key and 
avoid a memory leak
-               unregister(false, container);
+               unregister(container);
                synchronized(segment) {
                        blockNums.clear();
                        cancelled = true;
@@ -564,9 +562,8 @@
                }
                if(Logger.shouldLog(Logger.MINOR, this))
                        Logger.minor(this, "Requeueing after cooldown "+key+" 
for "+this);
-               if(!segment.requeueAfterCooldown(key, time, container, context, 
this)) {
-                       Logger.error(this, "Removing key "+key+" for "+this+" 
in requeueAfterCooldown as is now registered to a different subsegment");
-                       unregisterKey(key, context, container);
+               if(!segment.requeueAfterCooldown(key, time, container, 
context)) {
+                       Logger.error(this, "Key was not wanted after cooldown: 
"+key+" for "+this+" in requeueAfterCooldown");
                }
        }

@@ -588,4 +585,21 @@
                }
        }

+       public void schedule(ObjectContainer container, ClientContext context, 
boolean firstTime, boolean regmeOnly) {
+               getScheduler(context).register(firstTime ? segment : null, new 
SendableGet[] { this }, regmeOnly, persistent, true, 
segment.blockFetchContext.blocks);
+       }
+
+       public void removeBlockNum(int blockNum) {
+               synchronized(segment) {
+                       for(int i=0;i<blockNums.size();i++) {
+                               Integer token = (Integer) blockNums.get(i);
+                               int num = ((Integer)token).intValue();
+                               if(num == blockNum) {
+                                       blockNums.remove(i);
+                                       break;
+                               }
+                       }
+               }
+       }
+
 }

Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java      
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java      
2008-07-10 19:28:28 UTC (rev 21032)
@@ -32,7 +32,7 @@
                        container.activate(this, 1);
                        container.activate(cb, 1);
                }
-               unregister(false, container);
+               unregister(container); // Key has already been removed from 
pendingKeys
                cb.onSuccess((ClientSSKBlock)block, context);
        }

@@ -73,7 +73,7 @@
                if(canRetry && retry(container, context)) return;

                // Ran out of retries.
-               unregister(false, container);
+               unregisterAll(container, context);
                if(e.code == LowLevelGetException.CANCELLED){
                        cb.onCancelled(context);
                        return;

Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcher.java      
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcher.java      
2008-07-10 19:28:28 UTC (rev 21032)
@@ -159,7 +159,7 @@
                                if(logMINOR)
                                        Logger.minor(this, "Checker == null in 
schedule() for "+this, new Exception("debug"));
                        } else
-                               checker.schedule(container, context, false, 
false);
+                               checker.schedule(container, context, false);
                }

                public String toString() {
@@ -469,17 +469,17 @@
        public void schedule(long delay, ObjectContainer container, final 
ClientContext context) {
                assert(container == null);
                if (delay<=0) {
-                       schedule(container, context, false, false);
+                       schedule(container, context, false);
                } else {
                        uskManager.ticker.queueTimedJob(new Runnable() {
                                public void run() {
-                                       USKFetcher.this.schedule(null, context, 
false, false);
+                                       USKFetcher.this.schedule(null, context, 
false);
                                }
                        }, delay);
                }
        }

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly, boolean probablyNotInStore) {
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly) {
                USKAttempt[] attempts;
                long lookedUp = uskManager.lookup(origUSK);
                synchronized(this) {

Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java   
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherTag.java   
2008-07-10 19:28:28 UTC (rev 21032)
@@ -91,7 +91,7 @@
                        usk = usk.copy(edition);
                fetcher = manager.getFetcher(usk, ctx, new 
USKFetcherWrapper(usk, priority, client), keepLastData);
                fetcher.addCallback(this);
-               fetcher.schedule(null, context, false, false); // non-persistent
+               fetcher.schedule(null, context, false); // non-persistent
        }

        public void cancel(ObjectContainer container, ClientContext context) {
@@ -118,7 +118,7 @@
                return token;
        }

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly, boolean probablyNotInStore) {
+       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly) {
                start(context.uskManager, context);
        }


Modified: branches/db4o/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKInserter.java     
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKInserter.java     
2008-07-10 19:28:28 UTC (rev 21032)
@@ -78,7 +78,7 @@
                        if(finished) return;
                        fetcher = 
context.uskManager.getFetcherForInsertDontSchedule(pubUSK, 
parent.priorityClass, this, parent.getClient(), container, context);
                }
-               fetcher.schedule(container, context, false, false);
+               fetcher.schedule(container, context, false);
        }

        public void onFoundEdition(long l, USK key, ObjectContainer container, 
ClientContext context, boolean lastContentWasMetadata, short codec, byte[] 
hisData) {

Modified: branches/db4o/freenet/src/freenet/client/async/USKManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKManager.java      
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKManager.java      
2008-07-10 19:28:28 UTC (rev 21032)
@@ -140,7 +140,7 @@
                                fetcher.cancel(null, context);
                        }
                }
-               if(sched != null) sched.schedule(null, context, false, false);
+               if(sched != null) sched.schedule(null, context, false);
        }

        void update(final USK origUSK, final long number, final ClientContext 
context) {
@@ -216,7 +216,7 @@
                if(fetcher != null) {
                        ticker.queueTimedJob(new Runnable() {
                                public void run() {
-                                       fetcher.schedule(null, context, false, 
false);
+                                       fetcher.schedule(null, context, false);
                                }
                        }, 0);
                }

Modified: branches/db4o/freenet/src/freenet/client/async/USKRetriever.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKRetriever.java    
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/client/async/USKRetriever.java    
2008-07-10 19:28:28 UTC (rev 21032)
@@ -48,7 +48,7 @@
                        SingleFileFetcher getter =
                                (SingleFileFetcher) 
SingleFileFetcher.create(this, this, new ClientMetadata(), uri, ctx, new 
ArchiveContext(ctx.maxArchiveLevels), 
                                                ctx.maxNonSplitfileRetries, 0, 
true, l, true, null, false, null, context);
-                       getter.schedule(null, context, false, false);
+                       getter.schedule(null, context, false);
                } catch (MalformedURLException e) {
                        Logger.error(this, "Impossible: "+e, e);
                } catch (FetchException e) {

Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-07-10 
15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-07-10 
19:28:28 UTC (rev 21032)
@@ -99,12 +99,6 @@
                return true;
        }

-       public void schedule(ObjectContainer container, ClientContext context, 
boolean regmeOnly, boolean assumeNotInStore) {
-               if(Logger.shouldLog(Logger.MINOR, this))
-                       Logger.minor(this, "Scheduling "+this);
-               getScheduler(context).register(this, regmeOnly, 
assumeNotInStore);
-       }
-       
        public ClientRequestScheduler getScheduler(ClientContext context) {
                if(isSSK())
                        return context.getSskFetchScheduler();
@@ -113,14 +107,6 @@
        }

        /**
-        * Callback for when a block is found. Will be called on the database 
executor thread.
-        * @param key
-        * @param block
-        * @param sched
-        */
-       public abstract void onGotKey(Key key, KeyBlock block, ObjectContainer 
container, ClientContext context);
-       
-       /**
         * Get the time at which the key specified by the given token will wake 
up from the 
         * cooldown queue.
         * @param token
@@ -133,16 +119,6 @@
        /** Reset the cooldown times when the request is reregistered. */
        public abstract void resetCooldownTimes(ObjectContainer container);

-       public final void unregister(boolean staySubscribed, ObjectContainer 
container, ClientContext context) {
-               if(!staySubscribed)
-                       getScheduler(context).removePendingKeys(this, false);
-               super.unregister(staySubscribed, container);
-       }
-       
-       public final void unregisterKey(Key key, ClientContext context, 
ObjectContainer container) {
-               getScheduler(context).removePendingKey(this, false, key, 
container);
-       }
-
        public void internalError(final Object keyNum, final Throwable t, final 
RequestScheduler sched, ObjectContainer container, ClientContext context, 
boolean persistent) {
                sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), 
keyNum, NativeThread.MAX_PRIORITY, null, persistent);
        }

Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-07-10 
15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-07-10 
19:28:28 UTC (rev 21032)
@@ -96,7 +96,7 @@
                        container.set(this);
        }

-       public void unregister(boolean staySubscribed, ObjectContainer 
container) {
+       public void unregister(ObjectContainer container) {
                RandomGrabArray arr = getParentGrabArray();
                if(arr != null) {
                        if(persistent)

Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java    
2008-07-10 15:37:53 UTC (rev 21031)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java    
2008-07-10 19:28:28 UTC (rev 21032)
@@ -110,7 +110,7 @@

        public void schedule() {
                finished = false; // can reschedule
-               scheduler.register(this, false, false);
+               scheduler.registerInsert(this, false, false);
        }

        public void cancel(ObjectContainer container, ClientContext context) {
@@ -118,7 +118,7 @@
                        if(finished) return;
                        finished = true;
                }
-               super.unregister(false, container);
+               super.unregister(container);
        }

        public boolean shouldCache() {


Reply via email to