Author: toad
Date: 2008-03-21 22:47:42 +0000 (Fri, 21 Mar 2008)
New Revision: 18702
Added:
trunk/freenet/src/freenet/node/KeysFetchingLocally.java
Modified:
trunk/freenet/src/freenet/node/RequestStarter.java
Log:
Track keys currently being fetched at the RequestStarter level. Expose an
interface.
Added: trunk/freenet/src/freenet/node/KeysFetchingLocally.java
===================================================================
--- trunk/freenet/src/freenet/node/KeysFetchingLocally.java
(rev 0)
+++ trunk/freenet/src/freenet/node/KeysFetchingLocally.java 2008-03-21
22:47:42 UTC (rev 18702)
@@ -0,0 +1,12 @@
+package freenet.node;
+
+import freenet.keys.Key;
+
+public interface KeysFetchingLocally {
+
+ /**
+ * Is this key currently being fetched locally?
+ *
+ * @param key the key to look up
+ * @return true if the key is currently being fetched locally, false
+ * otherwise
+ */
+ public boolean hasKey(Key key);
+
+}
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2008-03-21 22:20:37 UTC
(rev 18701)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2008-03-21 22:47:42 UTC
(rev 18702)
@@ -3,6 +3,10 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import java.util.HashSet;
+
+import freenet.keys.ClientKey;
+import freenet.keys.Key;
import freenet.support.Logger;
import freenet.support.OOMHandler;
import freenet.support.TokenBucket;
@@ -14,7 +18,7 @@
* And you have to provide a RequestStarterClient. We do round robin between
* clients on the same priority level.
*/
-public class RequestStarter implements Runnable {
+public class RequestStarter implements Runnable, KeysFetchingLocally {
/*
* Priority classes
@@ -68,6 +72,7 @@
this.averageInputBytesPerRequest = averageInputBytesPerRequest;
this.isInsert = isInsert;
this.isSSK = isSSK;
+ if(!isInsert) keysFetching = new HashSet();
}
void setScheduler(RequestScheduler sched) {
@@ -165,14 +170,33 @@
}
}
+ /**
+ * All Keys we are currently fetching.
+ * Locally originated requests only, which avoids some complications with HTL,
+ * and also has the benefit that we can see stuff that's been scheduled on a
+ * SenderThread but that thread hasn't started yet.
+ * FIXME: Both issues can be avoided: first we'd get rid of the SenderThread
+ * and start the requests directly and asynchronously, secondly we'd move
+ * this to node but only track keys we are fetching at max HTL.
+ * NOTE: the constructor only allocates this set when !isInsert, so it is
+ * null on a starter that handles inserts.
+ * LOCKING: Always lock this LAST.
+ */
+ private HashSet keysFetching;
+
private boolean startRequest(SendableRequest req, boolean logMINOR) {
// Create a thread to handle starting the request, and the
resulting feedback
Object keyNum = null;
+ Key key = null;
while(true) {
try {
keyNum = req.chooseKey();
if(keyNum == null) return false;
- core.getExecutor().execute(new
SenderThread(req, keyNum), "RequestStarter$SenderThread for "+req);
+ if(!isInsert) {
+ key =
((SendableGet)req).getKey(keyNum).getNodeKey();
+ if(key == null) return false;
+ synchronized(keysFetching) {
+ keysFetching.add(key);
+ }
+ }
+ core.getExecutor().execute(new
SenderThread(req, keyNum, key), "RequestStarter$SenderThread for "+req);
if(logMINOR) Logger.minor(this, "Started
"+req+" key "+keyNum);
return true;
} catch (OutOfMemoryError e) {
@@ -184,6 +208,7 @@
} catch (InterruptedException e1) {
// Ignore
}
+ if(key != null) keysFetching.remove(key);
} catch (Throwable t) {
if(keyNum != null) {
// Re-queue
@@ -191,6 +216,7 @@
Logger.error(this, "Caught "+t+" while
trying to start request");
return true; // Sort of ... maybe it
will clear
}
+ if(key != null) keysFetching.remove(key);
}
}
}
@@ -215,18 +241,26 @@
private final SendableRequest req;
private final Object keyNum;
+ private final Key key;
- public SenderThread(SendableRequest req, Object keyNum) {
+ public SenderThread(SendableRequest req, Object keyNum, Key
key) {
this.req = req;
this.keyNum = keyNum;
+ this.key = key; // null for inserts: startRequest only resolves and registers a key when !isInsert
}
public void run() {
+ try {
freenet.support.Logger.OSThread.logPID(this);
if(!req.send(core, sched, keyNum))
Logger.normal(this, "run() not able to send a
request");
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Finished "+req);
+ } finally {
+ synchronized(keysFetching) {
+ keysFetching.remove(key);
+ }
+ }
}
}
@@ -237,4 +271,10 @@
}
}
+ /**
+ * Is this key currently being fetched by a request started from this
+ * RequestStarter?
+ *
+ * @param key the key to look up
+ * @return true if the key is in the set of keys being fetched; false if it
+ * is not, or if this starter tracks no keys at all
+ */
+ public boolean hasKey(Key key) {
+ // The set is only allocated when !isInsert. An insert starter has no
+ // set, so it is fetching nothing; answer false rather than throwing
+ // NullPointerException from synchronized(null).
+ if(keysFetching == null) return false;
+ synchronized(keysFetching) {
+ return keysFetching.contains(key);
+ }
+ }
+
}