Author: toad
Date: 2008-02-16 18:41:06 +0000 (Sat, 16 Feb 2008)
New Revision: 18007

Added:
   trunk/freenet/src/freenet/node/PrioRunnable.java
Modified:
   trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
   trunk/freenet/src/freenet/node/AnnounceSender.java
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/PacketSender.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
   trunk/freenet/src/freenet/support/Executor.java
   trunk/freenet/src/freenet/support/PooledExecutor.java
Log:
Make thread priorities work for delayed jobs. Change API for both ticker and 
executor: we pass in the priority by passing in a PrioRunnable, not by passing 
in an extra integer. If it's not a PrioRunnable it runs at NORMAL.

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java     2008-02-16 
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketHandler.java     2008-02-16 
18:41:06 UTC (rev 18007)
@@ -16,6 +16,7 @@
 import freenet.node.LoggingConfigHandler;
 import freenet.node.Node;
 import freenet.node.NodeInitException;
+import freenet.node.PrioRunnable;
 import freenet.support.FileLoggerHook;
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
@@ -301,11 +302,11 @@
                }
                super.start();
                if(!disableHangChecker) {
-                       node.executor.execute(new USMChecker(), 
"UdpSockerHandler watchdog", NativeThread.MAX_PRIORITY, false);
+                       node.executor.execute(new USMChecker(), 
"UdpSockerHandler watchdog", false);
                }
        }

-       public class USMChecker implements Runnable {
+       public class USMChecker implements PrioRunnable {
                public void run() {
                    freenet.support.Logger.OSThread.logPID(this);
                        while(true) {
@@ -364,6 +365,10 @@
                                }
                        }
                }
+
+               public int getPriority() {
+                       return NativeThread.MAX_PRIORITY;
+               }
        }

     public void close(boolean exit) {

Modified: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java  2008-02-16 18:27:40 UTC 
(rev 18006)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java  2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -15,8 +15,9 @@
 import freenet.io.comm.ReferenceSignatureVerificationException;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
+import freenet.support.io.NativeThread;

-public class AnnounceSender implements Runnable, ByteCounter {
+public class AnnounceSender implements PrioRunnable, ByteCounter {

     // Constants
     static final int ACCEPTED_TIMEOUT = 5000;
@@ -528,5 +529,9 @@
        public void sentPayload(int x) {
                // Doesn't count.
        }
+
+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }

 }

Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-02-16 
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java        2008-02-16 
18:41:06 UTC (rev 18007)
@@ -20,6 +20,7 @@
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
 import freenet.support.ShortBuffer;
+import freenet.support.io.NativeThread;

 /**
  * @author amphibian
@@ -27,7 +28,7 @@
  * Handle an incoming insert request.
  * This corresponds to RequestHandler.
  */
-public class CHKInsertHandler implements Runnable, ByteCounter {
+public class CHKInsertHandler implements PrioRunnable, ByteCounter {


     static final int DATA_INSERT_TIMEOUT = 10000;
@@ -479,4 +480,8 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
+
+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }
 }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-02-16 
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2008-02-16 
18:41:06 UTC (rev 18007)
@@ -2579,11 +2579,14 @@

        private final void _fillJFKDHFIFOOffThread() {
                // do it off-thread
-               node.executor.execute(new Runnable() {
+               node.executor.execute(new PrioRunnable() {
                        public void run() {
                                _fillJFKDHFIFO();
                        }
-               }, "DiffieHellman exponential signing", 
NativeThread.HIGH_PRIORITY);
+                       public int getPriority() {
+                               return NativeThread.HIGH_PRIORITY;
+                       }
+               }, "DiffieHellman exponential signing");
        }

        private void _fillJFKDHFIFO() {

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-02-16 18:27:40 UTC 
(rev 18006)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -324,7 +324,7 @@
                }
                //if(!node.lockUID(id)) return false;
                RequestHandler rh = new RequestHandler(m, source, id, node, 
htl, key);
-               node.executor.execute(rh, "RequestHandler for UID "+id+" on 
"+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+               node.executor.execute(rh, "RequestHandler for UID "+id+" on 
"+node.getDarknetPortNumber());
                return true;
        }

@@ -365,10 +365,10 @@
                long now = System.currentTimeMillis();
                if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
                        SSKInsertHandler rh = new SSKInsertHandler(m, source, 
id, node, now);
-                       node.executor.execute(rh, "SSKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+                       node.executor.execute(rh, "SSKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber());
                } else {
                        CHKInsertHandler rh = new CHKInsertHandler(m, source, 
id, node, now);
-                       node.executor.execute(rh, "CHKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber(), NativeThread.HIGH_PRIORITY);
+                       node.executor.execute(rh, "CHKInsertHandler for "+id+" 
on "+node.getDarknetPortNumber());
                }
                if(logMINOR) Logger.minor(this, "Started InsertHandler for 
"+id);
                return true;
@@ -449,7 +449,7 @@
                                return true;
                        }
                        AnnounceSender sender = new AnnounceSender(m, uid, 
source, om, node);
-                       node.executor.execute(sender, "Announcement sender for 
"+uid, NativeThread.HIGH_PRIORITY);
+                       node.executor.execute(sender, "Announcement sender for 
"+uid);
                        success = true;
                        return true;
                } finally {

Modified: trunk/freenet/src/freenet/node/PacketSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PacketSender.java    2008-02-16 18:27:40 UTC 
(rev 18006)
+++ trunk/freenet/src/freenet/node/PacketSender.java    2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -418,7 +418,7 @@
                                        }
                                else
                                        try {
-                                               node.executor.execute(r, 
"Scheduled job: " + r, NativeThread.NORM_PRIORITY, true);
+                                               node.executor.execute(r, 
"Scheduled job: " + r, true);
                                        } catch(OutOfMemoryError e) {
                                                OOMHandler.handleOOM(e);
                                                System.err.println("Will retry 
above failed operation...");
@@ -469,6 +469,8 @@
                queueTimedJob(job, offset, false);
        }

+       
+       
        public void queueTimedJob(Runnable job, long offset, boolean 
runOnTickerAnyway) {
                // Run directly *if* that won't cause any priority problems.
                if(offset <= 0 && !runOnTickerAnyway) {

Added: trunk/freenet/src/freenet/node/PrioRunnable.java
===================================================================
--- trunk/freenet/src/freenet/node/PrioRunnable.java                            
(rev 0)
+++ trunk/freenet/src/freenet/node/PrioRunnable.java    2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.node;
+
+/**
+ * A Runnable which specifies a priority.
+ * @author toad
+ */
+public interface PrioRunnable extends Runnable {
+
+       public int getPriority();
+       
+}

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-02-16 18:27:40 UTC 
(rev 18006)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -22,13 +22,14 @@
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
 import freenet.support.TimeUtil;
+import freenet.support.io.NativeThread;

 /**
  * Handle an incoming request. Does not do the actual fetching; that
  * is separated off into RequestSender so we get transfer coalescing
  * and both ends for free. 
  */
-public class RequestHandler implements Runnable, ByteCounter, 
RequestSender.Listener {
+public class RequestHandler implements PrioRunnable, ByteCounter, 
RequestSender.Listener {

        private static boolean logMINOR;
     final Message req;
@@ -599,5 +600,9 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
+
+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }

 }

Modified: trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java  
2008-02-16 18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/ResettingHTLProbeRequestSender.java  
2008-02-16 18:41:06 UTC (rev 18007)
@@ -23,7 +23,7 @@
  * ian.
  * @author toad
  */
-public class ResettingHTLProbeRequestSender implements Runnable, ByteCounter {
+public class ResettingHTLProbeRequestSender implements PrioRunnable, 
ByteCounter {

     // Constants
     static final int ACCEPTED_TIMEOUT = 5000;
@@ -77,7 +77,7 @@
     }

     public void start() {
-       node.executor.execute(this, "ResettingHTLProbeRequestSender for UID 
"+uid, NativeThread.HIGH_PRIORITY);
+       node.executor.execute(this, "ResettingHTLProbeRequestSender for UID 
"+uid);
     }

     public void run() {
@@ -536,4 +536,8 @@
                }
        }

+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }
+
 }

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-02-16 
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java        2008-02-16 
18:41:06 UTC (rev 18007)
@@ -18,12 +18,13 @@
 import freenet.support.Logger;
 import freenet.support.OOMHandler;
 import freenet.support.ShortBuffer;
+import freenet.support.io.NativeThread;

 /**
  * Handles an incoming SSK insert.
  * SSKs need their own insert/request classes, see comments in SSKInsertSender.
  */
-public class SSKInsertHandler implements Runnable, ByteCounter {
+public class SSKInsertHandler implements PrioRunnable, ByteCounter {

        private static boolean logMINOR;

@@ -354,5 +355,9 @@
        public void sentPayload(int x) {
                node.sentPayload(x);
        }
+
+       public int getPriority() {
+               return NativeThread.HIGH_PRIORITY;
+       }

 }

Modified: trunk/freenet/src/freenet/support/Executor.java
===================================================================
--- trunk/freenet/src/freenet/support/Executor.java     2008-02-16 18:27:40 UTC 
(rev 18006)
+++ trunk/freenet/src/freenet/support/Executor.java     2008-02-16 18:41:06 UTC 
(rev 18007)
@@ -10,8 +10,7 @@

        /** Execute a job. */
        public void execute(Runnable job, String jobName);
-       public void execute(Runnable job, String jobName, int priority);
-       public void execute(Runnable job, String jobName, int priority, boolean 
fromTicker);
+       public void execute(Runnable job, String jobName, boolean fromTicker);

        /** Count the number of threads waiting for work */
        public int[] waitingThreads();

Modified: trunk/freenet/src/freenet/support/PooledExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/PooledExecutor.java       2008-02-16 
18:27:40 UTC (rev 18006)
+++ trunk/freenet/src/freenet/support/PooledExecutor.java       2008-02-16 
18:41:06 UTC (rev 18007)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.support;

+import freenet.node.PrioRunnable;
 import freenet.node.Ticker;
 import freenet.support.io.NativeThread;
 import java.util.ArrayList;
@@ -43,14 +44,15 @@
        }

        public void execute(Runnable job, String jobName) {
-               execute(job, jobName, NativeThread.NORM_PRIORITY);
+               execute(job, jobName, false);
        }

-       public void execute(Runnable job, String jobName, int prio) {
-               execute(job, jobName, prio, false);
-       }
-       
-       public void execute(Runnable job, String jobName, int prio, boolean 
fromTicker) {
+       public void execute(Runnable job, String jobName, boolean fromTicker) {
+               int prio = NativeThread.NORM_PRIORITY;
+               if(job instanceof PrioRunnable) {
+                       prio = ((PrioRunnable)job).getPriority();
+               }
+               
                if(logMINOR) Logger.minor(this, "Executing "+job+" as 
"+jobName+" at prio "+prio);
                if(prio < NativeThread.MIN_PRIORITY || prio > 
NativeThread.MAX_PRIORITY)
                        throw new IllegalArgumentException("Unreconized 
priority level : "+prio+'!');


Reply via email to