Author: j16sdiz
Date: 2009-04-03 08:59:10 +0000 (Fri, 03 Apr 2009)
New Revision: 26416
Modified:
trunk/freenet/src/freenet/support/SerialExecutor.java
Log:
Use LinkedBlockingQueue, reduce locking time
Modified: trunk/freenet/src/freenet/support/SerialExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/SerialExecutor.java 2009-04-03 08:58:50 UTC (rev 26415)
+++ trunk/freenet/src/freenet/support/SerialExecutor.java 2009-04-03 08:59:10 UTC (rev 26416)
@@ -1,19 +1,22 @@
package freenet.support;
-import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import freenet.node.PrioRunnable;
import freenet.support.io.NativeThread;
public class SerialExecutor implements Executor {
- private final LinkedList<Runnable> jobs;
+ private final LinkedBlockingQueue<Runnable> jobs;
+ private final Object syncLock;
private final int priority;
- private boolean threadWaiting;
+
+ private volatile boolean threadWaiting;
+ private volatile boolean threadStarted;
private String name;
private Executor realExecutor;
- private boolean threadStarted;
private static final int NEWJOB_TIMEOUT = 5*60*1000;
@@ -25,24 +28,25 @@
public void run() {
while(true) {
- Runnable job;
- synchronized(jobs) {
- if(jobs.isEmpty()) {
+ synchronized (syncLock) {
threadWaiting = true;
+ }
+ Runnable job = null;
try {
- //NB: notify only on adding work or this quits early.
- jobs.wait(NEWJOB_TIMEOUT);
+ job = jobs.poll(NEWJOB_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- // Ignore
+ // ignore
}
+ synchronized (syncLock) {
threadWaiting=false;
- if (jobs.isEmpty()) {
- threadStarted=false;
- return;
}
+ if (job == null) {
+ synchronized (syncLock) {
+ threadStarted = false;
}
- job = jobs.removeFirst();
+ return;
}
+
try {
job.run();
} catch (Throwable t) {
@@ -55,34 +59,39 @@
};
public SerialExecutor(int priority) {
- jobs = new LinkedList<Runnable>();
+ jobs = new LinkedBlockingQueue<Runnable>();
this.priority = priority;
+ this.syncLock = new Object();
}
public void start(Executor realExecutor, String name) {
this.realExecutor=realExecutor;
this.name=name;
- synchronized (jobs) {
+ synchronized (syncLock) {
if (!jobs.isEmpty())
reallyStart(Logger.shouldLog(Logger.MINOR, this));
}
}
private void reallyStart(boolean logMINOR) {
+ synchronized (syncLock) {
threadStarted=true;
- if(logMINOR) Logger.minor(this, "Starting thread... "+name+" : "+runner);
+ }
+ if (logMINOR)
+ Logger.minor(this, "Starting thread... " + name + " : "
+ runner);
realExecutor.execute(runner, name);
}
public void execute(Runnable job, String jobName) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
- synchronized(jobs) {
- if(logMINOR) Logger.minor(this, "Running "+jobName+" : "+job+" running="+threadStarted+" waiting="+threadWaiting);
- jobs.addLast(job);
- jobs.notifyAll();
- if (!threadStarted && realExecutor!=null) {
+ if (logMINOR)
+ Logger.minor(this, "Running " + jobName + " : " + job + " started=" + threadStarted + " waiting="
+ + threadWaiting);
+ jobs.add(job);
+
+ synchronized (syncLock) {
+ if (!threadStarted && realExecutor != null)
reallyStart(logMINOR);
- }
}
}
@@ -99,7 +108,7 @@
public int[] waitingThreads() {
int[] retval = new int[NativeThread.JAVA_PRIORITY_RANGE+1];
- synchronized(jobs) {
+ synchronized (syncLock) {
if(threadStarted && threadWaiting)
retval[priority] = 1;
}
@@ -107,7 +116,7 @@
}
public int getWaitingThreadsCount() {
- synchronized(jobs) {
+ synchronized (syncLock) {
return (threadStarted && threadWaiting) ? 1 : 0;
}
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs