Author: toad
Date: 2008-03-21 22:47:42 +0000 (Fri, 21 Mar 2008)
New Revision: 18702

Added:
   trunk/freenet/src/freenet/node/KeysFetchingLocally.java
Modified:
   trunk/freenet/src/freenet/node/RequestStarter.java
Log:
Track keys currently being fetched at the RequestStarter level. Expose an 
interface.

Added: trunk/freenet/src/freenet/node/KeysFetchingLocally.java
===================================================================
--- trunk/freenet/src/freenet/node/KeysFetchingLocally.java     (rev 0)
+++ trunk/freenet/src/freenet/node/KeysFetchingLocally.java     2008-03-21 22:47:42 UTC (rev 18702)
@@ -0,0 +1,12 @@
+package freenet.node;
+
+import freenet.keys.Key;
+
+public interface KeysFetchingLocally {
+       
+       /**
+        * Is this key currently being fetched locally?
+        */
+       public boolean hasKey(Key key);
+
+}

Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java  2008-03-21 22:20:37 UTC (rev 18701)
+++ trunk/freenet/src/freenet/node/RequestStarter.java  2008-03-21 22:47:42 UTC (rev 18702)
@@ -3,6 +3,10 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.node;

+import java.util.HashSet;
+
+import freenet.keys.ClientKey;
+import freenet.keys.Key;
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
 import freenet.support.TokenBucket;
@@ -14,7 +18,7 @@
  * And you have to provide a RequestStarterClient. We do round robin between 
  * clients on the same priority level.
  */
-public class RequestStarter implements Runnable {
+public class RequestStarter implements Runnable, KeysFetchingLocally {

        /*
         * Priority classes
@@ -68,6 +72,7 @@
                this.averageInputBytesPerRequest = averageInputBytesPerRequest;
                this.isInsert = isInsert;
                this.isSSK = isSSK;
+               if(!isInsert) keysFetching = new HashSet();
        }

        void setScheduler(RequestScheduler sched) {
@@ -165,14 +170,33 @@
                }
        }

+       /**
+        * All Key's we are currently fetching. 
+        * Locally originated requests only, avoids some complications with HTL, 
+        * and also has the benefit that we can see stuff that's been scheduled on a SenderThread
+        * but that thread hasn't started yet. FIXME: Both issues can be avoided: first we'd get 
+        * rid of the SenderThread and start the requests directly and asynchronously, secondly
+        * we'd move this to node but only track keys we are fetching at max HTL.
+        * LOCKING: Always lock this LAST.
+        */
+       private HashSet keysFetching;
+       
        private boolean startRequest(SendableRequest req, boolean logMINOR) {
                // Create a thread to handle starting the request, and the resulting feedback
                Object keyNum = null;
+               Key key = null;
                while(true) {
                        try {
                                keyNum = req.chooseKey();
                                if(keyNum == null) return false;
-                               core.getExecutor().execute(new SenderThread(req, keyNum), "RequestStarter$SenderThread for "+req);
+                               if(!isInsert) {
+                                       key = ((SendableGet)req).getKey(keyNum).getNodeKey();
+                                       if(key == null) return false;
+                                       synchronized(keysFetching) {
+                                               keysFetching.add(key);
+                                       }
+                               }
+                               core.getExecutor().execute(new SenderThread(req, keyNum, key), "RequestStarter$SenderThread for "+req);
                                if(logMINOR) Logger.minor(this, "Started "+req+" key "+keyNum);
                                return true;
                        } catch (OutOfMemoryError e) {
@@ -184,6 +208,7 @@
                                } catch (InterruptedException e1) {
                                        // Ignore
                                }
+                               if(key != null) keysFetching.remove(key);
                        } catch (Throwable t) {
                                if(keyNum != null) {
                                        // Re-queue
@@ -191,6 +216,7 @@
                                        Logger.error(this, "Caught "+t+" while trying to start request");
                                        return true; // Sort of ... maybe it will clear
                                }
+                               if(key != null) keysFetching.remove(key);
                        }
                }
        }
@@ -215,18 +241,26 @@

                private final SendableRequest req;
                private final Object keyNum;
+               private final Key key;

-               public SenderThread(SendableRequest req, Object keyNum) {
+               public SenderThread(SendableRequest req, Object keyNum, Key key) {
                        this.req = req;
                        this.keyNum = keyNum;
+                       this.key = key;
                }

                public void run() {
+                       try {
                    freenet.support.Logger.OSThread.logPID(this);
                        if(!req.send(core, sched, keyNum))
                                Logger.normal(this, "run() not able to send a request");
                        if(Logger.shouldLog(Logger.MINOR, this)) 
                                Logger.minor(this, "Finished "+req);
+                       } finally {
+                               synchronized(keysFetching) {
+                                       keysFetching.remove(key);
+                               }
+                       }
                }

        }
@@ -237,4 +271,10 @@
                }
        }

+       public boolean hasKey(Key key) {
+               synchronized(keysFetching) {
+                       return keysFetching.contains(key);
+               }
+       }
+
 }


Reply via email to