Author: toad
Date: 2008-01-16 15:02:20 +0000 (Wed, 16 Jan 2008)
New Revision: 17067
Modified:
trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
trunk/freenet/src/freenet/node/RequestStarter.java
trunk/freenet/src/freenet/node/SendableGet.java
trunk/freenet/src/freenet/node/SendableInsert.java
trunk/freenet/src/freenet/node/SendableRequest.java
trunk/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
RequestStarter: Choose the key and remove its block number from the request
on-thread, before running the request off-thread (potentially some time later).
This way we won't create large numbers of threads to run the same request when
there is a delay in starting the off-thread requests due to load.
Modified: trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-01-16 13:52:28 UTC (rev 17066)
+++ trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
2008-01-16 15:02:20 UTC (rev 17067)
@@ -283,7 +283,8 @@
return finished;
}
- public boolean send(NodeClientCore core, RequestScheduler sched) {
+ public boolean send(NodeClientCore core, RequestScheduler sched, int
keyNum) {
+ // Ignore keyNum, key, since we're only sending one block.
try {
if(logMINOR) Logger.minor(this, "Starting request:
"+this);
ClientKeyBlock b = getBlock();
@@ -339,4 +340,16 @@
return true;
}
+ public synchronized int[] allKeys() {
+ if(finished)
+ return new int[] {};
+ else
+ return new int[] { 0 };
+ }
+
+ public synchronized int chooseKey() {
+ if(finished) return -1;
+ else return 0;
+ }
+
}
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-01-16 13:52:28 UTC (rev 17066)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
2008-01-16 15:02:20 UTC (rev 17067)
@@ -63,6 +63,10 @@
return segment.getBlockKey(token);
}
+ public Object getKeyObject(int token) {
+ return getKey(token);
+ }
+
public synchronized int[] allKeys() {
int[] nums = new int[blockNums.size()];
for(int i=0;i<nums.length;i++)
Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java 2008-01-16 13:52:28 UTC
(rev 17066)
+++ trunk/freenet/src/freenet/node/RequestStarter.java 2008-01-16 15:02:20 UTC
(rev 17067)
@@ -143,19 +143,24 @@
}
}
if(req == null) continue;
- startRequest(req, logMINOR);
+ if(!startRequest(req, logMINOR)) {
+ Logger.error(this, "No requests to start on
"+req);
+ }
req = null;
cycleTime = sentRequestTime =
System.currentTimeMillis();
}
}
- private void startRequest(SendableRequest req, boolean logMINOR) {
+ private boolean startRequest(SendableRequest req, boolean logMINOR) {
// Create a thread to handle starting the request, and the
resulting feedback
+ int keyNum = -1;
while(true) {
try {
- core.getExecutor().execute(new
SenderThread(req), "RequestStarter$SenderThread for "+req);
+ keyNum = req.chooseKey();
+ if(keyNum == -1) return false;
+ core.getExecutor().execute(new
SenderThread(req, keyNum), "RequestStarter$SenderThread for "+req);
if(logMINOR) Logger.minor(this, "Started "+req);
- break;
+ return true;
} catch (OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Will retry above failed
operation...");
@@ -165,6 +170,13 @@
} catch (InterruptedException e1) {
// Ignore
}
+ } catch (Throwable t) {
+ if(keyNum != -1) {
+ // Re-queue
+ req.internalError(keyNum, t);
+ Logger.error(this, "Caught "+t+" while
trying to start request");
+ return true; // Sort of ... maybe it
will clear
+ }
}
}
}
@@ -185,14 +197,16 @@
private class SenderThread implements Runnable {
private final SendableRequest req;
+ private final int keyNum;
- public SenderThread(SendableRequest req) {
+ public SenderThread(SendableRequest req, int keyNum) {
this.req = req;
+ this.keyNum = keyNum;
}
public void run() {
freenet.support.Logger.OSThread.logPID(this);
- if(!req.send(core, sched))
+ if(!req.send(core, sched, keyNum))
Logger.normal(this, "run() not able to send a
request");
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Finished "+req);
Modified: trunk/freenet/src/freenet/node/SendableGet.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableGet.java 2008-01-16 13:52:28 UTC
(rev 17066)
+++ trunk/freenet/src/freenet/node/SendableGet.java 2008-01-16 15:02:20 UTC
(rev 17067)
@@ -23,13 +23,6 @@
/** Parent BaseClientGetter. Required for schedulers. */
public final ClientRequester parent;
- /** Choose a key to fetch.
- * @return An integer identifying a specific key. -1 indicates no keys
available. */
- public abstract int chooseKey();
-
- /** All key identifiers */
- public abstract int[] allKeys();
-
/** Get a numbered key to fetch. */
public abstract ClientKey getKey(int token);
@@ -57,10 +50,10 @@
/** Do the request, blocking. Called by RequestStarter.
* @return True if a request was executed. False if caller should try
to find another request, and remove
* this one from the queue. */
- public boolean send(NodeClientCore core, RequestScheduler sched) {
+ public boolean send(NodeClientCore core, RequestScheduler sched, int
keyNum) {
+ ClientKey key = getKey(keyNum);
FetchContext ctx = getContext();
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
- while(true) {
synchronized (this) {
if(isCancelled()) {
if(logMINOR) Logger.minor(this,
"Cancelled: "+this);
@@ -69,19 +62,8 @@
}
}
ClientKeyBlock block;
- int keyNum = -1;
try {
- keyNum = chooseKey();
- if(keyNum == -1) {
- if(logMINOR) Logger.minor(this, "No
more keys: "+this);
- return false;
- }
try {
- ClientKey key = getKey(keyNum);
- if(key == null) {
- if(logMINOR) Logger.minor(this,
"No key "+keyNum+": "+this);
- continue;
- }
block = core.realGetKey(key,
ctx.localRequestOnly, ctx.cacheLocalRequests, ctx.ignoreStore);
} catch (LowLevelGetException e) {
onFailure(e, keyNum);
@@ -99,7 +81,6 @@
return true;
}
return true;
- }
}
public void schedule() {
@@ -126,4 +107,8 @@
getScheduler().removePendingKey(this, false, key);
}
+ public void internalError(int keyNum, Throwable t) {
+ onFailure(new
LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t),
keyNum);
+ }
+
}
Modified: trunk/freenet/src/freenet/node/SendableInsert.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableInsert.java 2008-01-16 13:52:28 UTC
(rev 17066)
+++ trunk/freenet/src/freenet/node/SendableInsert.java 2008-01-16 15:02:20 UTC
(rev 17067)
@@ -17,4 +17,8 @@
/** Called when we don't! */
public abstract void onFailure(LowLevelPutException e);
+ public void internalError(int keyNum, Throwable t) {
+ onFailure(new
LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t));
+ }
+
}
Modified: trunk/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- trunk/freenet/src/freenet/node/SendableRequest.java 2008-01-16 13:52:28 UTC
(rev 17066)
+++ trunk/freenet/src/freenet/node/SendableRequest.java 2008-01-16 15:02:20 UTC
(rev 17067)
@@ -17,12 +17,24 @@
public abstract int getRetryCount();
+ /** Choose a key to fetch. Removes the block number from any internal
queues
+ * (but not the key itself, implementors must have a separate queue of
block
+ * numbers and mapping of block numbers to keys).
+ * @return An integer identifying a specific key. -1 indicates no keys
available. */
+ public abstract int chooseKey();
+
+ /** All key identifiers */
+ public abstract int[] allKeys();
+
/** ONLY called by RequestStarter. Start the actual request using the
NodeClientCore
- * provided. The request has been removed from the structure already,
if canRemove().
- * @param sched The scheduler this request has just been removed from.
+ * provided, and the key and key number earlier got from chooseKey().
+ * The request itself may have been removed from the overall queue
already.
+ * @param sched The scheduler this request has just been grabbed from.
+ * @param keyNum The key number that was fed into getKeyObject().
+ * @param key The key returned from grabKey().
* @return True if a request was sent, false otherwise (in which case
the request will
* be removed if it hasn't already been). */
- public abstract boolean send(NodeClientCore node, RequestScheduler
sched);
+ public abstract boolean send(NodeClientCore node, RequestScheduler
sched, int keyNum);
/** Get client context object */
public abstract Object getClient();
@@ -47,4 +59,7 @@
if(arr != null) arr.remove(this);
}
+ /** Requeue after an internal error */
+ public abstract void internalError(int keyNum, Throwable t);
+
}
Modified: trunk/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleSendableInsert.java 2008-01-16
13:52:28 UTC (rev 17066)
+++ trunk/freenet/src/freenet/node/SimpleSendableInsert.java 2008-01-16
15:02:20 UTC (rev 17067)
@@ -62,7 +62,8 @@
return 0;
}
- public boolean send(NodeClientCore core, RequestScheduler sched) {
+ public boolean send(NodeClientCore core, RequestScheduler sched, int
keyNum) {
+ // Ignore keyNum, key, since this is a single block
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
try {
if(logMINOR) Logger.minor(this, "Starting request:
"+this);
@@ -112,4 +113,14 @@
// This is only used as-is by the random reinsert from a
request code. Subclasses should override!
return false;
}
+
+ public synchronized int[] allKeys() {
+ if(finished) return new int[] {};
+ return new int[] { 0 };
+ }
+
+ public synchronized int chooseKey() {
+ if(finished) return -1;
+ else return 0;
+ }
}