Author: toad
Date: 2008-05-28 22:25:52 +0000 (Wed, 28 May 2008)
New Revision: 20118
Added:
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
PrioritizedSerialExecutor. Runs jobs in series, on a single thread, but the
queue is prioritized so that important jobs run first.
Added: branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
2008-05-28 22:25:52 UTC (rev 20118)
@@ -0,0 +1,139 @@
+package freenet.support;
+
+import java.util.LinkedList;
+
+import freenet.node.PrioRunnable;
+import freenet.support.io.NativeThread;
+
+public class PrioritizedSerialExecutor implements Executor {
+
+ private final LinkedList[] jobs;
+ private final int priority;
+ private final int defaultPriority;
+ private boolean waiting;
+
+ private String name;
+ private Executor realExecutor;
+ private boolean running;
+
+ private static final int NEWJOB_TIMEOUT = 5*60*1000;
+
+ private final Runnable runner = new PrioRunnable() {
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void run() {
+ while(true) {
+ Runnable job = null;
+ synchronized(jobs) {
+ for(int i=0;i<jobs.length;i++) {
+ if(!jobs[i].isEmpty()) {
+ job = (Runnable)
jobs[i].removeFirst();
+ break;
+ }
+ }
+ if(job == null) {
+ waiting = true;
+ try {
+ //NB: notify only on
adding work or this quits early.
+
jobs.wait(NEWJOB_TIMEOUT);
+ } catch (InterruptedException
e) {
+ // Ignore
+ }
+ waiting=false;
+ for(int i=0;i<jobs.length;i++) {
+ if(!jobs[i].isEmpty()) {
+ job =
(Runnable) jobs[i].removeFirst();
+ break;
+ }
+ }
+ if(job == null) {
+ running=false;
+ return;
+ }
+ }
+ }
+ try {
+ job.run();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ Logger.error(this, "While running
"+job+" on "+this);
+ }
+ }
+ }
+
+ };
+
+ public PrioritizedSerialExecutor(int priority, int
internalPriorityCount, int defaultPriority) {
+ jobs = new LinkedList[internalPriorityCount];
+ for(int i=0;i<jobs.length;i++)
+ jobs[i] = new LinkedList();
+ this.priority = priority;
+ this.defaultPriority = defaultPriority;
+ }
+
+ public void start(Executor realExecutor, String name) {
+ this.realExecutor=realExecutor;
+ this.name=name;
+ synchronized (jobs) {
+ boolean empty = true;
+ for(int i=0;i<jobs.length;i++) {
+ if(!jobs[i].isEmpty()) {
+ empty = false;
+ break;
+ }
+ }
+ if(!empty)
+ reallyStart(Logger.shouldLog(Logger.MINOR,
this));
+ }
+ }
+
+ private void reallyStart(boolean logMINOR) {
+ running=true;
+ if(logMINOR) Logger.minor(this, "Starting thread... "+name+" :
"+runner);
+ realExecutor.execute(runner, name);
+ }
+
+ public void execute(Runnable job, String jobName) {
+ int prio = defaultPriority;
+ if(job instanceof PrioRunnable)
+ prio = ((PrioRunnable) job).getPriority();
+ execute(job, prio, jobName);
+ }
+
+ public void execute(Runnable job, int prio, String jobName) {
+ boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ synchronized(jobs) {
+ if(logMINOR)
+ Logger.minor(this, "Running "+jobName+" :
"+job+" priority "+prio+" running="+running+" waiting="+waiting);
+ jobs[prio].addLast(job);
+ jobs.notifyAll();
+ if(!running && realExecutor != null) {
+ reallyStart(logMINOR);
+ }
+ }
+ }
+
+ public void execute(Runnable job, String jobName, boolean fromTicker) {
+ execute(job, jobName);
+ }
+
+ public int[] runningThreads() {
+ int[] retval = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+ if (running)
+ retval[priority] = 1;
+ return retval;
+ }
+
+ public int[] waitingThreads() {
+ int[] retval = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
+ synchronized(jobs) {
+ if(waiting)
+ retval[priority] = 1;
+ }
+ return retval;
+ }
+
+}