Author: toad
Date: 2008-02-16 18:41:06 +0000 (Sat, 16 Feb 2008)
New Revision: 18007
Added:
trunk/freenet/src/freenet/node/PrioRunnable.java
Modified:
trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
trunk/freenet/src/freenet/node/AnnounceSender.java
trunk/freenet/src/freenet/node/CHKInsertHandler.java
trunk/freenet/src/freenet/node/FNPPacketMangler.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/PacketSender.java
trunk/freenet/src/freenet/node/RequestHandler.java
trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
trunk/freenet/src/freenet/node/SSKInsertHandler.java
trunk/freenet/src/freenet/support/Executor.java
trunk/freenet/src/freenet/support/PooledExecutor.java
Log:
Make thread priorities work for delayed jobs. Change API for both ticker and
executor: we pass in the priority by passing in a PrioRunnable, not by passing
in an extra integer. If it's not a PrioRunnable it runs at NORMAL.
Modified: trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java 2008-02-16
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java 2008-02-16
18:41:06 UTC (rev 18007)
@@ -16,6 +16,7 @@
import freenet.node.LoggingConfigHandler;
import freenet.node.Node;
import freenet.node.NodeInitException;
+import freenet.node.PrioRunnable;
import freenet.support.FileLoggerHook;
import freenet.support.Logger;
import freenet.support.OOMHandler;
@@ -301,11 +302,11 @@
}
super.start();
if(!disableHangChecker) {
- node.executor.execute(new USMChecker(),
"UdpSockerHandler watchdog", NativeThread.MAX_PRIORITY, false);
+ node.executor.execute(new USMChecker(),
"UdpSockerHandler watchdog", false);
}
}
- public class USMChecker implements Runnable {
+ public class USMChecker implements PrioRunnable {
public void run() {
freenet.support.Logger.OSThread.logPID(this);
while(true) {
@@ -364,6 +365,10 @@
}
}
}
+
+ public int getPriority() {
+ return NativeThread.MAX_PRIORITY;
+ }
}
public void close(boolean exit) {
Modified: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java 2008-02-16 18:27:40 UTC
(rev 18006)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -15,8 +15,9 @@
import freenet.io.comm.ReferenceSignatureVerificationException;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
+import freenet.support.io.NativeThread;
-public class AnnounceSender implements Runnable, ByteCounter {
+public class AnnounceSender implements PrioRunnable, ByteCounter {
// Constants
static final int ACCEPTED_TIMEOUT = 5000;
@@ -528,5 +529,9 @@
public void sentPayload(int x) {
// Doesn't count.
}
+
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
}
Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-02-16
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java 2008-02-16
18:41:06 UTC (rev 18007)
@@ -20,6 +20,7 @@
import freenet.support.Logger;
import freenet.support.OOMHandler;
import freenet.support.ShortBuffer;
+import freenet.support.io.NativeThread;
/**
* @author amphibian
@@ -27,7 +28,7 @@
* Handle an incoming insert request.
* This corresponds to RequestHandler.
*/
-public class CHKInsertHandler implements Runnable, ByteCounter {
+public class CHKInsertHandler implements PrioRunnable, ByteCounter {
static final int DATA_INSERT_TIMEOUT = 10000;
@@ -479,4 +480,8 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
+
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
}
Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-02-16
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java 2008-02-16
18:41:06 UTC (rev 18007)
@@ -2579,11 +2579,14 @@
private final void _fillJFKDHFIFOOffThread() {
// do it off-thread
- node.executor.execute(new Runnable() {
+ node.executor.execute(new PrioRunnable() {
public void run() {
_fillJFKDHFIFO();
}
- }, "DiffieHellman exponential signing",
NativeThread.HIGH_PRIORITY);
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
+ }, "DiffieHellman exponential signing");
}
private void _fillJFKDHFIFO() {
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-02-16 18:27:40 UTC
(rev 18006)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -324,7 +324,7 @@
}
//if(!node.lockUID(id)) return false;
RequestHandler rh = new RequestHandler(m, source, id, node,
htl, key);
- node.executor.execute(rh, "RequestHandler for UID "+id+" on
"+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+ node.executor.execute(rh, "RequestHandler for UID "+id+" on
"+node.getDarknetPortNumber());
return true;
}
@@ -365,10 +365,10 @@
long now = System.currentTimeMillis();
if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
SSKInsertHandler rh = new SSKInsertHandler(m, source,
id, node, now);
- node.executor.execute(rh, "SSKInsertHandler for "+id+"
on "+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+ node.executor.execute(rh, "SSKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
} else {
CHKInsertHandler rh = new CHKInsertHandler(m, source,
id, node, now);
- node.executor.execute(rh, "CHKInsertHandler for "+id+"
on "+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+ node.executor.execute(rh, "CHKInsertHandler for "+id+"
on "+node.getDarknetPortNumber());
}
if(logMINOR) Logger.minor(this, "Started InsertHandler for
"+id);
return true;
@@ -449,7 +449,7 @@
return true;
}
AnnounceSender sender = new AnnounceSender(m, uid,
source, om, node);
- node.executor.execute(sender, "Announcement sender for
"+uid, NativeThread.HIGH_PRIORITY);
+ node.executor.execute(sender, "Announcement sender for
"+uid);
success = true;
return true;
} finally {
Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java 2008-02-16 18:27:40 UTC
(rev 18006)
+++ trunk/freenet/src/freenet/node/PacketSender.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -418,7 +418,7 @@
}
else
try {
- node.executor.execute(r,
"Scheduled job: " + r, NativeThread.NORM_PRIORITY, true);
+ node.executor.execute(r,
"Scheduled job: " + r, true);
} catch(OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Will retry
above failed operation...");
@@ -469,6 +469,8 @@
queueTimedJob(job, offset, false);
}
+
+
public void queueTimedJob(Runnable job, long offset, boolean
runOnTickerAnyway) {
// Run directly *if* that won't cause any priority problems.
if(offset <= 0 && !runOnTickerAnyway) {
Added: trunk/freenet/src/freenet/node/PrioRunnable.java
===================================================================
--- trunk/freenet/src/freenet/node/PrioRunnable.java
(rev 0)
+++ trunk/freenet/src/freenet/node/PrioRunnable.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+/**
+ * A Runnable which specifies a priority.
+ * @author toad
+ */
+public interface PrioRunnable extends Runnable {
+
+ public int getPriority();
+
+}
Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java 2008-02-16 18:27:40 UTC
(rev 18006)
+++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -22,13 +22,14 @@
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.TimeUtil;
+import freenet.support.io.NativeThread;
/**
* Handle an incoming request. Does not do the actual fetching; that
* is separated off into RequestSender so we get transfer coalescing
* and both ends for free.
*/
-public class RequestHandler implements Runnable, ByteCounter,
RequestSender.Listener {
+public class RequestHandler implements PrioRunnable, ByteCounter,
RequestSender.Listener {
private static boolean logMINOR;
final Message req;
@@ -599,5 +600,9 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
+
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
}
Modified: trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
2008-02-16 18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
2008-02-16 18:41:06 UTC (rev 18007)
@@ -23,7 +23,7 @@
* ian.
* @author toad
*/
-public class ResettingHTLProbeRequestSender implements Runnable, ByteCounter {
+public class ResettingHTLProbeRequestSender implements PrioRunnable,
ByteCounter {
// Constants
static final int ACCEPTED_TIMEOUT = 5000;
@@ -77,7 +77,7 @@
}
public void start() {
- node.executor.execute(this, "ResettingHTLProbeRequestSender for UID
"+uid, NativeThread.HIGH_PRIORITY);
+ node.executor.execute(this, "ResettingHTLProbeRequestSender for UID
"+uid);
}
public void run() {
@@ -536,4 +536,8 @@
}
}
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
+
}
Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-02-16
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java 2008-02-16
18:41:06 UTC (rev 18007)
@@ -18,12 +18,13 @@
import freenet.support.Logger;
import freenet.support.OOMHandler;
import freenet.support.ShortBuffer;
+import freenet.support.io.NativeThread;
/**
* Handles an incoming SSK insert.
* SSKs need their own insert/request classes, see comments in SSKInsertSender.
*/
-public class SSKInsertHandler implements Runnable, ByteCounter {
+public class SSKInsertHandler implements PrioRunnable, ByteCounter {
private static boolean logMINOR;
@@ -354,5 +355,9 @@
public void sentPayload(int x) {
node.sentPayload(x);
}
+
+ public int getPriority() {
+ return NativeThread.HIGH_PRIORITY;
+ }
}
Modified: trunk/freenet/src/freenet/support/Executor.java
===================================================================
--- trunk/freenet/src/freenet/support/Executor.java 2008-02-16 18:27:40 UTC
(rev 18006)
+++ trunk/freenet/src/freenet/support/Executor.java 2008-02-16 18:41:06 UTC
(rev 18007)
@@ -10,8 +10,7 @@
/** Execute a job. */
public void execute(Runnable job, String jobName);
- public void execute(Runnable job, String jobName, int priority);
- public void execute(Runnable job, String jobName, int priority, boolean
fromTicker);
+ public void execute(Runnable job, String jobName, boolean fromTicker);
/** Count the number of threads waiting for work */
public int[] waitingThreads();
Modified: trunk/freenet/src/freenet/support/PooledExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/PooledExecutor.java 2008-02-16
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/support/PooledExecutor.java 2008-02-16
18:41:06 UTC (rev 18007)
@@ -3,6 +3,7 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.support;
+import freenet.node.PrioRunnable;
import freenet.node.Ticker;
import freenet.support.io.NativeThread;
import java.util.ArrayList;
@@ -43,14 +44,15 @@
}
public void execute(Runnable job, String jobName) {
- execute(job, jobName, NativeThread.NORM_PRIORITY);
+ execute(job, jobName, false);
}
- public void execute(Runnable job, String jobName, int prio) {
- execute(job, jobName, prio, false);
- }
-
- public void execute(Runnable job, String jobName, int prio, boolean
fromTicker) {
+ public void execute(Runnable job, String jobName, boolean fromTicker) {
+ int prio = NativeThread.NORM_PRIORITY;
+ if(job instanceof PrioRunnable) {
+ prio = ((PrioRunnable)job).getPriority();
+ }
+
if(logMINOR) Logger.minor(this, "Executing "+job+" as
"+jobName+" at prio "+prio);
if(prio < NativeThread.MIN_PRIORITY || prio >
NativeThread.MAX_PRIORITY)
throw new IllegalArgumentException("Unreconized
priority level : "+prio+'!');