Author: toad
Date: 2008-05-31 13:12:13 +0000 (Sat, 31 May 2008)
New Revision: 20155

Added:
   branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Return ChosenRequests, not SendableRequests.
This means we call chooseKey() and getNodeKey() on the database thread, which 
means no database I/O on the request starter thread.


Added: branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java           
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenRequest.java   
2008-05-31 13:12:13 UTC (rev 20155)
@@ -0,0 +1,29 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import freenet.keys.Key;
+import freenet.node.SendableRequest;
+
+/**
+ * A request chosen by ClientRequestScheduler.
+ * @author toad
+ */
+public class ChosenRequest {
+
+       /** The request object */
+       public final SendableRequest request;
+       /** The token indicating the key within the request to be fetched/inserted.
+        * Meaning is entirely defined by the request. */
+       public final Object token;
+       /** The key to be fetched, null if not a BaseSendableGet */
+       public final Key key;
+
+       ChosenRequest(SendableRequest req, Object tok, Key key) {
+               request = req;
+               token = tok;
+               this.key = key;
+       }
+       
+}

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-31 12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-31 13:12:13 UTC (rev 20155)
@@ -17,6 +17,7 @@
 import freenet.keys.KeyBlock;
 import freenet.keys.KeyVerifyException;
 import freenet.node.BaseSendableGet;
+import freenet.node.KeysFetchingLocally;
 import freenet.node.LowLevelGetException;
 import freenet.node.Node;
 import freenet.node.NodeClientCore;
@@ -241,7 +242,7 @@
                        schedTransient.addPendingKey(key, getter);
        }

-       private synchronized SendableRequest removeFirst() {
+       private synchronized ChosenRequest removeFirst() {
                if(!databaseExecutor.onThread()) {
                        throw new IllegalStateException("Not on database 
thread!");
                }
@@ -254,7 +255,7 @@
                return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, false, (short) -1, -1);
        }

-       public SendableRequest getBetterNonPersistentRequest(SendableRequest 
req) {
+       public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) {
                short fuzz = -1;
                if(PRIORITY_SOFT.equals(choosenPriorityScheduler))
                        fuzz = -1;
@@ -262,8 +263,8 @@
                        fuzz = 0;       
                if(req == null)
                        return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, (short) -1, -1);
-               short prio = req.getPriorityClass();
-               int retryCount = req.getRetryCount();
+               short prio = req.request.getPriorityClass();
+               int retryCount = req.request.getRetryCount();
                return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, prio, retryCount);
        }

@@ -282,7 +283,7 @@

        private Runnable requestStarterQueueFiller = new Runnable() {
                public void run() {
-                       SendableRequest req = null;
+                       ChosenRequest req = null;
                        while(true) {
                                synchronized(starterQueue) {
                                        if(req != null) {
@@ -531,4 +532,12 @@
                // Approximately... there might be some overlap in the two 
pendingKeys's...
                return schedCore.countQueuedRequests() + 
schedTransient.countQueuedRequests();
        }
+
+       public KeysFetchingLocally fetchingKeys() {
+               return schedCore;
+       }
+
+       public void removeFetchingKey(Key key) {
+               schedCore.removeFetchingKey(key);
+       }
 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-31 12:35:43 UTC (rev 20154)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-31 13:12:13 UTC (rev 20155)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.client.async;

+import java.util.HashSet;
 import java.util.List;

 import com.db4o.ObjectContainer;
@@ -14,7 +15,9 @@

 import freenet.crypt.RandomSource;
 import freenet.keys.ClientKey;
+import freenet.keys.Key;
 import freenet.node.BaseSendableGet;
+import freenet.node.KeysFetchingLocally;
 import freenet.node.Node;
 import freenet.node.RequestStarter;
 import freenet.node.SendableGet;
@@ -33,7 +36,7 @@
  * Does not refer to any non-persistable classes as member variables: Node 
must always 
  * be passed in if we need to use it!
  */
-class ClientRequestSchedulerCore extends ClientRequestSchedulerBase {
+class ClientRequestSchedulerCore extends ClientRequestSchedulerBase implements 
KeysFetchingLocally {

        private ObjectContainer container;
        private static boolean logMINOR;
@@ -43,8 +46,19 @@
        private transient RandomSource random;
        private transient PrioritizedSerialExecutor databaseExecutor;
        private transient ClientRequestScheduler sched;
-
+       
        /**
+        * All Key's we are currently fetching. 
+        * Locally originated requests only, avoids some complications with HTL, 
+        * and also has the benefit that we can see stuff that's been scheduled on a SenderThread
+        * but that thread hasn't started yet. FIXME: Both issues can be avoided: first we'd get 
+        * rid of the SenderThread and start the requests directly and asynchronously, secondly
+        * we'd move this to node but only track keys we are fetching at max HTL.
+        * LOCKING: Always lock this LAST.
+        */
+       private transient HashSet keysFetching;
+       
+       /**
         * Fetch a ClientRequestSchedulerCore from the database, or create a 
new one.
         * @param node
         * @param forInserts
@@ -95,8 +109,12 @@
                }
                this.random = random;
                this.databaseExecutor = databaseExecutor;
+               if(!isInsertScheduler)
+                       keysFetching = new HashSet();
+               else
+                       keysFetching = null;
+               this.sched = sched;
                databaseExecutor.execute(registerMeRunner, 
NativeThread.NORM_PRIORITY, "Register request");
-               this.sched = sched;
        }

        // We pass in the schedTransient to the next two methods so that we can 
select between either of them.
@@ -139,7 +157,25 @@
        // We prevent a number of race conditions (e.g. adding a retry count and then another 
        // thread removes it cos its empty) ... and in addToGrabArray etc we already sync on this.
        // The worry is ... is there any nested locking outside of the hierarchy?
-       SendableRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount) {
+       ChosenRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount) {
+               SendableRequest req = removeFirstInner(fuzz, random, 
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount);
+               Object token = req.chooseKey(this);
+               if(token == null) {
+                       return null;
+               } else {
+                       Key key;
+                       if(isInsertScheduler)
+                               key = null;
+                       else
+                               key = ((BaseSendableGet)req).getNodeKey(token);
+                       ChosenRequest ret = new ChosenRequest(req, token, key);
+                       if(key != null)
+                               keysFetching.add(key);
+                       return ret;
+               }
+       }
+       
+       SendableRequest removeFirstInner(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount) {
                // Priorities start at 0
                if(logMINOR) Logger.minor(this, "removeFirst()");
                boolean tryOfferedKeys = offeredKeys != null && 
random.nextBoolean();
@@ -156,7 +192,7 @@
                for(;choosenPriorityClass <= 
RequestStarter.MINIMUM_PRIORITY_CLASS;choosenPriorityClass++) {
                        if(logMINOR) Logger.minor(this, "Using priority 
"+choosenPriorityClass);
                if(tryOfferedKeys) {
-                       
if(offeredKeys[choosenPriorityClass].hasValidKeys(starter))
+                       if(offeredKeys[choosenPriorityClass].hasValidKeys(this))
                                return offeredKeys[choosenPriorityClass];
                }
                SortedVectorByNumber perm = null;
@@ -401,6 +437,18 @@
                        }
                }
        }
+
+       public boolean hasKey(Key key) {
+               synchronized(keysFetching) {
+                       return keysFetching.contains(key);
+               }
+       }
+
+       public void removeFetchingKey(Key key) {
+               synchronized(keysFetching) {
+                       keysFetching.remove(key);
+               }
+       }

 }


Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-31 12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-31 13:12:13 UTC (rev 20155)
@@ -5,7 +5,9 @@

 import java.util.LinkedList;

+import freenet.client.async.ChosenRequest;
 import freenet.keys.ClientKey;
+import freenet.keys.Key;

 public interface RequestScheduler {

@@ -45,6 +47,10 @@

        public LinkedList getRequestStarterQueue();

-       public SendableRequest getBetterNonPersistentRequest(SendableRequest 
req);
+       public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req);
+
+       public KeysFetchingLocally fetchingKeys();
+
+       public void removeFetchingKey(Key key);

 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-05-31 
12:35:43 UTC (rev 20154)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-05-31 
13:12:13 UTC (rev 20155)
@@ -6,6 +6,7 @@
 import java.util.HashSet;
 import java.util.LinkedList;

+import freenet.client.async.ChosenRequest;
 import freenet.keys.Key;
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
@@ -20,7 +21,7 @@
  * And you have to provide a RequestStarterClient. We do round robin between 
  * clients on the same priority level.
  */
-public class RequestStarter implements Runnable, KeysFetchingLocally, 
RandomGrabArrayItemExclusionList {
+public class RequestStarter implements Runnable, 
RandomGrabArrayItemExclusionList {

        /*
         * Priority classes
@@ -77,7 +78,6 @@
                this.averageInputBytesPerRequest = averageInputBytesPerRequest;
                this.isInsert = isInsert;
                this.isSSK = isSSK;
-               if(!isInsert) keysFetching = new HashSet();
        }

        void setScheduler(RequestScheduler sched) {
@@ -97,7 +97,7 @@
        }

        void realRun() {
-               SendableRequest req = null;
+               ChosenRequest req = null;
                sentRequestTime = System.currentTimeMillis();
                // The last time at which we sent a request or decided not to
                long cycleTime = sentRequestTime;
@@ -171,7 +171,7 @@
                        }
                        if(req == null) continue;
                        if(!startRequest(req, logMINOR)) {
-                               if(!req.isCancelled())
+                               if(!req.request.isCancelled())
                                        Logger.normal(this, "No requests to 
start on "+req);
                        }
                        req = null;
@@ -189,16 +189,16 @@
         * thread is probably doing other things so we have to wait for that to 
finish).
         * @return
         */
-       private SendableRequest getRequest() {
-               SendableRequest req;
+       private ChosenRequest getRequest() {
+               ChosenRequest req;
                while(true) {
                        synchronized(queue) {
-                               req = (SendableRequest) queue.removeFirst();
+                               req = (ChosenRequest) queue.removeFirst();
                        }
-                       if(req.isCancelled()) continue;
+                       if(req.request.isCancelled()) continue;
                        break;
                }
-               SendableRequest betterReq = 
sched.getBetterNonPersistentRequest(req);
+               ChosenRequest betterReq = 
sched.getBetterNonPersistentRequest(req);
                if(req != null) {
                        if(betterReq != null) {
                                synchronized(queue) {
@@ -213,59 +213,10 @@
                return req;
        }

-       /**
-        * All Key's we are currently fetching. 
-        * Locally originated requests only, avoids some complications with 
HTL, 
-        * and also has the benefit that we can see stuff that's been scheduled 
on a SenderThread
-        * but that thread hasn't started yet. FIXME: Both issues can be 
avoided: first we'd get 
-        * rid of the SenderThread and start the requests directly and 
asynchronously, secondly
-        * we'd move this to node but only track keys we are fetching at max 
HTL.
-        * LOCKING: Always lock this LAST.
-        */
-       private HashSet keysFetching;
-       
-       private boolean startRequest(SendableRequest req, boolean logMINOR) {
-               // Create a thread to handle starting the request, and the 
resulting feedback
-               Object keyNum = null;
-               Key key = null;
-               while(true) {
-                       try {
-                               keyNum = req.chooseKey(isInsert ? null : this);
-                               if(keyNum == null) return false;
-                               if(!isInsert) {
-                                       key = 
((BaseSendableGet)req).getNodeKey(keyNum);
-                                       if(key == null) return false;
-                                       synchronized(keysFetching) {
-                                               keysFetching.add(key);
-                                       }
-                               }
-                               core.getExecutor().execute(new 
SenderThread(req, keyNum, key), "RequestStarter$SenderThread for "+req);
-                               if(logMINOR) Logger.minor(this, "Started 
"+req+" key "+keyNum);
-                               return true;
-                       } catch (OutOfMemoryError e) {
-                               OOMHandler.handleOOM(e);
-                               System.err.println("Will retry above failed 
operation...");
-                               // Possibly out of threads
-                               try {
-                                       Thread.sleep(5000);
-                               } catch (InterruptedException e1) {
-                                       // Ignore
-                               }
-                               synchronized(keysFetching) {
-                                       if(key != null) 
keysFetching.remove(key);
-                               }
-                       } catch (Throwable t) {
-                               if(keyNum != null) {
-                                       // Re-queue
-                                       Logger.error(this, "Caught "+t+" while 
trying to start request");
-                                       req.internalError(keyNum, t, sched);
-                                       return true; // Sort of ... maybe it 
will clear
-                               }
-                               synchronized(keysFetching) {
-                                       if(key != null) 
keysFetching.remove(key);
-                               }
-                       }
-               }
+       private boolean startRequest(ChosenRequest req, boolean logMINOR) {
+               if(sched.fetchingKeys().hasKey(req.key)) return false;
+               core.getExecutor().execute(new SenderThread(req.request, 
req.token, req.key), "RequestStarter$SenderThread for "+req);
+               return true;
        }

        public void run() {
@@ -306,9 +257,7 @@
                                Logger.minor(this, "Finished "+req);
                        } finally {
                                if(!isInsert) {
-                                       synchronized(keysFetching) {
-                                               keysFetching.remove(key);
-                                       }
+                                       sched.removeFetchingKey(key);
                                }
                        }
                }
@@ -321,16 +270,10 @@
                }
        }

-       public boolean hasKey(Key key) {
-               synchronized(keysFetching) {
-                       return keysFetching.contains(key);
-               }
-       }
-
        public boolean exclude(RandomGrabArrayItem item) {
                if(isInsert) return false;
                BaseSendableGet get = (BaseSendableGet) item;
-               if(get.hasValidKeys(this))
+               if(get.hasValidKeys(sched.fetchingKeys()))
                        return false;
                Logger.normal(this, "Excluding (no valid keys): "+get);
                return true;


Reply via email to