Author: toad
Date: 2008-08-12 19:42:51 +0000 (Tue, 12 Aug 2008)
New Revision: 21773

Added:
   branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
   branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java
   branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java
   branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java
   branches/db4o/freenet/src/freenet/node/SendableRequestSender.java
Removed:
   branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
   branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
   branches/db4o/freenet/src/freenet/client/async/ClientPutter.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
   branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
   branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
   branches/db4o/freenet/src/freenet/keys/ClientCHK.java
   branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
   branches/db4o/freenet/src/freenet/node/SendableGet.java
   branches/db4o/freenet/src/freenet/node/SendableInsert.java
   branches/db4o/freenet/src/freenet/node/SendableRequest.java
   branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
Keep the chosen requests list entirely in RAM.
Choose one or more SendableRequests, and fetch all of the blocks on those 
requests.
Drop stuff from the queue if more important requests are registered.
This is a major optimisation: expect an approx. 40% reduction in worst-case time 
spent on the database thread.
Tested for requests but not yet for inserts.


Modified: 
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java   
2008-08-12 19:42:51 UTC (rev 21773)
@@ -3,6 +3,9 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import java.util.Collections;
+import java.util.List;
+
 import com.db4o.ObjectContainer;

 import freenet.client.FetchContext;
@@ -91,7 +94,7 @@
                if((retryCount <= maxRetries) || (maxRetries == -1)) {
                        if(persistent)
                                container.set(this);
-                       if(retryCount % ClientRequestScheduler.COOLDOWN_RETRIES 
== 0) {
+                       if(retryCount % RequestScheduler.COOLDOWN_RETRIES == 0) 
{
                                // Add to cooldown queue. Don't reschedule yet.
                                long now = System.currentTimeMillis();
                                if(cooldownWakeupTime > now)
@@ -254,4 +257,13 @@
                }
        }

+       @Override
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               if(persistent)
+                       container.activate(key, 5);
+               ClientKey ckey = key.cloneKey();
+               PersistentChosenBlock block = new PersistentChosenBlock(false, 
request, keys[0], ckey.getNodeKey(), ckey, sched);
+               return Collections.singletonList(block);
+       }
+
 }

Added: branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java             
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java     
2008-08-12 19:42:51 UTC (rev 21773)
@@ -0,0 +1,73 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.Key;
+import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
+import freenet.node.NodeClientCore;
+import freenet.node.RequestScheduler;
+import freenet.node.SendableRequestSender;
+
+/**
+ * A single selected request, including everything needed to execute it.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public abstract class ChosenBlock {
+
+       /** The token indicating the key within the request to be 
fetched/inserted.
+        * Meaning is entirely defined by the request. */
+       public transient final Object token;
+       /** The key to be fetched, null if not a BaseSendableGet */
+       public transient final Key key;
+       /** The client-layer key to be fetched, null if not a SendableGet */
+       public transient final ClientKey ckey;
+       public transient final boolean localRequestOnly;
+       public transient final boolean cacheLocalRequests;
+       public transient final boolean ignoreStore;
+       
+       public ChosenBlock(Object token, Key key, ClientKey ckey, boolean 
localRequestOnly, boolean cacheLocalRequests, boolean ignoreStore, 
RequestScheduler sched) {
+               this.token = token;
+               this.key = key;
+               this.ckey = ckey;
+               this.localRequestOnly = localRequestOnly;
+               this.cacheLocalRequests = cacheLocalRequests;
+               this.ignoreStore = ignoreStore;
+               // Ignore return value. Not our problem at this point.
+               // It won't have been scheduled if there were *no* valid 
blocks...
+               // it's possible, but very unlikely, that some blocks are 
scheduled
+               // and some are not.
+               if(key != null)
+                       sched.addToFetching(key);
+       }
+
+       public abstract boolean isPersistent();
+
+       public abstract boolean isCancelled();
+
+       public abstract void onFailure(LowLevelPutException e, ClientContext 
context);
+
+       public abstract void onInsertSuccess(ClientContext context);
+
+       public abstract void onFailure(LowLevelGetException e, ClientContext 
context);
+
+       /**
+        * The actual data delivery goes through CRS.tripPendingKey(). This is 
just a notification
+        * for book-keeping purposes. We call the scheduler to tell it that the 
request succeeded,
+        * so that it can be rescheduled soon for more requests.
+        * @param context Might be useful.
+        */
+       public abstract void onFetchSuccess(ClientContext context);
+
+       public abstract short getPriority();
+
+       public boolean send(NodeClientCore core, RequestScheduler sched) {
+               ClientContext context = sched.getContext();
+               return getSender(context).send(core, sched, context, this);
+       }
+       
+       public abstract SendableRequestSender getSender(ClientContext context);
+       
+       public void removeFromFetching(ClientRequestSchedulerCore core) {
+               core.removeFetchingKey(key);
+       }
+}

Deleted: branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java   
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java   
2008-08-12 19:42:51 UTC (rev 21773)
@@ -1,88 +0,0 @@
-/* This code is part of Freenet. It is distributed under the GNU General
- * Public License, version 2 (or at your option any later version). See
- * http://www.gnu.org/ for further details of the GPL. */
-package freenet.client.async;
-
-import com.db4o.ObjectContainer;
-
-import freenet.client.FetchContext;
-import freenet.keys.ClientKey;
-import freenet.keys.Key;
-import freenet.node.NodeClientCore;
-import freenet.node.RequestScheduler;
-import freenet.node.SendableGet;
-import freenet.node.SendableRequest;
-
-/**
- * A request chosen by ClientRequestScheduler.
- * @author toad
- */
-public class ChosenRequest {
-
-       /** The request object */
-       public final SendableRequest request;
-       /** The token indicating the key within the request to be 
fetched/inserted.
-        * Meaning is entirely defined by the request. */
-       public final Object token;
-       /** The key to be fetched, null if not a BaseSendableGet */
-       public final Key key;
-       /** The client-layer key to be fetched, null if not a SendableGet */
-       public final ClientKey ckey;
-       /** Priority when we selected it */
-       public short prio;
-       public final boolean localRequestOnly;
-       public final boolean cacheLocalRequests;
-       public final boolean ignoreStore;
-
-       ChosenRequest(SendableRequest req, Object tok, Key key, ClientKey ckey, 
short prio, ObjectContainer container) {
-               request = req;
-               token = tok;
-               this.key = key;
-               this.ckey = ckey;
-               this.prio = prio;
-               if(req instanceof SendableGet) {
-                       SendableGet sg = (SendableGet) req;
-                       FetchContext ctx = sg.getContext();
-                       if(container != null)
-                               container.activate(ctx, 1);
-                       localRequestOnly = ctx.localRequestOnly;
-                       cacheLocalRequests = ctx.cacheLocalRequests;
-                       ignoreStore = ctx.ignoreStore;
-               } else {
-                       localRequestOnly = false;
-                       cacheLocalRequests = false;
-                       ignoreStore = false;
-               }
-       }
-
-       public boolean send(NodeClientCore core, RequestScheduler sched) {
-               return request.send(core, sched, this);
-       }
-
-       public boolean isPersistent() {
-               return this instanceof PersistentChosenRequest;
-       }
-       
-       public boolean equals(Object o) {
-               if(!(o instanceof ChosenRequest)) return false;
-               if(o == this) return true;
-               ChosenRequest cr = (ChosenRequest) o;
-               if(!cr.request.equals(request)) return false;
-               if(!cr.token.equals(token)) return false;
-               if(cr.key != null) {
-                       if(key != null) {
-                               if(!key.equals(cr.key)) return false;
-                       } else return false;
-               } else {
-                       if(key != null) return false;
-               }
-               if(cr.ckey != null) {
-                       if(ckey != null) {
-                               if(!ckey.equals(cr.ckey)) return false;
-                       } else return false;
-               } else {
-                       if(ckey != null) return false;
-               }
-               return true;
-       }
-}

Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetter.java    
2008-08-12 19:42:51 UTC (rev 21773)
@@ -21,6 +21,7 @@
 import freenet.keys.FreenetURI;
 import freenet.keys.Key;
 import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
 import freenet.support.Logger;
 import freenet.support.api.Bucket;
 import freenet.support.io.BucketTools;
@@ -60,7 +61,7 @@
         * write the data directly to the bucket, or copy it and free the 
original temporary bucket. Preferably the
         * former, obviously!
         */
-       public ClientGetter(ClientCallback client, ClientRequestScheduler 
chkSched, ClientRequestScheduler sskSched,
+       public ClientGetter(ClientCallback client, RequestScheduler chkSched, 
RequestScheduler sskSched,
                            FreenetURI uri, FetchContext ctx, short 
priorityClass, RequestClient clientContext, Bucket returnBucket, Bucket 
binaryBlobBucket) {
                super(priorityClass, clientContext);
                this.clientCallback = client;
@@ -145,8 +146,10 @@
                        try {
                                if(Logger.shouldLog(Logger.MINOR, this))
                                        Logger.minor(this, "Copying - 
returnBucket not respected by client.async");
-                               if(persistent())
+                               if(persistent()) {
                                        container.activate(from, 5);
+                                       container.activate(returnBucket, 5);
+                               }
                                BucketTools.copy(from, to);
                                from.free();
                                if(persistent())

Modified: branches/db4o/freenet/src/freenet/client/async/ClientPutter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientPutter.java    
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/ClientPutter.java    
2008-08-12 19:42:51 UTC (rev 21773)
@@ -16,6 +16,7 @@
 import freenet.keys.BaseClientKey;
 import freenet.keys.FreenetURI;
 import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.Bucket;
@@ -55,7 +56,7 @@
         * @param targetFilename If set, create a one-file manifest containing 
this filename pointing to this file.
         */
        public ClientPutter(ClientCallback client, Bucket data, FreenetURI 
targetURI, ClientMetadata cm, InsertContext ctx,
-                       ClientRequestScheduler chkScheduler, 
ClientRequestScheduler sskScheduler, short priorityClass, boolean getCHKOnly, 
+                       RequestScheduler chkScheduler, RequestScheduler 
sskScheduler, short priorityClass, boolean getCHKOnly, 
                        boolean isMetadata, RequestClient clientContext, 
SimpleFieldSet stored, String targetFilename, boolean binaryBlob) {
                super(priorityClass, clientContext);
                this.cm = cm;

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-08-12 19:42:51 UTC (rev 21773)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;

@@ -17,7 +18,6 @@
 import freenet.keys.Key;
 import freenet.keys.KeyBlock;
 import freenet.node.BaseSendableGet;
-import freenet.node.BulkCallFailureItem;
 import freenet.node.KeysFetchingLocally;
 import freenet.node.LowLevelGetException;
 import freenet.node.LowLevelPutException;
@@ -28,7 +28,6 @@
 import freenet.node.SendableGet;
 import freenet.node.SendableInsert;
 import freenet.node.SendableRequest;
-import freenet.node.SupportsBulkCallFailure;
 import freenet.support.Logger;
 import freenet.support.PrioritizedSerialExecutor;
 import freenet.support.api.StringCallback;
@@ -116,8 +115,6 @@
                this.selectorContainer = node.db;
                schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this, 
context);
                schedTransient = new ClientRequestSchedulerNonPersistent(this, 
forInserts, forSSKs);
-               schedCore.fillStarterQueue(selectorContainer);
-               schedCore.start(core);
                persistentCooldownQueue = schedCore.persistentCooldownQueue;
                this.databaseExecutor = core.clientDatabaseExecutor;
                this.datastoreCheckerExecutor = core.datastoreCheckerExecutor;
@@ -148,6 +145,11 @@
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }

+       public void start(NodeClientCore core) {
+               schedCore.start(core);
+               queueFillRequestStarterQueue();
+       }
+       
        /** Called by the  config. Callback
         * 
         * @param val
@@ -412,11 +414,6 @@
                finishRegister(getters, persistent, false, anyValid, regme);
        }

-       /** If enabled, if the queue is less than 25% full, attempt to add 
newly 
-        * registered requests directly to it, storing a 
PersistentChosenRequest and 
-        * bypassing registration on the queue. Risky optimisation. */
-       static final boolean TRY_DIRECT = true;
-
        private void finishRegister(final SendableGet[] getters, boolean 
persistent, boolean onDatabaseThread, final boolean anyValid, final RegisterMe 
reg) {
                if(isInsertScheduler && getters != null) {
                        IllegalStateException e = new 
IllegalStateException("finishRegister on an insert scheduler");
@@ -424,7 +421,7 @@
                                for(int i=0;i<getters.length;i++) {
                                        if(persistent)
                                                
selectorContainer.activate(getters[i], 1);
-                                       getters[i].internalError(null, e, this, 
selectorContainer, clientContext, persistent);
+                                       getters[i].internalError(e, this, 
selectorContainer, clientContext, persistent);
                                        if(persistent)
                                                
selectorContainer.deactivate(getters[i], 1);
                                }
@@ -439,57 +436,21 @@
                                }
                                if(persistent)
                                        selectorContainer.activate(getters, 1);
-                               boolean tryDirect = false;
-                               if(anyValid && TRY_DIRECT) {
-                                       synchronized(starterQueue) {
-                                               tryDirect = starterQueue.size() 
< MAX_STARTER_QUEUE_SIZE * 1 / 4;
-                                       }
-                                       if(tryDirect) {
-                                               for(int 
i=0;i<getters.length;i++) {
-                                                       SendableGet getter = 
getters[i];
-                                                       if(persistent)
-                                                               
selectorContainer.activate(getter, 1);
-                                                       while(true) {
-                                                               
PersistentChosenRequest cr = (PersistentChosenRequest) 
schedCore.maybeMakeChosenRequest(getter, selectorContainer, clientContext);
-                                                               if(cr == null) {
-                                                                       
if(!(getter.isEmpty(selectorContainer) || 
getter.isCancelled(selectorContainer)))
-                                                                               
// Still needs to be registered
-                                                                               
tryDirect = false;
-                                                                       break;
-                                                               }
-                                                               
synchronized(starterQueue) {
-                                                                       
if(logMINOR) Logger.minor(this, "Adding directly to queue: "+cr);
-                                                                       
starterQueue.add(cr);
-                                                                       
if(starterQueue.size() >= MAX_STARTER_QUEUE_SIZE) {
-                                                                               
if(!(getter.isEmpty(selectorContainer) || 
getter.isCancelled(selectorContainer)))
-                                                                               
        // Still stuff to register.
-                                                                               
        tryDirect = false;
-                                                                               
break;
-                                                                       }
-                                                               }
-                                                       }
-                                                       if(persistent)
-                                                               
selectorContainer.deactivate(getter, 1);
-                                               }
-                                       }
-                               }
                                if(logMINOR)
                                        Logger.minor(this, "finishRegister() 
for "+getters);
                                if(anyValid) {
-                                       if(!tryDirect) {
-                                               boolean wereAnyValid = false;
-                                               for(int 
i=0;i<getters.length;i++) {
-                                                       SendableGet getter = 
getters[i];
-                                                       
selectorContainer.activate(getters[i], 1);
-                                                       
if(!(getter.isCancelled(selectorContainer) || 
getter.isEmpty(selectorContainer))) {
-                                                               wereAnyValid = 
true;
-                                                               
schedCore.innerRegister(getter, random, selectorContainer);
-                                                       }
+                                       boolean wereAnyValid = false;
+                                       for(int i=0;i<getters.length;i++) {
+                                               SendableGet getter = getters[i];
+                                               
selectorContainer.activate(getters[i], 1);
+                                               
if(!(getter.isCancelled(selectorContainer) || 
getter.isEmpty(selectorContainer))) {
+                                                       wereAnyValid = true;
+                                                       
schedCore.innerRegister(getter, random, selectorContainer);
                                                }
-                                               if(!wereAnyValid) {
-                                                       Logger.normal(this, "No 
requests valid: "+getters);
-                                               }
                                        }
+                                       if(!wereAnyValid) {
+                                               Logger.normal(this, "No 
requests valid: "+getters);
+                                       }
                                }
                                if(reg != null)
                                        selectorContainer.delete(reg);
@@ -549,32 +510,51 @@
                        schedTransient.addPendingKey(key.getNodeKey(), getter, 
null);
        }

-       private synchronized ChosenRequest removeFirst(ObjectContainer 
container, boolean transientOnly, boolean notTransient) {
-               if(!databaseExecutor.onThread()) {
-                       throw new IllegalStateException("Not on database 
thread!");
-               }
+       public ChosenBlock getBetterNonPersistentRequest(short prio, int 
retryCount) {
                short fuzz = -1;
                if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
                        fuzz = -1;
                else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
                        fuzz = 0;       
-               // schedCore juggles both
-               return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, transientOnly, notTransient, Short.MAX_VALUE, 
Integer.MAX_VALUE, clientContext, container);
-       }
-
-       public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) {
-               short fuzz = -1;
-               if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
-                       fuzz = -1;
-               else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
-                       fuzz = 0;       
-               if(req == null)
-                       return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, false, Short.MAX_VALUE, Integer.MAX_VALUE, 
clientContext, null);
-               short prio = req.prio;
-               int retryCount = req.request.getRetryCount();
                return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, false, prio, retryCount, clientContext, null);
        }

+       /**
+        * All the persistent SendableRequest's currently running (either 
actually in flight, just chosen,
+        * awaiting the callbacks being executed etc). Note that this is an 
ArrayList because we *must*
+        * compare by pointer: these objects may well implement hashCode() etc 
for use by other code, but 
+        * if they are deactivated, they will be unreliable. Fortunately, this 
will be fairly small most
+        * of the time, since a single SendableRequest might include 256 actual 
requests.
+        * 
+        * SYNCHRONIZATION: Synched on starterQueue.
+        */
+       private final transient ArrayList<SendableRequest> 
runningPersistentRequests = new ArrayList<SendableRequest> ();
+       
+       public void removeRunningRequest(SendableRequest request) {
+               synchronized(starterQueue) {
+                       for(int i=0;i<runningPersistentRequests.size();i++) {
+                               if(runningPersistentRequests.get(i) == request) 
{
+                                       runningPersistentRequests.remove(i);
+                                       i--;
+                               }
+                       }
+               }
+       }
+       
+       public boolean isRunningRequest(SendableRequest request) {
+               synchronized(starterQueue) {
+                       for(int i=0;i<runningPersistentRequests.size();i++) {
+                               if(runningPersistentRequests.get(i) == request)
+                                       return true;
+                       }
+               }
+               return false;
+       }
+       
+       void startingRequest(SendableRequest request) {
+               runningPersistentRequests.add(request);
+       }
+       
        /** The maximum number of requests that we will keep on the in-RAM 
request
         * starter queue. */
        static final int MAX_STARTER_QUEUE_SIZE = 100;
@@ -586,31 +566,73 @@
         * something odd is happening.. (e.g. leaking 
PersistentChosenRequest's). */
        static final int WARNING_STARTER_QUEUE_SIZE = 300;

+       private transient LinkedList<PersistentChosenRequest> starterQueue = 
new LinkedList<PersistentChosenRequest>();
+       
+       /** Length of the starter queue in requests. */
+       private transient int starterQueueLength;
+       
        /**
-        * Normally this will only contain PersistentChosenRequest's, however 
in the
-        * case of coalescing keys, we will put ChosenRequest's back onto it as 
well.
+        * Called by RequestStarter to find a request to run.
         */
-       private transient LinkedList starterQueue = new LinkedList();
-       
-       public LinkedList getRequestStarterQueue() {
-               return starterQueue;
+       public ChosenBlock grabRequest() {
+               while(true) {
+                       PersistentChosenRequest reqGroup;
+                       synchronized(starterQueue) {
+                               reqGroup = starterQueue.isEmpty() ? null : 
starterQueue.getFirst();
+                       }
+                       if(reqGroup != null) {
+                               // Try to find a better non-persistent request
+                               ChosenBlock better = 
getBetterNonPersistentRequest(reqGroup.prio, reqGroup.retryCount);
+                               if(better != null) return better;
+                       }
+                       if(reqGroup == null) {
+                               queueFillRequestStarterQueue();
+                               return 
getBetterNonPersistentRequest(Short.MAX_VALUE, Integer.MAX_VALUE);
+                       }
+                       ChosenBlock block;
+                       int length = starterQueueLength;
+                       synchronized(starterQueue) {
+                               block = 
reqGroup.grabNotStarted(clientContext.fastWeakRandom);
+                               if(block == null) {
+                                       for(int i=0;i<starterQueue.size();i++) {
+                                               if(starterQueue.get(i) == 
reqGroup) {
+                                                       starterQueue.remove(i);
+                                                       i--;
+                                               }
+                                       }
+                                       continue;
+                               }
+                               starterQueueLength--;
+                       }
+                       if(length < MAX_STARTER_QUEUE_SIZE)
+                               queueFillRequestStarterQueue();
+                       return block;
+               }
        }

        public void queueFillRequestStarterQueue() {
                synchronized(starterQueue) {
-                       if(starterQueue.size() > MAX_STARTER_QUEUE_SIZE / 2)
+                       if(starterQueueLength > MAX_STARTER_QUEUE_SIZE / 2)
                                return;
                }
                jobRunner.queue(requestStarterQueueFiller, 
NativeThread.MAX_PRIORITY, true);
        }

-       void addToStarterQueue(ChosenRequest req) {
+       /**
+        * @param request
+        * @param container
+        * @return True if the queue is now full/over-full.
+        */
+       boolean addToStarterQueue(SendableRequest request, ObjectContainer 
container) {
+               container.activate(request, 1);
+               PersistentChosenRequest chosen = new 
PersistentChosenRequest(request, request.getPriorityClass(container), 
request.getRetryCount(), container, ClientRequestScheduler.this, clientContext);
+               container.deactivate(request, 1);
                synchronized(starterQueue) {
-                       if(starterQueue.contains(req)) {
-                               Logger.error(this, "Not re-adding: "+req);
-                               return;
-                       }
-                       starterQueue.add(req);
+                       // Since we pass in runningPersistentRequests, we don't 
need to check whether it is already in the starterQueue.
+                       starterQueue.add(chosen);
+                       starterQueueLength += chosen.sizeNotStarted();
+                       runningPersistentRequests.add(request);
+                       return starterQueueLength < MAX_STARTER_QUEUE_SIZE;
                }
        }

@@ -626,65 +648,116 @@
        private DBJob requestStarterQueueFiller = new DBJob() {
                public void run(ObjectContainer container, ClientContext 
context) {
                        if(logMINOR) Logger.minor(this, "Filling request 
queue... (SSK="+isSSKScheduler+" insert="+isInsertScheduler);
-                       ChosenRequest req = null;
+                       // fuzz stays at -1 unless the hard priority scheduler is selected,
+                       // in which case it becomes 0. It is passed to removeFirstInner below.
+                       short fuzz = -1;
+                       if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
+                               fuzz = -1;
+                       else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
+                               fuzz = 0;       
                        synchronized(starterQueue) {
-                               int size = starterQueue.size();
-                               if(logMINOR) Logger.minor(this, "Queue size: 
"+size+" SSK="+isSSKScheduler+" insert="+isInsertScheduler);
-                               if(size > MAX_STARTER_QUEUE_SIZE * 3 / 4) {
+                               // Recompute starterQueueLength
+                               int length = 0;
+                               for(PersistentChosenRequest req : starterQueue)
+                                       length += req.sizeNotStarted();
+                               if(length != starterQueueLength) {
+                                       Logger.error(this, "Correcting 
starterQueueLength from "+starterQueueLength+" to "+length);
+                                       starterQueueLength = length;
+                               }
+                               if(logMINOR) Logger.minor(this, "Queue size: 
"+length+" SSK="+isSSKScheduler+" insert="+isInsertScheduler);
+                               if(starterQueueLength > MAX_STARTER_QUEUE_SIZE 
* 3 / 4) {
                                        return;
                                }
-                               if(size >= MAX_STARTER_QUEUE_SIZE) {
-                                       if(size >= WARNING_STARTER_QUEUE_SIZE)
+                               // NOTE(review): the check above already returned for anything
+                               // over 3/4 of the limit, so this branch looks unreachable for
+                               // any positive MAX_STARTER_QUEUE_SIZE - confirm and simplify.
+                               if(starterQueueLength >= 
MAX_STARTER_QUEUE_SIZE) {
+                                       if(starterQueueLength >= 
WARNING_STARTER_QUEUE_SIZE)
                                                Logger.error(this, "Queue 
already full: "+starterQueue.size());
                                        return;
                                }
                        }
-                       SendableRequest lastReq = null;
-                       int sameKey = 0;
+                       
                        while(true) {
-                               req = null;
-                               if(lastReq != null && sameKey < 
MAX_CONSECUTIVE_SAME_REQ &&
-                                               lastReq.getParentGrabArray() != 
null) {
-                                       req = 
schedCore.maybeMakeChosenRequest(lastReq, container, context);
-                                       sameKey++;
-                               }
-                               if(req == null) {
-                                       req = removeFirst(container, false, 
true);
-                                       if(sameKey > 1)
-                                               Logger.normal(this, "Selected 
"+sameKey+" requests from same SendableRequest: "+lastReq);
-                                       sameKey = 0;
-                               }
-                               if(req == null) {
-                                       if(lastReq != null) {
-                                               container.deactivate(lastReq, 
1);
+                               SendableRequest request = 
schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, 
false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
+                               if(request == null) return;
+                               boolean full = addToStarterQueue(request, 
container);
+                               starter.wakeUp();
+                               // NOTE(review): both paths below return, so this while(true)
+                               // body runs at most once per job and the value of "full" has
+                               // no effect. Presumably the unconditional return is a stopgap;
+                               // verify that removeFirstInner cannot hand back the same
+                               // request twice before letting the loop iterate.
+                               if(full) return;
+                               return;
+                       }
+               }
+       };
+       
+       /**
+        * Compare a recently registered SendableRequest to what is already on the
+        * starter queue. If it beats at least one queued request, or the queue is
+        * empty, add it and then trim the queue back towards the limit.
+        * A numerically lower priority class is better, and at equal priority a
+        * lower retry count is better; this is the same ordering that
+        * trimStarterQueue uses when it picks the worst entry to dump.
+        * @param req The newly registered request.
+        * @param container The database handle, passed on to the queue methods.
+        */
+       public void maybeAddToStarterQueue(SendableRequest req, ObjectContainer 
container) {
+               short prio = req.getPriorityClass(container);
+               int retryCount = req.getRetryCount();
+               synchronized(starterQueue) {
+                       boolean allBetter = true;
+                       for(PersistentChosenRequest old : starterQueue) {
+                               // A queued entry is at least as good as the newcomer when it
+                               // has a strictly better priority, or the same priority and no
+                               // more retries. The flag used to be cleared in exactly these
+                               // cases, which inverted the decision.
+                               boolean oldAtLeastAsGood = old.prio < prio ||
+                                       (old.prio == prio && old.retryCount <= retryCount);
+                               if(!oldAtLeastAsGood) {
+                                       allBetter = false;
+                                       break;
+                               }
+                       }
+                       // Everything queued is at least as good as the newcomer: do not
+                       // add it here.
+                       if(allBetter && !starterQueue.isEmpty()) return;
+               }
+               addToStarterQueue(req, container);
+               trimStarterQueue(container);
+       }
+       
+       private void trimStarterQueue(ObjectContainer container) {
+               // Dump the worst queued requests (largest prio value, then largest
+               // retry count) while the queue stays at or above the limit afterwards.
+               // Dumped requests are notified via onDumped() after the lock is released.
+               ArrayList<PersistentChosenRequest> dumped = null;
+               synchronized(starterQueue) {
+                       while(starterQueueLength > MAX_STARTER_QUEUE_SIZE) {
+                               // Find the lowest priority/retry count request.
+                               // If we can dump it without going below the 
limit, then do so.
+                               // If we can't, return.
+                               PersistentChosenRequest worst = null;
+                               short worstPrio = -1;
+                               int worstRetryCount = -1;
+                               int worstIndex = -1;
+                               if(starterQueue.isEmpty()) {
+                                       if(starterQueueLength != 0) {
+                                               Logger.error(this, "Starter 
queue empty but starterQueueLength is "+starterQueueLength);
+                                               starterQueueLength = 0;
                                        }
-                                       return;
+                                       break;
                                }
-                               lastReq = req.request;
-                               if(logMINOR) Logger.minor(this, "Activating 
key");
-                               container.activate(req.key, 5);
-                               container.activate(req.ckey, 5);
-                               container.activate(req.request, 1);
-                               
container.activate(req.request.getClientRequest(), 1);
-                               if(logMINOR) Logger.minor(this, "Activated");
-                               synchronized(starterQueue) {
-                                       if(req != null) {
-                                               starterQueue.add(req);
-                                               if(logMINOR)
-                                                       Logger.minor(this, 
"Added to starterQueue: "+req+" size now "+starterQueue.size());
-                                               req = null;
+                               for(int i=0;i<starterQueue.size();i++) {
+                                       PersistentChosenRequest req = 
starterQueue.get(i);
+                                       short prio = req.prio;
+                                       int retryCount = req.retryCount;
+                                       if(prio > worstPrio ||
+                                                       (prio == worstPrio && 
retryCount > worstRetryCount)) {
+                                               worstPrio = prio;
+                                               worstRetryCount = retryCount;
+                                               worst = req;
+                                               worstIndex = i;
+                                               continue;
                                        }
-                                       if(starterQueue.size() >= 
MAX_STARTER_QUEUE_SIZE) {
-                                               if(lastReq != null) {
-                                                       
container.deactivate(lastReq, 1);
-                                               }
-                                               return;
-                                       }
                                }
+                               int lengthAfter = starterQueueLength - 
worst.sizeNotStarted();
+                               if(lengthAfter >= MAX_STARTER_QUEUE_SIZE) {
+                                       if(dumped == null)
+                                               dumped = new 
ArrayList<PersistentChosenRequest>(2);
+                                       dumped.add(worst);
+                                       starterQueue.remove(worstIndex);
+                                       // Keep the counter in step with the queue. Without this
+                                       // the loop condition never changes and later iterations
+                                       // work from a stale total, dumping far too much.
+                                       starterQueueLength = lengthAfter;
+                                       if(lengthAfter == 
MAX_STARTER_QUEUE_SIZE) break;
+                               } else {
+                                       // Can't remove any more.
+                                       break;
+                               }
                        }
                }
-       };
-       
+               if(dumped == null) return;
+               for(PersistentChosenRequest req : dumped) {
+                       req.onDumped(schedCore, container);
+               }
+       }
+
        public void removePendingKey(final GotKeyListener getter, final boolean 
complain, final Key key, ObjectContainer container) {
                if(!getter.persistent()) {
                        boolean dropped = 
schedTransient.removePendingKey(getter, complain, key, container);
@@ -764,14 +837,13 @@
         */
        static final short TRIP_PENDING_PRIORITY = NativeThread.HIGH_PRIORITY-1;

-       public synchronized void succeeded(final BaseSendableGet succeeded, 
final ChosenRequest req) {
+       public synchronized void succeeded(final BaseSendableGet succeeded, 
final ChosenBlock req) {
                if(req.isPersistent()) {
                        jobRunner.queue(new DBJob() {

                                public void run(ObjectContainer container, 
ClientContext context) {
                                        container.activate(succeeded, 1);
                                        schedCore.succeeded(succeeded, 
container);
-                                       
container.delete((PersistentChosenRequest)req);
                                        container.deactivate(succeeded, 1);
                                }

@@ -993,152 +1065,40 @@
                schedCore.removeFetchingKey(key);
        }

-       public void removeChosenRequest(ChosenRequest req) {
-               schedCore.removeChosenRequest(req);
-       }
-
        /**
         * Map from SendableGet implementing SupportsBulkCallFailure to 
BulkCallFailureItem[].
         */
        private transient HashMap bulkFailureLookupItems = new HashMap();
        private transient HashMap bulkFailureLookupJob = new HashMap();
-       
-       private class BulkCaller implements DBJob {
-               
-               private final SupportsBulkCallFailure getter;
-               
-               BulkCaller(SupportsBulkCallFailure getter) {
-                       this.getter = getter;
-                       if(getter == null) throw new NullPointerException();
-               }

-               public void run(ObjectContainer container, ClientContext 
context) {
-                       BulkCallFailureItem[] items;
-                       container.activate(getter, 1);
-                       synchronized(ClientRequestScheduler.this) {
-                               items = (BulkCallFailureItem[]) 
bulkFailureLookupItems.get(getter);
-                               bulkFailureLookupItems.remove(getter);
-                               bulkFailureLookupJob.remove(getter);
-                       }
-                       if(items != null && items.length > 0) {
-                               if(logMINOR) Logger.minor(this, "Calling 
non-fatal failure in bulk for "+items.length+" items for "+getter);
-                               getter.onFailure(items, container, context);
-                               for(int i=0;i<items.length;i++)
-                                       if(items[i] != null)
-                                               container.delete(items[i].req);
-                       } else
-                               Logger.normal(this, "Calling non-fatal failure 
in bulk for "+getter+" but no items to run for "+getter);
-                       container.deactivate(getter, 1);
-               }
-               
-               public boolean equals(Object o) {
-                       if(o instanceof BulkCaller) {
-                               return ((BulkCaller)o).getter == getter;
-                       } else return false;
-               }
-       }
-       
-       public void callFailure(final SendableGet get, final 
LowLevelGetException e, final Object keyNum, final int prio, final 
ChosenRequest req, boolean persistent) {
+       public void callFailure(final SendableGet get, final 
LowLevelGetException e, int prio, boolean persistent) {
+               // Report a low-level fetch failure to the getter. Transient requests
+               // are called back inline with a null container; persistent ones are
+               // called back from a DBJob queued on jobRunner at priority prio.
+               // NOTE(review): the second argument to onFailure (previously keyNum)
+               // is now always null - confirm the getter no longer needs a token here.
                if(!persistent) {
-                       get.onFailure(e, keyNum, null, clientContext);
-                       return;
-               }
-               if(get instanceof SupportsBulkCallFailure) {
-                       // Getter MUST BE ACTIVATED for us to use it as a key.
-                       // The below job doesn't write anything, so it will run 
fast.
+                       get.onFailure(e, null, null, clientContext);
+               } else {
                        jobRunner.queue(new DBJob() {

                                public void run(ObjectContainer container, 
ClientContext context) {
-                                       container.activate(get, 1);
-                                       if(logMINOR)
-                                               Logger.minor(this, "Calling 
bulk failure for "+get);
-                                       SupportsBulkCallFailure getter = 
(SupportsBulkCallFailure) get;
-                                       BulkCallFailureItem item = new 
BulkCallFailureItem(e, keyNum, (PersistentChosenRequest) req);
-                                       BulkCaller caller = null;
-                                       synchronized(this) {
-                                               BulkCallFailureItem[] items = 
(BulkCallFailureItem[]) bulkFailureLookupItems.get(get);
-                                               if(items == null) {
-                                                       
bulkFailureLookupItems.put(getter, new BulkCallFailureItem[] { item } );
-                                               } else {
-                                                       BulkCallFailureItem[] 
newItems = new BulkCallFailureItem[items.length+1];
-                                                       System.arraycopy(items, 
0, newItems, 0, items.length);
-                                                       newItems[items.length] 
= item;
-                                                       
bulkFailureLookupItems.put(getter, newItems);
-                                               }
-                                               caller = (BulkCaller) 
bulkFailureLookupJob.get(getter);
-                                               if(caller == null) {
-                                                       caller = new 
BulkCaller(getter);
-                                                       
bulkFailureLookupJob.put(getter, caller);
-                                               } else
-                                                       caller = null;
-                                               
-                                       }
-                                       if(caller != null)
-                                               jobRunner.queue(caller, prio, 
true);
-                                       else {
-                                               if(logMINOR)
-                                                       Logger.minor(this, "Not 
calling bulk failure for "+get);
-                                       }
-                                       container.deactivate(get, 1);
+                                       // NOTE(review): the removed code activated "get" before
+                                       // using it inside the job; this version does not - verify
+                                       // that onFailure or the caller handles activation.
+                                       get.onFailure(e, null, container, 
clientContext);
                                }

-                       }, NativeThread.HIGH_PRIORITY, false);
-                       return;
+                       }, prio, false);
                }
-               jobRunner.queue(new DBJob() {
-
-                       public void run(ObjectContainer container, 
ClientContext context) {
-                               container.activate(get, 1);
-                               if(logMINOR)
-                                       Logger.minor(this, "callFailure() on 
"+get+" : "+e);
-                               get.onFailure(e, keyNum, selectorContainer, 
clientContext);
-                               if(logMINOR)
-                                       Logger.minor(this, "Deleting "+req);
-                               
selectorContainer.delete((PersistentChosenRequest)req);
-                               container.deactivate(get, 1);
-                       }
-                       
-               }, prio, false);
        }

-       public void callFailure(final SendableInsert put, final 
LowLevelPutException e, final Object keyNum, int prio, final ChosenRequest req, 
boolean persistent) {
+       public void callFailure(final SendableInsert insert, final 
LowLevelPutException e, int prio, boolean persistent) {
+               // Report a low-level insert failure. Transient inserts are called
+               // back inline with a null container; persistent ones are called back
+               // from a DBJob queued on jobRunner at priority prio (the removed code
+               // always used NativeThread.NORM_PRIORITY).
                if(!persistent) {
-                       put.onFailure(e, keyNum, null, clientContext);
-                       return;
-               }
-               jobRunner.queue(new DBJob() {
+                       insert.onFailure(e, null, null, clientContext);
+               } else {
+                       jobRunner.queue(new DBJob() {

-                       public void run(ObjectContainer container, 
ClientContext context) {
-                               container.activate(put, 1);
-                               put.onFailure(e, keyNum, selectorContainer, 
clientContext);
-                               if(logMINOR)
-                                       Logger.minor(this, "Deleting "+req);
-                               
selectorContainer.delete((PersistentChosenRequest)req);
-                               container.deactivate(put, 1);
-                       }
-                       
-               }, NativeThread.NORM_PRIORITY, false);
-       }
-
-       public void callSuccess(final SendableInsert put, final Object keyNum, 
int prio, final ChosenRequest req, boolean persistent) {
-               if(!persistent) {
-                       put.onSuccess(keyNum, null, clientContext);
-                       return;
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       // NOTE(review): this passes the job's "context" while
+                                       // the SendableGet overload passes the clientContext
+                                       // field - presumably the same object; confirm. The
+                                       // removed code also activated the insert first.
+                                       insert.onFailure(e, null, container, 
context);
+                               }
+                               
+                       }, prio, false);
                }
-               jobRunner.queue(new DBJob() {
-
-                       public void run(ObjectContainer container, 
ClientContext context) {
-                               container.activate(put, 1);
-                               put.onSuccess(keyNum, selectorContainer, 
clientContext);
-                               if(logMINOR)
-                                       Logger.minor(this, "Deleting "+req);
-                               
selectorContainer.delete((PersistentChosenRequest)req);
-                               container.deactivate(put, 1);
-                       }
-                       
-               }, NativeThread.NORM_PRIORITY+1, false);
        }
-
+       
        public FECQueue getFECQueue() {
                return clientContext.fecQueue;
        }
@@ -1154,29 +1114,6 @@
                return schedCore.addToFetching(key);
        }

-       public void requeue(final ChosenRequest req) {
-               if(req.isPersistent()) {
-                       this.clientContext.jobRunner.queue(new DBJob() {
-
-                               public void run(ObjectContainer container, 
ClientContext context) {
-                                       container.activate(req.request, 1);
-                                       if(logMINOR)
-                                               Logger.minor(this, "Requeueing 
"+req);
-                                       if(req.request.isCancelled(container)) {
-                                               container.delete(req);
-                                               return;
-                                       }
-                                       container.deactivate(req.request, 1);
-                                       addToStarterQueue(req);
-                               }
-                               
-                       }, NativeThread.HIGH_PRIORITY, false);
-               } else {
-                       if(req.request.isCancelled(null)) return;
-                       addToStarterQueue(req);
-               }
-       }
-       
        public long countPersistentQueuedRequests(ObjectContainer container) {
                return schedCore.countQueuedRequests(container);
        }
@@ -1197,6 +1134,6 @@
                else
                        
schedTransient.removeFromAllRequestsByClientRequest(get, clientRequest, 
dontComplain, null);
        }
+

-       
 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-08-12 19:25:54 UTC (rev 21772)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-08-12 19:42:51 UTC (rev 21773)
@@ -12,6 +12,7 @@
 import freenet.crypt.RandomSource;
 import freenet.keys.Key;
 import freenet.node.BaseSendableGet;
+import freenet.node.RequestScheduler;
 import freenet.node.RequestStarter;
 import freenet.node.SendableInsert;
 import freenet.node.SendableRequest;
@@ -51,6 +52,7 @@
        protected final SortedVectorByNumber[] priorities;
        protected final Map allRequestsByClientRequest;
        protected final List /* <BaseSendableGet> */ recentSuccesses;
+       protected transient ClientRequestScheduler sched;

        abstract boolean persistent();

@@ -90,6 +92,8 @@
                        container.deactivate(v, 1);
                addToGrabArray(prio, retryCount, fixRetryCount(retryCount), 
req.getClient(), req.getClientRequest(), req, random, container);
                if(logMINOR) Logger.minor(this, "Registered "+req+" on 
prioclass="+prio+", retrycount="+retryCount+" v.size()="+vSize);
+               if(req.persistent())
+                       sched.maybeAddToStarterQueue(req, container);
        }

        synchronized void addToGrabArray(short priorityClass, int retryCount, 
int rc, Object client, ClientRequester cr, SendableRequest req, RandomSource 
random, ObjectContainer container) {
@@ -139,7 +143,7 @@
                return Math.max(0, retryCount-MIN_RETRY_COUNT);
        }

-       public void reregisterAll(ClientRequester request, RandomSource random, 
ClientRequestScheduler lock, ObjectContainer container, ClientContext context) {
+       public void reregisterAll(ClientRequester request, RandomSource random, 
RequestScheduler lock, ObjectContainer container, ClientContext context) {
                SendableRequest[] reqs;
                synchronized(lock) {
                        Set h = (Set) allRequestsByClientRequest.get(request);

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-08-12 19:25:54 UTC (rev 21772)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-08-12 19:42:51 UTC (rev 21773)
@@ -16,6 +16,7 @@
 import com.db4o.types.Db4oList;
 import com.db4o.types.Db4oMap;

+import freenet.client.FetchContext;
 import freenet.crypt.RandomSource;
 import freenet.keys.ClientKey;
 import freenet.keys.Key;
@@ -47,7 +48,6 @@
        /** Identifier in the database for the node we are attached to */
        private final long nodeDBHandle;
        final PersistentCooldownQueue persistentCooldownQueue;
-       private transient ClientRequestScheduler sched;
        private transient long initTime;

        /**
@@ -106,6 +106,7 @@
                } else {
                        this.persistentCooldownQueue = null;
                }
+
        }

        private void onStarted(ObjectContainer container, long cooldownTime, 
ClientRequestScheduler sched, ClientContext context) {
@@ -119,13 +120,13 @@
                if(!isInsertScheduler) {
                        persistentCooldownQueue.setCooldownTime(cooldownTime);
                }
+               this.sched = sched;
+               this.initTime = System.currentTimeMillis();
+               // We DO NOT want to rerun the query after consuming the 
initial set...
                if(!isInsertScheduler)
                        keysFetching = new HashSet();
                else
                        keysFetching = null;
-               this.sched = sched;
-               this.initTime = System.currentTimeMillis();
-               // We DO NOT want to rerun the query after consuming the 
initial set...
                preRegisterMeRunner = new DBJob() {

                        public void run(ObjectContainer container, 
ClientContext context) {
@@ -211,43 +212,6 @@
                runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY, 
true);
        }

-       void fillStarterQueue(ObjectContainer container) {
-               ObjectSet results = container.query(new Predicate() {
-                       public boolean match(PersistentChosenRequest req) {
-                               if(req.core != ClientRequestSchedulerCore.this) 
return false;
-                               return true;
-                       }
-               });
-               int count = 0;
-               while(results.hasNext()) {
-                       count++;
-                       PersistentChosenRequest req = (PersistentChosenRequest) 
results.next();
-                       container.activate(req, 2);
-                       container.activate(req.key, 5);
-                       container.activate(req.ckey, 5);
-                       if(req.request == null) {
-                               container.delete(req);
-                               Logger.error(this, "Deleting bogus 
PersistentChosenRequest");
-                               continue;
-                       }
-                       container.activate(req.request, 1);
-                       container.activate(req.request.getClientRequest(), 1);
-                       if(req.token != null)
-                               container.activate(req.token, 5);
-                       if(req.request.isCancelled(container)) {
-                               container.delete(req);
-                               continue;
-                       }
-                       if(logMINOR)
-                               Logger.minor(this, "Adding old request: "+req);
-                       sched.addToStarterQueue(req);
-               }
-//             if(count > ClientRequestScheduler.MAX_STARTER_QUEUE_SIZE)
-                       Logger.error(this, "Added "+count+" requests to the 
starter queue, size now "+sched.starterQueueSize());
-//             else
-//                     Logger.normal(this, "Added "+count+" requests to the 
starter queue, size now "+sched.starterQueueSize());
-       }
-       
        // We pass in the schedTransient to the next two methods so that we can 
select between either of them.

        private int removeFirstAccordingToPriorities(boolean tryOfferedKeys, 
int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, ObjectContainer container){
@@ -288,11 +252,11 @@
        // We prevent a number of race conditions (e.g. adding a retry count 
and then another 
        // thread removes it cos its empty) ... and in addToGrabArray etc we 
already sync on this.
        // The worry is ... is there any nested locking outside of the 
hierarchy?
-       ChosenRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
boolean notTransient, short maxPrio, int retryCount, ClientContext context, 
ObjectContainer container) {
+       ChosenBlock removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
boolean notTransient, short maxPrio, int retryCount, ClientContext context, 
ObjectContainer container) {
                SendableRequest req = removeFirstInner(fuzz, random, 
offeredKeys, starter, schedTransient, transientOnly, notTransient, maxPrio, 
retryCount, context, container);
                if(isInsertScheduler && req instanceof SendableGet) {
                        IllegalStateException e = new 
IllegalStateException("removeFirstInner returned a SendableGet on an insert 
scheduler!!");
-                       req.internalError(null, e, sched, container, context, 
req.persistent());
+                       req.internalError(e, sched, container, context, 
req.persistent());
                        throw e;
                }
                return maybeMakeChosenRequest(req, container, context);
@@ -300,7 +264,7 @@

        private int ctr;

-       public ChosenRequest maybeMakeChosenRequest(SendableRequest req, 
ObjectContainer container, ClientContext context) {
+       public ChosenBlock maybeMakeChosenRequest(SendableRequest req, 
ObjectContainer container, ClientContext context) {
                if(req == null) return null;
                if(req.isEmpty(container) || req.isCancelled(container)) return 
null;
                Object token = req.chooseKey(this, req.persistent() ? container 
: null, context);
@@ -319,26 +283,27 @@
                                else
                                        ckey = null;
                        }
-                       ChosenRequest ret;
-                       if(req.persistent()) {
-                               container.activate(key, 5);
-                               container.activate(ckey, 5);
-                               container.activate(req.getClientRequest(), 1);
-                               if(key != null && key.getRoutingKey() == null)
-                                       throw new NullPointerException();
-                               ret = new PersistentChosenRequest(this, req, 
token, key == null ? null : key.cloneKey(), 
-                                               ckey == null ? null : 
ckey.cloneKey(), req.getPriorityClass(container), container);
-                               container.set(ret);
-                               if(logMINOR)
-                                       Logger.minor(this, "Storing "+ret+" for 
"+req);
-                               container.deactivate(key, 5);
-                               container.deactivate(ckey, 5);
-                               container.deactivate(req.getClientRequest(), 1);
+                       ChosenBlock ret;
+                       assert(!req.persistent());
+                       if(key != null && key.getRoutingKey() == null)
+                               throw new NullPointerException();
+                       boolean localRequestOnly;
+                       boolean cacheLocalRequests;
+                       boolean ignoreStore;
+                       if(req instanceof SendableGet) {
+                               SendableGet sg = (SendableGet) req;
+                               FetchContext ctx = sg.getContext();
+                               if(container != null)
+                                       container.activate(ctx, 1);
+                               localRequestOnly = ctx.localRequestOnly;
+                               cacheLocalRequests = ctx.cacheLocalRequests;
+                               ignoreStore = ctx.ignoreStore;
                        } else {
-                               if(key != null && key.getRoutingKey() == null)
-                                       throw new NullPointerException();
-                               ret = new ChosenRequest(req, token, key, ckey, 
req.getPriorityClass(container), null);
+                               localRequestOnly = false;
+                               cacheLocalRequests = false;
+                               ignoreStore = false;
                        }
+                       ret = new TransientChosenBlock(req, token, key, ckey, 
localRequestOnly, cacheLocalRequests, ignoreStore, sched);
                        return ret;
                }
        }
@@ -346,9 +311,9 @@
        SendableRequest removeFirstInner(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
boolean notTransient, short maxPrio, int retryCount, ClientContext context, 
ObjectContainer container) {
                // Priorities start at 0
                if(logMINOR) Logger.minor(this, "removeFirst()");
-               boolean tryOfferedKeys = offeredKeys != null && 
random.nextBoolean();
+               boolean tryOfferedKeys = offeredKeys != null && (!notTransient) 
&& random.nextBoolean();
                int choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient, transientOnly, maxPrio, container);
-               if(choosenPriorityClass == -1 && offeredKeys != null && 
!tryOfferedKeys) {
+               if(choosenPriorityClass == -1 && offeredKeys != null && 
(!tryOfferedKeys)) {
                        tryOfferedKeys = true;
                        choosenPriorityClass = 
removeFirstAccordingToPriorities(tryOfferedKeys, fuzz, random, offeredKeys, 
schedTransient, transientOnly, maxPrio, container);
                }
@@ -669,7 +634,7 @@
                                        // Cancel the request, and commit so it 
isn't tried again.
                                        if(reg.getters != null) {
                                                for(int 
k=0;k<reg.getters.length;k++)
-                                                       
reg.getters[k].internalError(null, t, sched, container, context, true);
+                                                       
reg.getters[k].internalError(t, sched, container, context, true);
                                        }
                                }
                                if(reg.listener != null)
@@ -745,21 +710,6 @@
                }
        }

-       public void removeChosenRequest(final ChosenRequest req) {
-               int prio = NativeThread.NORM_PRIORITY+1;
-               assert(prio < ClientRequestScheduler.TRIP_PENDING_PRIORITY);
-               if(req != null && req.isPersistent()) {
-               sched.clientContext.jobRunner.queue(new DBJob() {
-                       public void run(ObjectContainer container, 
ClientContext context) {
-                               container.delete(req);
-                       }
-                       public String toString() {
-                               return "Delete "+req;
-                       }
-               }, prio, false);
-               }
-       }
-
        protected Set makeSetForAllRequestsByClientRequest(ObjectContainer 
container) {
                return new Db4oSet(container, 1);
        }

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-08-12 19:25:54 UTC (rev 21772)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-08-12 19:42:51 UTC (rev 21773)
@@ -38,6 +38,7 @@

        ClientRequestSchedulerNonPersistent(ClientRequestScheduler sched, 
boolean forInserts, boolean forSSKs) {
                super(forInserts, forSSKs, new HashMap(), new LinkedList());
+               this.sched = sched;
                recentSuccesses = new LinkedList();
                if(forInserts)
                        pendingKeys = null;

Modified: branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 
2008-08-12 19:42:51 UTC (rev 21773)
@@ -4,6 +4,7 @@
 package freenet.client.async;

 import java.util.HashSet;
+import java.util.List;
 import java.util.Vector;

 import com.db4o.ObjectContainer;
@@ -15,6 +16,7 @@
 import freenet.node.NodeClientCore;
 import freenet.node.RequestClient;
 import freenet.node.RequestScheduler;
+import freenet.node.SendableRequestSender;
 import freenet.node.NodeClientCore.SimpleRequestSenderCompletionListener;
 import freenet.support.Logger;

@@ -140,24 +142,31 @@
                return 0; // All keys have equal chance even if they've been 
tried before.
        }

-       public void internalError(Object keyNum, Throwable t, RequestScheduler 
sched, ObjectContainer container, ClientContext context, boolean persistent) {
+       public void internalError(Throwable t, RequestScheduler sched, 
ObjectContainer container, ClientContext context, boolean persistent) {
                Logger.error(this, "Internal error: "+t, t);
        }

-       public boolean send(NodeClientCore node, RequestScheduler sched, 
ChosenRequest req) {
-               Key key = (Key) req.token;
-               // Have to cache it in order to propagate it; FIXME
-               // Don't let a node force us to start a real request for a 
specific key.
-               // We check the datastore, take up offers if any (on a short 
timeout), and then quit if we still haven't fetched the data.
-               // Obviously this may have a marginal impact on load but it 
should only be marginal.
-               core.asyncGet(key, true, true, new 
SimpleRequestSenderCompletionListener() {
+       @Override
+       public SendableRequestSender getSender(ObjectContainer container, 
ClientContext context) {
+               return new SendableRequestSender() {

-                       public void completed(boolean success) {
-                               // Ignore
+                       public boolean send(NodeClientCore core, 
RequestScheduler sched, ClientContext context, ChosenBlock req) {
+                               Key key = (Key) req.token;
+                               // Have to cache it in order to propagate it; 
FIXME
+                               // Don't let a node force us to start a real 
request for a specific key.
+                               // We check the datastore, take up offers if 
any (on a short timeout), and then quit if we still haven't fetched the data.
+                               // Obviously this may have a marginal impact on 
load but it should only be marginal.
+                               core.asyncGet(key, true, true, new 
SimpleRequestSenderCompletionListener() {
+
+                                       public void completed(boolean success) {
+                                               // Ignore
+                                       }
+                                       
+                               });
+                               return true;
                        }

-               });
-               return true;
+               };
        }

        public boolean canRemove(ObjectContainer container) {
@@ -185,4 +194,9 @@
                return isSSK;
        }

+       @Override
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               throw new UnsupportedOperationException("Transient only");
+       }
+
 }

Added: branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java   
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenBlock.java   
2008-08-12 19:42:51 UTC (rev 21773)
@@ -0,0 +1,160 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.Key;
+import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
+import freenet.node.NodeClientCore;
+import freenet.node.RequestScheduler;
+import freenet.node.SendableGet;
+import freenet.node.SendableRequestSender;
+import freenet.support.Logger;
+
+/**
+ * A block within a PersistentChosenRequest.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class PersistentChosenBlock extends ChosenBlock {
+       
+       public final PersistentChosenRequest parent;
+       public final boolean isInsert;
+       
+       /* Completion */
+       private boolean finished;
+       
+       /** If a SendableGet failed, failedGet will be set to the exception 
generated. Cannot be null if it failed. */
+       private LowLevelGetException failedGet;
+       
+       private boolean fetchSucceeded; // The actual block is not our problem.
+       
+       /* Inserts */
+       private boolean insertSucceeded;
+       /** If a SendableInsert failed, failedPut will be set to the exception 
generated. Cannot be null if it failed. */
+       private LowLevelPutException failedPut;
+       
+       public PersistentChosenBlock(boolean isInsert, PersistentChosenRequest 
parent, Object token, Key key, ClientKey ckey, RequestScheduler sched) {
+               super(token, key == null ? null : key.cloneKey(), ckey == null 
? null : ckey.cloneKey(), parent.localRequestOnly, parent.cacheLocalRequests, 
parent.ignoreStore, sched);
+               this.isInsert = isInsert;
+               this.parent = parent;
+       }
+       
+       @Override
+       public void onFetchSuccess(ClientContext context) {
+               assert(!isInsert);
+               synchronized(this) {
+                       if(finished) {
+                               Logger.error(this, "Already finished in 
onSuccess() on "+this, new Exception("debug"));
+                               return;
+                       }
+                       finished = true;
+                       fetchSucceeded = true;
+               }
+               parent.onFinished(this, context);
+               parent.scheduler.succeeded((SendableGet)parent.request, this);
+       }
+
+       @Override
+       public void onFailure(LowLevelGetException e, ClientContext context) {
+               assert(!isInsert);
+               synchronized(this) {
+                       if(finished) {
+                               Logger.error(this, "Already finished in 
onFailure() on "+this, new Exception("debug"));
+                               return;
+                       }
+                       if(e == null)
+                               throw new NullPointerException();
+                       failedGet = e;
+                       finished = true;
+               }
+               parent.onFinished(this, context);
+       }
+       
+       @Override
+       public void onInsertSuccess(ClientContext context) {
+               assert(isInsert);
+               synchronized(this) {
+                       if(finished) {
+                               Logger.error(this, "Already finished in 
onSuccess() on "+this, new Exception("debug"));
+                               return;
+                       }
+                       insertSucceeded = true;
+                       finished = true;
+               }
+               parent.onFinished(this, context);
+       }
+       
+       @Override
+       public void onFailure(LowLevelPutException e, ClientContext context) {
+               assert(isInsert);
+               synchronized(this) {
+                       if(finished) {
+                               Logger.error(this, "Already finished in 
onFailure() on "+this, new Exception("debug"));
+                               return;
+                       }
+                       if(e == null)
+                               throw new NullPointerException();
+                       failedPut = e;
+                       finished = true;
+               }
+               parent.onFinished(this, context);
+       }
+       
+       LowLevelGetException failedGet() {
+               return failedGet;
+       }
+       
+       boolean insertSucceeded() {
+               return insertSucceeded;
+       }
+       
+       boolean fetchSucceeded() {
+               return fetchSucceeded;
+       }
+       
+       LowLevelPutException failedPut() {
+               return failedPut;
+       }
+
+       @Override
+       public boolean isPersistent() {
+               return true;
+       }
+
+       @Override
+       public boolean isCancelled() {
+               // We can't tell without accessing the database, and we can't 
access the database on the request starter thread.
+               return false;
+       }
+
+       @Override
+       public boolean send(NodeClientCore core, RequestScheduler sched) {
+               try {
+                       return super.send(core, sched);
+               } finally {
+                       boolean wasFinished;
+                       synchronized(this) {
+                               wasFinished = finished;
+                               if(!finished) {
+                                       finished = true;
+                                       if(parent.request instanceof 
SendableGet) {
+                                               Logger.error(this, "SendableGet 
"+parent.request+" didn't call a callback on "+this);
+                                       }
+                               }
+                       }
+                       if(!wasFinished) {
+                               parent.onFinished(this, sched.getContext());
+                       }
+               }
+       }
+
+       @Override
+       public short getPriority() {
+               return parent.prio;
+       }
+
+       @Override
+       public SendableRequestSender getSender(ClientContext context) {
+               return parent.sender;
+       }
+       
+}

Deleted: 
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-12 19:42:51 UTC (rev 21773)
@@ -1,55 +0,0 @@
-/* This code is part of Freenet. It is distributed under the GNU General
- * Public License, version 2 (or at your option any later version). See
- * http://www.gnu.org/ for further details of the GPL. */
-package freenet.client.async;
-
-import com.db4o.ObjectContainer;
-
-import freenet.keys.ClientKey;
-import freenet.keys.Key;
-import freenet.node.SendableRequest;
-
-/**
- * These must be deleted once the request has been executed. See RegisterMe.
- * 
- * When we choose a request on the database thread, if it is persistent, we 
store it to the database.
- * The reason for this is that we may crash before it completes, in which case 
we don't want to lose 
- * it, and the process of choosing a request is destructive i.e. it will be 
removed from the queue
- * structures before removeFirst() returns. If we do it this way we get the 
best of all worlds:
- * - We remove the data from the queue, so the request won't be chosen again 
until it's done, and
- *   choosing requests won't have to go over and ignore a lot of slots.
- * - If we restart, we will restart the persistent requests we were running, 
and will therefore get
- *   all the relevant callbacks.
- * @author toad
- */
-public class PersistentChosenRequest extends ChosenRequest {
-       
-       ClientRequestSchedulerCore core;
-       // A persistent hashCode is helpful for debugging and lets us put PCR's 
into hash-based maps and sets.
-       private final int hashCode;
-       
-       PersistentChosenRequest(ClientRequestSchedulerCore core, 
SendableRequest req, Object tok, Key key, ClientKey ckey, short prio, 
ObjectContainer container) {
-               super(req, tok, key, ckey, prio, container);
-               if(tok == null) throw new NullPointerException();
-               int hash = core.hashCode() ^ req.hashCode();
-               if(key != null)
-                       hash ^= key.hashCode();
-               if(ckey != null)
-                       hash ^= ckey.hashCode();
-               if(tok != null)
-                       hash ^= tok.hashCode();
-               hashCode = hash;
-               this.core = core;
-       }
-       
-       public int hashCode() {
-               return hashCode;
-       }
-       
-       public boolean equals(Object o) {
-               if(!(o instanceof PersistentChosenRequest)) return false;
-               PersistentChosenRequest req = (PersistentChosenRequest) o;
-               if(req.core != core) return false;
-               return super.equals(o);
-       }
-}

Modified: 
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java     
2008-08-12 19:42:51 UTC (rev 21773)
@@ -6,6 +6,8 @@
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.net.MalformedURLException;
+import java.util.Collections;
+import java.util.List;

 import com.db4o.ObjectContainer;

@@ -25,6 +27,7 @@
 import freenet.node.RequestClient;
 import freenet.node.RequestScheduler;
 import freenet.node.SendableInsert;
+import freenet.node.SendableRequestSender;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
 import freenet.support.api.Bucket;
@@ -342,25 +345,32 @@
                return finished;
        }

-       public boolean send(NodeClientCore core, RequestScheduler sched, 
ChosenRequest req) {
-               // Ignore keyNum, key, since we're only sending one block.
-               try {
-                       if(logMINOR) Logger.minor(this, "Starting request: 
"+this);
-                       ClientKeyBlock b = (ClientKeyBlock) req.token;
-                       if(b != null)
-                               core.realPut(b, req.cacheLocalRequests);
-                       else {
-                               Logger.error(this, "Asked to send empty block 
on "+this, new Exception("error"));
-                               return false;
+       @Override
+       public SendableRequestSender getSender(ObjectContainer container, 
ClientContext context) {
+               return new SendableRequestSender() {
+
+                       public boolean send(NodeClientCore core, 
RequestScheduler sched, ClientContext context, ChosenBlock req) {
+                               // Ignore keyNum, key, since we're only sending 
one block.
+                               try {
+                                       if(logMINOR) Logger.minor(this, 
"Starting request: "+this);
+                                       ClientKeyBlock b = (ClientKeyBlock) 
req.token;
+                                       if(b != null)
+                                               core.realPut(b, 
req.cacheLocalRequests);
+                                       else {
+                                               Logger.error(this, "Asked to 
send empty block on "+this, new Exception("error"));
+                                               return false;
+                                       }
+                               } catch (LowLevelPutException e) {
+                                       req.onFailure(e, context);
+                                       if(logMINOR) Logger.minor(this, 
"Request failed: "+this+" for "+e);
+                                       return true;
+                               }
+                               if(logMINOR) Logger.minor(this, "Request 
succeeded: "+this);
+                               req.onInsertSuccess(context);
+                               return true;
                        }
-               } catch (LowLevelPutException e) {
-                       sched.callFailure((SendableInsert) this, e, req.token, 
NativeThread.NORM_PRIORITY, req, req.isPersistent());
-                       if(logMINOR) Logger.minor(this, "Request failed: 
"+this+" for "+e);
-                       return true;
-               }
-               if(logMINOR) Logger.minor(this, "Request succeeded: "+this);
-               sched.callSuccess(this, req.token, NativeThread.NORM_PRIORITY, 
req, req.isPersistent());
-               return true;
+                       
+               };
        }

        public RequestClient getClient() {
@@ -413,4 +423,10 @@
                return getBlock(container, context, false);
        }

+       @Override
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               PersistentChosenBlock block = new PersistentChosenBlock(true, 
request, getBlock(container, context, false), null, null, sched);
+               return Collections.singletonList(block);
+       }
+
 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-08-12 19:42:51 UTC (rev 21773)
@@ -5,6 +5,7 @@

 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.Vector;

 import com.db4o.ObjectContainer;
@@ -562,9 +563,11 @@
                }
                int maxTries = blockFetchContext.maxNonSplitfileRetries;
                RequestScheduler sched = context.getFetchScheduler(false);
-               boolean set = onNonFatalFailure(e, blockNo, seg, container, 
context, sched, maxTries);
-               if(persistent && set)
-                       container.set(this);
+               SplitFileFetcherSubSegment sub = onNonFatalFailure(e, blockNo, 
seg, container, context, sched, maxTries);
+               if(sub != null) {
+                       sub.schedule(container, context, false, false);
+                       if(persistent && sub != seg) container.deactivate(sub, 
1);
+               }
        }

        public void onNonFatalFailure(FetchException[] failures, int[] 
blockNos, SplitFileFetcherSubSegment seg, ObjectContainer container, 
ClientContext context) {
@@ -574,17 +577,28 @@
                int maxTries = blockFetchContext.maxNonSplitfileRetries;
                RequestScheduler sched = context.getFetchScheduler(false);
                boolean set = false;
-               for(int i=0;i<failures.length;i++)
-                       if(onNonFatalFailure(failures[i], blockNos[i], seg, 
container, context, sched, maxTries))
-                               set = true;
-               if(persistent && set)
-                       container.set(this);
+               HashSet<SplitFileFetcherSubSegment> toSchedule = null;
+               for(int i=0;i<failures.length;i++) {
+                       SplitFileFetcherSubSegment sub = 
+                               onNonFatalFailure(failures[i], blockNos[i], 
seg, container, context, sched, maxTries);
+                       if(sub != null) {
+                               if(toSchedule == null)
+                                       toSchedule = new 
HashSet<SplitFileFetcherSubSegment>();
+                               toSchedule.add(sub);
+                       }
+               }
+               if(toSchedule != null && !toSchedule.isEmpty()) {
+                       for(SplitFileFetcherSubSegment sub : toSchedule) {
+                               sub.schedule(container, context, false, false);
+                               if(persistent && sub != seg) 
container.deactivate(sub, 1);
+                       }
+               }
        }

        /**
         * Caller must set(this) iff returns true.
         */
-       private boolean onNonFatalFailure(FetchException e, int blockNo, 
SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext 
context, RequestScheduler sched, int maxTries) {
+       private SplitFileFetcherSubSegment onNonFatalFailure(FetchException e, 
int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container, 
ClientContext context, RequestScheduler sched, int maxTries) {
                if(logMINOR) Logger.minor(this, "Calling onNonFatalFailure for 
block "+blockNo+" on "+this+" from "+seg);
                int tries;
                boolean failed = false;
@@ -592,7 +606,7 @@
                ClientCHK key;
                SplitFileFetcherSubSegment sub = null;
                synchronized(this) {
-                       if(isFinished(container)) return false;
+                       if(isFinished(container)) return null;
                        if(blockNo < dataKeys.length) {
                                key = dataKeys[blockNo];
                                if(persistent)
@@ -601,7 +615,7 @@
                                if(tries > maxTries && maxTries >= 0) failed = 
true;
                                else {
                                        sub = getSubSegment(tries, container, 
false, seg);
-                                       if(tries % 
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
+                                       if(tries % 
RequestScheduler.COOLDOWN_RETRIES == 0) {
                                                long now = 
System.currentTimeMillis();
                                                if(dataCooldownTimes[blockNo] > 
now)
                                                        Logger.error(this, 
"Already on the cooldown queue! for "+this+" data block no "+blockNo, new 
Exception("error"));
@@ -619,7 +633,7 @@
                                if(tries > maxTries && maxTries >= 0) failed = 
true;
                                else {
                                        sub = getSubSegment(tries, container, 
false, seg);
-                                       if(tries % 
ClientRequestScheduler.COOLDOWN_RETRIES == 0) {
+                                       if(tries % 
RequestScheduler.COOLDOWN_RETRIES == 0) {
                                                long now = 
System.currentTimeMillis();
                                                if(checkCooldownTimes[checkNo] 
> now)
                                                        Logger.error(this, 
"Already on the cooldown queue! for "+this+" check block no "+blockNo, new 
Exception("error"));
@@ -637,23 +651,27 @@
                        onFatalFailure(e, blockNo, seg, container, context);
                        if(logMINOR)
                                Logger.minor(this, "Not retrying block 
"+blockNo+" on "+this+" : tries="+tries+"/"+maxTries);
-                       return false;
+                       return null;
                }
+               boolean mustSchedule = false;
                if(cooldown) {
                        // Registered to cooldown queue
                        if(logMINOR)
                                Logger.minor(this, "Added to cooldown queue: 
"+key+" for "+this+" was on segment "+seg+" now registered to "+sub);
                } else {
                        // If we are here we are going to retry
-                       sub.add(blockNo, false, container, context, false);
+                       mustSchedule = sub.add(blockNo, true, container, 
context, false);
                        if(logMINOR)
                                Logger.minor(this, "Retrying block "+blockNo+" 
on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
                }
                if(persistent) {
-                       if(sub != null && sub != seg) container.deactivate(sub, 
1);
+                       container.set(this);
                        container.deactivate(key, 5);
                }
-               return true;
+               if(mustSchedule) 
+                       return sub;
+               else
+                       return null;
        }

        private SplitFileFetcherSubSegment getSubSegment(int retryCount, 
ObjectContainer container, boolean noCreate, SplitFileFetcherSubSegment 
dontDeactivate) {
@@ -743,14 +761,15 @@
                        if(logMINOR)
                                Logger.minor(this, "scheduling "+seg+" : 
"+seg.blockNums);

-                       seg.schedule(container, context, true, regmeOnly);
-                       if(persistent)
-                               container.deactivate(seg, 1);
                        synchronized(this) {
                                scheduled = true;
                        }
                        if(persistent)
                                container.set(this);
+                       // Schedule(true) will deactivate me, so we need to do 
it after storing scheduled.
+                       seg.schedule(container, context, true, regmeOnly);
+                       if(persistent)
+                               container.deactivate(seg, 1);
                } catch (Throwable t) {
                        Logger.error(this, "Caught "+t+" scheduling "+this, t);
                        fail(new FetchException(FetchException.INTERNAL_ERROR, 
t), container, context, true);
@@ -1106,7 +1125,7 @@
                if(persistent)
                        container.activate(seg, 1);
                if(seg != null) {
-                       seg.removeBlockNum(blockNum, container);
+                       seg.removeBlockNum(blockNum, container, false);
                        seg.possiblyRemoveFromParent(container, context);
                }
                for(int i=0;i<subSegments.size();i++) {
@@ -1114,7 +1133,7 @@
                        if(checkSeg == seg) continue;
                        if(persistent)
                                container.activate(checkSeg, 1);
-                       if(checkSeg.removeBlockNum(blockNum, container))
+                       if(checkSeg.removeBlockNum(blockNum, container, false))
                                Logger.error(this, "Block number "+blockNum+" 
was registered to wrong subsegment "+checkSeg+" should be "+seg);
                        if(persistent)
                                container.deactivate(checkSeg, 1);

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-08-12 19:25:54 UTC (rev 21772)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java  
    2008-08-12 19:42:51 UTC (rev 21773)
@@ -1,6 +1,8 @@
 package freenet.client.async;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Vector;

 import com.db4o.ObjectContainer;
@@ -21,6 +23,7 @@
 import freenet.node.KeysFetchingLocally;
 import freenet.node.LowLevelGetException;
 import freenet.node.RequestClient;
+import freenet.node.RequestScheduler;
 import freenet.node.SendableGet;
 import freenet.node.SupportsBulkCallFailure;
 import freenet.support.Logger;
@@ -229,11 +232,17 @@
        public void onFailure(BulkCallFailureItem[] items, ObjectContainer 
container, ClientContext context) {
                FetchException[] fetchExceptions = new 
FetchException[items.length];
                int countFatal = 0;
+               if(persistent) {
+                       container.activate(blockNums, 2);
+               }
                for(int i=0;i<items.length;i++) {
                        fetchExceptions[i] = translateException(items[i].e);
                        if(fetchExceptions[i].isFatal()) countFatal++;
+                       removeBlockNum(((Integer)items[i].token).intValue(), 
container, true);
                }
                if(persistent) {
+                       container.set(blockNums);
+                       container.deactivate(blockNums, 2);
                        container.activate(segment, 1);
                        container.activate(parent, 1);
                        container.activate(segment.errors, 1);
@@ -335,6 +344,7 @@
                } else {
                        segment.onNonFatalFailure(e, 
((Integer)token).intValue(), this, container, context);
                }
+               removeBlockNum(((Integer)token).intValue(), container, false);
                if(persistent) {
                        container.deactivate(segment, 1);
                        container.deactivate(parent, 1);
@@ -532,7 +542,10 @@

        }

-       public void add(int blockNo, boolean dontSchedule, ObjectContainer 
container, ClientContext context, boolean dontComplainOnDupes) {
+       /**
+        * @return True if the caller should schedule.
+        */
+       public boolean add(int blockNo, boolean dontSchedule, ObjectContainer 
container, ClientContext context, boolean dontComplainOnDupes) {
                if(persistent) {
 //                     container.activate(segment, 1);
                        container.activate(blockNums, 1);
@@ -554,7 +567,6 @@
                        } else {
                                blockNums.add(i);
                        }
-                       if(dontSchedule) schedule = false;
                        /**
                         * Race condition:
                         * 
@@ -575,8 +587,11 @@
                }
                if(persistent)
                        container.set(blockNums);
-               if(schedule)
+               if(schedule) {
+                       if(dontSchedule) return true;
                        context.getChkFetchScheduler().register(null, new 
SendableGet[] { this }, false, persistent, true, null, null);
+               }
+               return false;
        }

        public String toString() {
@@ -729,9 +744,9 @@
                getScheduler(context).register(firstTime ? segment : null, new 
SendableGet[] { this }, regmeOnly, persistent, true, 
segment.blockFetchContext.blocks, null);
        }

-       public boolean removeBlockNum(int blockNum, ObjectContainer container) {
+       public boolean removeBlockNum(int blockNum, ObjectContainer container, 
boolean callerActivatesAndSets) {
                if(logMINOR) Logger.minor(this, "Removing block "+blockNum+" 
from "+this);
-               if(persistent)
+               if(persistent && !callerActivatesAndSets)
                        container.activate(blockNums, 2);
                boolean found = false;
                synchronized(segment) {
@@ -746,11 +761,41 @@
                                }
                        }
                }
-               if(persistent) {
+               if(persistent && !callerActivatesAndSets) {
                        container.set(blockNums);
                        container.deactivate(blockNums, 2);
                }
                return found;
        }

+       @Override // Snapshots blockNums, then builds one PersistentChosenBlock per block number whose key is non-null.
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               if(persistent) {
+                       container.activate(segment, 1);
+                       container.activate(blockNums, 1);
+               }
+               Integer[] blockNumbers;
+               synchronized(this) { // NOTE(review): removeBlockNum() takes synchronized(segment) - confirm which lock guards blockNums
+                       blockNumbers = (Integer[]) blockNums.toArray(new 
Integer[blockNums.size()]);
+               }
+               ArrayList<PersistentChosenBlock> blocks = new 
ArrayList<PersistentChosenBlock>();
+               for(int i=0;i<blockNumbers.length;i++) {
+                       ClientKey key = segment.getBlockKey(blockNumbers[i], 
container);
+                       if(key == null) {
+                               if(logMINOR)
+                                       Logger.minor(this, "Block 
"+blockNumbers[i]+" is null, maybe race condition");
+                               continue;
+                       }
+                       Key k = key.getNodeKey(); // must follow the null check: dereferencing a null key would throw before the skip above
+                       PersistentChosenBlock block = new 
PersistentChosenBlock(false, request, blockNumbers[i], k, key, sched);
+                       blocks.add(block);
+               }
+               blocks.trimToSize();
+               if(persistent) {
+                       container.deactivate(segment, 1);
+                       container.deactivate(blockNums, 1);
+               }
+               return blocks;
+       }
+
 }

Added: branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java    
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/TransientChosenBlock.java    
2008-08-12 19:42:51 UTC (rev 21773)
@@ -0,0 +1,72 @@
+package freenet.client.async;
+
+import freenet.keys.ClientKey;
+import freenet.keys.ClientKeyBlock;
+import freenet.keys.Key;
+import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
+import freenet.node.RequestScheduler;
+import freenet.node.SendableGet;
+import freenet.node.SendableInsert;
+import freenet.node.SendableRequest;
+import freenet.node.SendableRequestSender;
+
+/**
+ * A ChosenBlock which isn't persistent: every callback is forwarded directly to the
+ * wrapped SendableRequest, passing null wherever that API takes an ObjectContainer.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public class TransientChosenBlock extends ChosenBlock {
+
+       public final SendableRequest request;
+       public final RequestScheduler sched;
+
+       public TransientChosenBlock(SendableRequest req, Object token, Key key, 
ClientKey ckey, 
+                       boolean localRequestOnly, boolean cacheLocalRequests, 
boolean ignoreStore, RequestScheduler sched) {
+               super(token, key, ckey, localRequestOnly, cacheLocalRequests, 
ignoreStore, sched);
+               this.request = req;
+               this.sched = sched; // also passed to super(...) above; this copy is the one onFetchSuccess() uses
+       }
+
+       @Override
+       public boolean isCancelled() {
+               return request.isCancelled(null);
+       }
+
+       @Override
+       public boolean isPersistent() {
+               return false;
+       }
+
+       public void onFailure(LowLevelPutException e, ClientContext context) {
+               ((SendableInsert) request).onFailure(e, token, null, context); // cast fails with ClassCastException unless request is a SendableInsert
+       }
+
+       public void onInsertSuccess(ClientContext context) {
+               ((SendableInsert) request).onSuccess(token, null, context);
+       }
+
+       public void onFailure(LowLevelGetException e, ClientContext context) {
+               ((SendableGet) request).onFailure(e, token, null, context); // cast fails with ClassCastException unless request is a SendableGet
+       }
+
+       public void onSuccess(ClientKeyBlock data, boolean fromStore, 
ClientContext context) {
+               ((SendableGet) request).onSuccess(data, fromStore, token, null, 
context);
+       }
+
+       @Override
+       public void onFetchSuccess(ClientContext context) {
+               sched.succeeded((SendableGet)request, this); // only notifies the scheduler; no block data is passed on this path
+       }
+
+       @Override
+       public short getPriority() {
+               return request.getPriorityClass(null);
+       }
+
+       @Override
+       public SendableRequestSender getSender(ClientContext context) {
+               return request.getSender(null, context);
+       }
+       
+}

Modified: branches/db4o/freenet/src/freenet/keys/ClientCHK.java
===================================================================
--- branches/db4o/freenet/src/freenet/keys/ClientCHK.java       2008-08-12 
19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/keys/ClientCHK.java       2008-08-12 
19:42:51 UTC (rev 21773)
@@ -40,7 +40,7 @@
     private ClientCHK(ClientCHK key) {
        this.routingKey = new byte[key.routingKey.length];
        System.arraycopy(key.routingKey, 0, routingKey, 0, 
key.routingKey.length);
-       this.nodeKey = (NodeCHK) key.nodeKey.cloneKey();
+       this.nodeKey = null;
        this.cryptoKey = new byte[key.cryptoKey.length];
        System.arraycopy(key.cryptoKey, 0, cryptoKey, 0, key.cryptoKey.length);
        this.controlDocument = key.controlDocument;

Modified: branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java     
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/BulkCallFailureItem.java     
2008-08-12 19:42:51 UTC (rev 21773)
@@ -1,18 +1,13 @@
 package freenet.node;

-import freenet.client.async.PersistentChosenRequest;
-
 public class BulkCallFailureItem {

        public final LowLevelGetException e;
        public final Object token;
-       /** Removed by ClientRequestScheduler, implementor of 
SupportsBulkCallFailure should ignore. */
-       public final PersistentChosenRequest req;

-       public BulkCallFailureItem(LowLevelGetException e, Object token, 
PersistentChosenRequest req) {
+       public BulkCallFailureItem(LowLevelGetException e, Object token) {
                this.e = e;
                this.token = token;
-               this.req = req;
        }

 }

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-08-12 19:42:51 UTC (rev 21773)
@@ -6,7 +6,7 @@
 import java.util.LinkedList;

 import freenet.client.FECQueue;
-import freenet.client.async.ChosenRequest;
+import freenet.client.async.ChosenBlock;
 import freenet.client.async.ClientContext;
 import freenet.keys.ClientKey;
 import freenet.keys.Key;
@@ -20,7 +20,7 @@
         * another may also work. Also, delete the ChosenRequest if it is 
persistent. 
         * @param req The request we ran, which must be deleted.
         * */
-       public void succeeded(BaseSendableGet get, ChosenRequest req);
+       public void succeeded(BaseSendableGet get, ChosenBlock req);

        /**
         * After a key has been requested a few times, it is added to the 
cooldown queue for
@@ -49,34 +49,26 @@

        public void queueFillRequestStarterQueue();

-       public LinkedList getRequestStarterQueue();
-
-       public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req);
-
        public KeysFetchingLocally fetchingKeys();

        public void removeFetchingKey(Key key);

-       public void removeChosenRequest(ChosenRequest req);
-
-       /** Call onFailure() on the database thread, then delete the 
PersistentChosenRequest. For a non-persistent request, 
-        * just call onFailure() immediately. */
-       public void callFailure(final SendableGet get, final 
LowLevelGetException e, final Object keyNum, int prio, ChosenRequest req, 
boolean persistent);
+       public void callFailure(SendableGet get, LowLevelGetException e, int 
prio, boolean persistent);

-       /** Call onFailure() on the database thread, then delete the 
PersistentChosenRequest. For a non-persistent request, 
-        * just call onFailure() immediately. */
-       public void callFailure(final SendableInsert put, final 
LowLevelPutException e, final Object keyNum, int prio, ChosenRequest req, 
boolean persistent);
-
-       /** Call onSuccess() on the database thread, then delete the 
PersistentChosenRequest. For a non-persistent request, 
-        * just call onFailure() immediately. */
-       public void callSuccess(final SendableInsert put, final Object keyNum, 
int prio, ChosenRequest req, boolean persistent);
-
+       public void callFailure(SendableInsert insert, LowLevelPutException 
exception, int prio, boolean persistent);
+       
        public FECQueue getFECQueue();

        public ClientContext getContext();

        public boolean addToFetching(Key key);

-       public void requeue(ChosenRequest req);
-       
+       public ChosenBlock grabRequest();
+
+       public void removeRunningRequest(SendableRequest request);
+
+       public abstract boolean isRunningRequest(SendableRequest request);
+
+       public void start(NodeClientCore core);
+
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-08-12 
19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-08-12 
19:42:51 UTC (rev 21773)
@@ -3,11 +3,9 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

-import java.util.LinkedList;
-
 import com.db4o.ObjectContainer;

-import freenet.client.async.ChosenRequest;
+import freenet.client.async.ChosenBlock;
 import freenet.client.async.ClientContext;
 import freenet.keys.Key;
 import freenet.support.Logger;
@@ -53,9 +51,6 @@
                return !((prio < MAXIMUM_PRIORITY_CLASS) || (prio > 
MINIMUM_PRIORITY_CLASS));
        }

-       /** Queue of requests to run. Added to by jobs on the database thread. 
*/
-       private LinkedList queue;
-       
        final BaseRequestThrottle throttle;
        final TokenBucket inputBucket;
        final TokenBucket outputBucket;
@@ -84,10 +79,10 @@

        void setScheduler(RequestScheduler sched) {
                this.sched = sched;
-               queue = sched.getRequestStarterQueue();
        }

        void start() {
+               sched.start(core);
                core.getExecutor().execute(this, name);
                sched.queueFillRequestStarterQueue();
        }
@@ -99,7 +94,7 @@
        }

        void realRun() {
-               ChosenRequest req = null;
+               ChosenBlock req = null;
                sentRequestTime = System.currentTimeMillis();
                // The last time at which we sent a request or decided not to
                long cycleTime = sentRequestTime;
@@ -121,10 +116,10 @@
                        sched.moveKeysFromCooldownQueue();
                        boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
                        if(req == null) {
-                               req = getRequest(logMINOR);
+                               req = sched.grabRequest();
                        }
                        if(req != null) {
-                               if(logMINOR) Logger.minor(this, "Running 
"+req+" priority "+req.prio);
+                               if(logMINOR) Logger.minor(this, "Running 
"+req+" priority "+req.getPriority());
                                // Wait
                                long delay = throttle.getDelay();
                                if(logMINOR) Logger.minor(this, 
"Delay="+delay+" from "+throttle);
@@ -161,7 +156,7 @@
                                // Always take the lock on RequestStarter 
first. AFAICS we don't synchronize on RequestStarter anywhere else.
                                // Nested locks here prevent extra latency when 
there is a race, and therefore allow us to sleep indefinitely
                                synchronized(this) {
-                                       req = getRequest(logMINOR);
+                                       req = sched.grabRequest();
                                        if(req == null) {
                                                try {
                                                        wait(100*1000); // as 
close to indefinite as I'm comfortable with! Toad
@@ -174,7 +169,7 @@
                        if(req == null) continue;
                        if(!startRequest(req, logMINOR)) {
                                // Don't log if it's a cancelled transient 
request.
-                               if(!((!req.isPersistent()) && 
req.request.isCancelled(null)))
+                               if(!((!req.isPersistent()) && 
req.isCancelled()))
                                        Logger.normal(this, "No requests to 
start on "+req);
                        }
                        req = null;
@@ -182,64 +177,12 @@
                }
        }

-       /**
-        * Pull a request from the from-the-database-thread queue. Then ask for 
a higher priority non-persistent request.
-        * If there isn't one, use the one we just pulled; if there is one, put 
it back on the queue.
-        * 
-        * Obviously, there will be a slightly higher latency for the database 
queue; for example, when adding a new 
-        * higher priority persistent request, we have a queue-length penalty 
before starting to request it. We also
-        * have a significant penalty from all the database access (even if the 
request itself is cached, the database
-        * thread is probably doing other things so we have to wait for that to 
finish).
-        * @return
-        */
-       private ChosenRequest getRequest(boolean logMINOR) {
-               boolean usedReq = true;
-               ChosenRequest req = null;
-               while(true) {
-                       synchronized(queue) {
-                               if(queue.isEmpty()) break;
-                               req = (ChosenRequest) queue.removeFirst();
-                       }
-                       if((!req.isPersistent()) && 
req.request.isCancelled(null)) continue;
-                       break;
-               }
-               ChosenRequest betterReq = 
sched.getBetterNonPersistentRequest(req);
-               if(req != null) {
-                       if(betterReq != null) {
-                               if(logMINOR)
-                                       Logger.minor(this, "Not using "+req+" 
in favour of "+betterReq);
-                               synchronized(queue) {
-                                       queue.addFirst(req);
-                                       queue.remove(betterReq);
-                               }
-                               req = null;
-                               usedReq = false;
-                       }
-               }
-               if(req == null) {
-                       usedReq = false;
-                       req = betterReq;
-               }
-               if(usedReq || req == null)
-                       sched.queueFillRequestStarterQueue();
-               if(req == null && logMINOR) Logger.minor(this, "No requests 
found");
-               if(req != null && !this.isInsert) {
-                       if(!sched.addToFetching(req.key)) {
-                               sched.requeue(req);
-                               Logger.error(this, "Skipping request as 
duplicate: "+req);
-                               req = null;
-                       }
-               }
-               return req;
-       }
-
-       private boolean startRequest(ChosenRequest req, boolean logMINOR) {
-               if((!req.isPersistent()) && req.request.isCancelled(null)) {
+       private boolean startRequest(ChosenBlock req, boolean logMINOR) {
+               if((!req.isPersistent()) && req.isCancelled()) {
                        sched.removeFetchingKey(req.key);
-                       sched.removeChosenRequest(req);
                        return false;
                }
-               if(logMINOR) Logger.minor(this, "Running request "+req+" 
priority "+req.prio);
+               if(logMINOR) Logger.minor(this, "Running request "+req+" 
priority "+req.getPriority());
                core.getExecutor().execute(new SenderThread(req, req.key), 
"RequestStarter$SenderThread for "+req);
                return true;
        }
@@ -259,10 +202,10 @@

        private class SenderThread implements Runnable {

-               private final ChosenRequest req;
+               private final ChosenBlock req;
                private final Key key;

-               public SenderThread(ChosenRequest req, Key key) {
+               public SenderThread(ChosenBlock req, Key key) {
                        this.req = req;
                        this.key = key;
                }
@@ -274,18 +217,13 @@
                    if (key != null)
                        
stats.reportOutgoingLocalRequestLocation(key.toNormalizedDouble());
                    if(!req.send(core, sched)) {
-                               if(!((!req.isPersistent()) && 
req.request.isCancelled(null)))
+                               if(!((!req.isPersistent()) && 
req.isCancelled()))
                                        Logger.error(this, "run() not able to 
send a request on "+req);
                                else
                                        Logger.normal(this, "run() not able to 
send a request on "+req+" - request was cancelled");
                        }
                        if(Logger.shouldLog(Logger.MINOR, this)) 
                                Logger.minor(this, "Finished "+req);
-                       } catch (Throwable t) {
-                               // Remove it if something is thrown.
-                               // But normally send(), callFailure() or 
callSuccess() will remove it.
-                               Logger.error(this, "Caught "+t, t);
-                               sched.removeChosenRequest(req);
                        } finally {
                                sched.removeFetchingKey(key);
                        }
@@ -300,6 +238,10 @@
        }

        public boolean exclude(RandomGrabArrayItem item, ObjectContainer 
container, ClientContext context) {
+               if(sched.isRunningRequest((SendableRequest)item)) {
+                       Logger.normal(this, "Excluding already-running request: 
"+item);
+                       return true;
+               }
                if(isInsert) return false;
                if(!(item instanceof BaseSendableGet)) {
                        Logger.error(this, "On a request scheduler, exclude() 
called with "+item, new Exception("error"));

Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-08-12 
19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-08-12 
19:42:51 UTC (rev 21773)
@@ -6,14 +6,12 @@
 import com.db4o.ObjectContainer;

 import freenet.client.FetchContext;
-import freenet.client.async.ChosenRequest;
 import freenet.client.async.ClientContext;
 import freenet.client.async.ClientRequestScheduler;
 import freenet.client.async.ClientRequester;
 import freenet.keys.ClientKey;
 import freenet.keys.ClientKeyBlock;
 import freenet.keys.Key;
-import freenet.support.Logger;
 import freenet.support.io.NativeThread;

 /**
@@ -55,50 +53,12 @@
                this.parent = parent;
        }

-       /** Do the request, blocking. Called by RequestStarter. 
-        * Also responsible for deleting it.
-        * @return True if a request was executed. False if caller should try 
to find another request, and remove
-        * this one from the queue. */
-       public boolean send(NodeClientCore core, final RequestScheduler sched, 
ChosenRequest req) {
-               Object keyNum = req.token;
-               ClientKey key = req.ckey;
-               if(key == null) {
-                       Logger.error(this, "Key is null in send(): keyNum = 
"+keyNum+" for "+this);
-                       return false;
-               }
-               if(Logger.shouldLog(Logger.MINOR, this))
-                       Logger.minor(this, "Sending get for key "+keyNum+" : 
"+key);
-               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               if((!req.isPersistent()) && isCancelled(null)) {
-                       if(logMINOR) Logger.minor(this, "Cancelled: "+this);
-                       sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.CANCELLED), keyNum, 
NativeThread.NORM_PRIORITY+1, req, req.isPersistent());
-                       return false;
-               }
-               try {
-                       try {
-                               core.realGetKey(key, req.localRequestOnly, 
req.cacheLocalRequests, req.ignoreStore);
-                       } catch (final LowLevelGetException e) {
-                               sched.callFailure(this, e, keyNum, 
NativeThread.NORM_PRIORITY+1, req, req.isPersistent());
-                               return true;
-                       } catch (Throwable t) {
-                               Logger.error(this, "Caught "+t, t);
-                               sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, 
NativeThread.NORM_PRIORITY+1, req, req.isPersistent());
-                               return true;
-                       }
-                       // We must remove the request even in this case.
-                       // On other paths, callFailure() will do the removal.
-                       sched.removeChosenRequest(req);
-                       // Don't call onSuccess(), it will be called for us by 
backdoor coalescing.
-                       sched.succeeded(this, req);
-                       
-               } catch (Throwable t) {
-                       Logger.error(this, "Caught "+t, t);
-                       sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), keyNum, 
NativeThread.NORM_PRIORITY+1, req, req.isPersistent());
-                       return true;
-               }
-               return true;
+       static final SendableGetRequestSender sender = new 
SendableGetRequestSender();
+       
+       public SendableRequestSender getSender(ObjectContainer container, 
ClientContext context) {
+               return sender;
        }
-
+       
        public ClientRequestScheduler getScheduler(ClientContext context) {
                if(isSSK())
                        return context.getSskFetchScheduler();
@@ -119,8 +79,11 @@
        /** Reset the cooldown times when the request is reregistered. */
        public abstract void resetCooldownTimes(ObjectContainer container);

-       public void internalError(final Object keyNum, final Throwable t, final 
RequestScheduler sched, ObjectContainer container, ClientContext context, 
boolean persistent) {
-               sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), 
keyNum, NativeThread.MAX_PRIORITY, null, persistent);
+       /**
+        * An internal error occurred, affecting this SendableGet, 
independently of any ChosenBlocks.
+        */
+       public void internalError(final Throwable t, final RequestScheduler 
sched, ObjectContainer container, ClientContext context, boolean persistent) {
+               sched.callFailure(this, new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), 
NativeThread.MAX_PRIORITY, persistent);
        }

        /**

Added: branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java        
                        (rev 0)
+++ branches/db4o/freenet/src/freenet/node/SendableGetRequestSender.java        
2008-08-12 19:42:51 UTC (rev 21773)
@@ -0,0 +1,49 @@
+package freenet.node;
+
+import freenet.client.async.ChosenBlock;
+import freenet.client.async.ClientContext;
+import freenet.keys.ClientKey;
+import freenet.support.Logger;
+
+public class SendableGetRequestSender implements SendableRequestSender {
+
+       /** Do the request for one chosen block, blocking until core.realGetKey() returns or throws.
+        * The outcome is reported through req.onFailure() or req.onFetchSuccess().
+        * @return False if nothing was sent (null client key, or the block was cancelled);
+        * true if the fetch was attempted, whether it succeeded or failed. */
+       public boolean send(NodeClientCore core, final RequestScheduler sched, 
ClientContext context, ChosenBlock req) {
+               Object keyNum = req.token;
+               ClientKey key = req.ckey;
+               if(key == null) {
+                       Logger.error(SendableGet.class, "Key is null in send(): 
keyNum = "+keyNum+" for "+req);
+                       return false;
+               }
+               boolean logMINOR = Logger.shouldLog(Logger.MINOR, SendableGet.class); // checked against the class the minor messages below are logged under
+               if(logMINOR)
+                       Logger.minor(SendableGet.class, "Sending get for key 
"+keyNum+" : "+key);
+               if(req.isCancelled()) {
+                       if(logMINOR) Logger.minor(SendableGet.class, 
"Cancelled: "+req);
+                       req.onFailure(new 
LowLevelGetException(LowLevelGetException.CANCELLED), context);
+                       return false;
+               }
+               try {
+                       try {
+                               core.realGetKey(key, req.localRequestOnly, 
req.cacheLocalRequests, req.ignoreStore);
+                       } catch (final LowLevelGetException e) {
+                               req.onFailure(e, context);
+                               return true;
+                       } catch (Throwable t) {
+                               Logger.error(this, "Caught "+t, t);
+                               req.onFailure(new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), context);
+                               return true;
+                       }
+                       req.onFetchSuccess(context); // signals completion only; no block data is passed from here
+               } catch (Throwable t) { // reached when onFetchSuccess() or one of the failure callbacks above itself throws
+                       Logger.error(this, "Caught "+t, t);
+                       req.onFailure(new 
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR), context);
+                       return true;
+               }
+               return true;
+       }
+
+}

Modified: branches/db4o/freenet/src/freenet/node/SendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableInsert.java  2008-08-12 
19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/SendableInsert.java  2008-08-12 
19:42:51 UTC (rev 21773)
@@ -6,6 +6,7 @@
 import com.db4o.ObjectContainer;

 import freenet.client.async.ClientContext;
+import freenet.support.io.NativeThread;

 /**
  * Callback interface for a low level insert, which is immediately sendable. 
These
@@ -25,8 +26,8 @@
        /** Called when we don't! */
        public abstract void onFailure(LowLevelPutException e, Object keyNum, 
ObjectContainer container, ClientContext context);

-       public void internalError(Object keyNum, Throwable t, RequestScheduler 
sched, ObjectContainer container, ClientContext context, boolean persistent) {
-               onFailure(new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), 
keyNum, container, context);
+       public void internalError(Throwable t, RequestScheduler sched, 
ObjectContainer container, ClientContext context, boolean persistent) { // Wraps t in an INTERNAL_ERROR LowLevelPutException and hands it to sched.callFailure() at MAX_PRIORITY; container and context are not used here.
+               sched.callFailure(this, new 
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), 
NativeThread.MAX_PRIORITY, persistent);
        }

 }

Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-12 
19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-08-12 
19:42:51 UTC (rev 21773)
@@ -1,11 +1,15 @@
 package freenet.node;

+import java.util.List;
+
 import com.db4o.ObjectContainer;

-import freenet.client.async.ChosenRequest;
+import freenet.client.async.ChosenBlock;
 import freenet.client.async.ClientContext;
 import freenet.client.async.ClientRequestScheduler;
 import freenet.client.async.ClientRequester;
+import freenet.client.async.PersistentChosenBlock;
+import freenet.client.async.PersistentChosenRequest;
 import freenet.support.Logger;
 import freenet.support.RandomGrabArray;
 import freenet.support.RandomGrabArrayItem;
@@ -54,18 +58,14 @@
         * currently running, on the cooldown queue etc. */
        public abstract Object[] sendableKeys(ObjectContainer container);

-       /** ONLY called by RequestStarter. Start the actual request using the 
NodeClientCore
-        * provided, and the key and key number earlier got from chooseKey(). 
-        * The request itself may have been removed from the overall queue 
already. For 
-        * persistent requests, the callbacks will be called on the database 
thread, and we 
-        * will delete the PersistentChosenRequest from there before committing.
-        * @param sched The scheduler this request has just been grabbed from.
-        * @param keyNum The key number that was fed into getKeyObject().
-        * @param key The key returned from grabKey().
-        * @param ckey The client key for decoding, if available (mandatory for 
SendableGet, null otherwise).
-        * @return True if a request was sent, false otherwise (in which case 
the request will
-        * be removed if it hasn't already been). */
-       public abstract boolean send(NodeClientCore node, RequestScheduler 
sched, ChosenRequest request);
+       /**
+        * Get or create a SendableRequestSender for this object. This is a non-persistent
+        * object used to send the requests. See also SendableGet.getSender().
+        * @param container A database handle may be necessary for creating it.
+        * @param context A client context may also be necessary.
+        * @return The SendableRequestSender to use for sending this object's requests.
+        */
+       public abstract SendableRequestSender getSender(ObjectContainer 
container, ClientContext context);

        /** If true, the request has been cancelled, or has completed, either 
way it need not
         * be registered any more. isEmpty() on the other hand means there are 
no queued blocks.
@@ -127,6 +127,10 @@
        public abstract boolean isSSK();

        /** Requeue after an internal error */
-       public abstract void internalError(Object keyNum, Throwable t, 
RequestScheduler sched, ObjectContainer container, ClientContext context, 
boolean persistent);
+       public abstract void internalError(Throwable t, RequestScheduler sched, 
ObjectContainer container, ClientContext context, boolean persistent);

+       /** Construct a full set of ChosenBlocks for a persistent request. These are transient, so we will need to clone keys etc.
+        * @return The PersistentChosenBlocks built for the given request. */
+       public abstract List<PersistentChosenBlock> 
makeBlocks(PersistentChosenRequest request, RequestScheduler sched, 
ObjectContainer container, ClientContext context);
+
 }

Added: branches/db4o/freenet/src/freenet/node/SendableRequestSender.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequestSender.java           
                (rev 0)
+++ branches/db4o/freenet/src/freenet/node/SendableRequestSender.java   
2008-08-12 19:42:51 UTC (rev 21773)
@@ -0,0 +1,26 @@
+package freenet.node;
+
+import freenet.client.async.ChosenBlock;
+import freenet.client.async.ClientContext;
+
+/**
+ * Interface for class responsible for doing the actual sending of requests.
+ * Strictly non-persistent.
+ * @author Matthew Toseland <toad at amphibian.dyndns.org> (0xE43DA450)
+ */
+public interface SendableRequestSender {
+       
+       /** ONLY called by RequestStarter. Start the actual request using the NodeClientCore
+        * provided, and the key and key number earlier got from chooseKey(). 
+        * The request itself may have been removed from the overall queue already. For 
+        * persistent requests, the callbacks will be called on the database thread, and we 
+        * will delete the PersistentChosenRequest from there before committing.
+        * @param node The NodeClientCore used to start the actual request.
+        * @param sched The scheduler this request has just been grabbed from.
+        * @param context The client context; presumably handed on to the request's success/failure callbacks (confirm per implementation).
+        * @param request The chosen block to send.
+        * @return True if a request was sent, false otherwise (in which case the request will
+        * be removed if it hasn't already been). */
+       public abstract boolean send(NodeClientCore node, RequestScheduler 
sched, ClientContext context, ChosenBlock request);
+       
+}

Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java    
2008-08-12 19:25:54 UTC (rev 21772)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java    
2008-08-12 19:42:51 UTC (rev 21773)
@@ -3,12 +3,16 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

+import java.util.List;
+
 import com.db4o.ObjectContainer;

-import freenet.client.async.ChosenRequest;
+import freenet.client.async.ChosenBlock;
 import freenet.client.async.ClientContext;
 import freenet.client.async.ClientRequestScheduler;
 import freenet.client.async.ClientRequester;
+import freenet.client.async.PersistentChosenBlock;
+import freenet.client.async.PersistentChosenRequest;
 import freenet.keys.CHKBlock;
 import freenet.keys.ClientKey;
 import freenet.keys.KeyBlock;
@@ -72,22 +76,28 @@
                return 0;
        }

-       public boolean send(NodeClientCore core, RequestScheduler sched, 
ChosenRequest req) {
-               // Ignore keyNum, key, since this is a single block
-               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
-               try {
-                       if(logMINOR) Logger.minor(this, "Starting request: 
"+this);
-                       core.realPut(block, shouldCache());
-               } catch (LowLevelPutException e) {
-                       sched.callFailure(this, e, req.token, 
NativeThread.NORM_PRIORITY, req, false);
-                       if(logMINOR) Logger.minor(this, "Request failed: 
"+this+" for "+e);
-                       return true;
-               } finally {
-                       finished = true;
-               }
-               if(logMINOR) Logger.minor(this, "Request succeeded: "+this);
-               sched.callSuccess(this, req.token, NativeThread.NORM_PRIORITY, 
req, false);
-               return true;
+       @Override
+       public SendableRequestSender getSender(ObjectContainer container, ClientContext context) { // Builds a new non-persistent sender on every call; neither argument is needed to create it.
+               return new SendableRequestSender() {
+                       /** Inserts the single block via core.realPut(), then reports the outcome through the enclosing insert's onSuccess()/onFailure(). Always returns true. */
+                       public boolean send(NodeClientCore core, RequestScheduler sched, ClientContext context, ChosenBlock req) {
+                               // Single block, so keyNum/key are ignored. Log against the enclosing insert: a bare "this" here would be the anonymous sender.
+                               boolean logMINOR = Logger.shouldLog(Logger.MINOR, SimpleSendableInsert.this);
+                               try {
+                                       if(logMINOR) Logger.minor(SimpleSendableInsert.this, "Starting request: "+SimpleSendableInsert.this);
+                                       core.realPut(block, shouldCache());
+                               } catch (LowLevelPutException e) {
+                                       onFailure(e, req.token, null, context);
+                                       if(logMINOR) Logger.minor(SimpleSendableInsert.this, "Request failed: "+SimpleSendableInsert.this+" for "+e);
+                                       return true;
+                               } finally {
+                                       finished = true;
+                               }
+                               if(logMINOR) Logger.minor(SimpleSendableInsert.this, "Request succeeded: "+SimpleSendableInsert.this);
+                               onSuccess(req.token, null, context);
+                               return true;
+                       }
+               };
        }

        public RequestClient getClient() {
@@ -146,4 +156,10 @@
        public boolean isSSK() {
                return block instanceof SSKBlock;
        }
+
+       @Override
+       public List<PersistentChosenBlock> makeBlocks(PersistentChosenRequest 
request, RequestScheduler sched, ObjectContainer container, ClientContext 
context) {
+               // This request type is transient-only, so it never builds persistent blocks: always throws.
+               throw new UnsupportedOperationException();
+       }
 }


Reply via email to