Author: nextgens
Date: 2008-04-04 06:16:23 +0000 (Fri, 04 Apr 2008)
New Revision: 18977
Modified:
trunk/freenet/src/freenet/support/PooledExecutor.java
Log:
More sync. fixes
Modified: trunk/freenet/src/freenet/support/PooledExecutor.java
===================================================================
--- trunk/freenet/src/freenet/support/PooledExecutor.java 2008-04-04 06:12:05 UTC (rev 18976)
+++ trunk/freenet/src/freenet/support/PooledExecutor.java 2008-04-04 06:16:23 UTC (rev 18977)
@@ -23,48 +23,48 @@
private static boolean logMINOR;
// Ticker thread that runs at maximum priority.
private Ticker ticker;
-
- public void setTicker(Ticker ticker) {
+
+ public synchronized void setTicker(Ticker ticker) {
this.ticker = ticker;
}
-
+
public PooledExecutor() {
- for(int i=0; i<runningThreads.length; i++) {
+ for(int i = 0; i < runningThreads.length; i++) {
runningThreads[i] = new ArrayList();
waitingThreads[i] = new ArrayList();
threadCounter[i] = 0;
}
}
-
/** Maximum time a thread will wait for a job */
- static final int TIMEOUT = 5*60*1000;
-
+ static final int TIMEOUT = 5 * 60 * 1000;
+
public void start() {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
-
+
public void execute(Runnable job, String jobName) {
execute(job, jobName, false);
}
-
+
public void execute(Runnable job, String jobName, boolean fromTicker) {
int prio = NativeThread.NORM_PRIORITY;
- if(job instanceof PrioRunnable) {
- prio = ((PrioRunnable)job).getPriority();
- }
-
- if(logMINOR) Logger.minor(this, "Executing "+job+" as "+jobName+" at prio "+prio);
+ if(job instanceof PrioRunnable)
+ prio = ((PrioRunnable) job).getPriority();
+
+ if(logMINOR)
+ Logger.minor(this, "Executing " + job + " as " + jobName + " at prio " + prio);
if(prio < NativeThread.MIN_PRIORITY || prio > NativeThread.MAX_PRIORITY)
- throw new IllegalArgumentException("Unreconized priority level : "+prio+'!');
+ throw new IllegalArgumentException("Unreconized priority level : " + prio + '!');
while(true) {
MyThread t;
boolean mustStart = false;
boolean miss = false;
synchronized(this) {
jobCount++;
- if(!waitingThreads[prio-1].isEmpty()) {
- t = (MyThread) waitingThreads[prio-1].remove(waitingThreads[prio-1].size()-1);
- if(logMINOR) Logger.minor(this, "Reusing thread "+t);
+ if(!waitingThreads[prio - 1].isEmpty()) {
+ t = (MyThread) waitingThreads[prio - 1].remove(waitingThreads[prio - 1].size() - 1);
+ if(logMINOR)
+ Logger.minor(this, "Reusing thread " + t);
} else {
// Must create new thread
if((!fromTicker) && NativeThread.usingNativeCode() && prio > Thread.currentThread().getPriority()) {
@@ -73,16 +73,18 @@
return;
}
// Will be coalesced by thread count listings if we use "@" or "for"
- t = new MyThread("Pooled thread awaiting work @"+(threadCounter[prio-1]), threadCounter[prio-1], prio, !fromTicker);
- threadCounter[prio-1]++;
+ t = new MyThread("Pooled thread awaiting work @" + (threadCounter[prio - 1]), threadCounter[prio - 1], prio, !fromTicker);
+ threadCounter[prio - 1]++;
t.setDaemon(true);
mustStart = true;
miss = true;
}
}
synchronized(t) {
- if(!t.alive) continue;
- if(t.nextJob != null) continue;
+ if(!t.alive)
+ continue;
+ if(t.nextJob != null)
+ continue;
t.nextJob = job;
if(!mustStart)
// It is possible that we could get a wierd race condition with
@@ -90,44 +92,46 @@
// level code. So we'd best use notifyAll().
t.notifyAll();
}
- t.setName(jobName+"("+t.threadNo+")");
+ t.setName(jobName + "(" + t.threadNo + ")");
if(mustStart) {
t.start();
synchronized(this) {
- runningThreads[prio-1].add(t);
+ runningThreads[prio - 1].add(t);
if(miss)
jobMisses++;
if(logMINOR)
- Logger.minor(this, "Jobs: "+jobMisses+" misses of "+jobCount+" starting urgently "+jobName);
+ Logger.minor(this, "Jobs: " + jobMisses + " misses of " + jobCount + " starting urgently " + jobName);
}
- } else {
+ } else
if(logMINOR)
- Logger.minor(this, "Not starting: Jobs: "+jobMisses+" misses of "+jobCount+" starting urgently "+jobName);
- }
+ synchronized(this) {
+ Logger.minor(this, "Not starting: Jobs: " + jobMisses + " misses of " + jobCount + " starting urgently " + jobName);
+ }
return;
}
}
public synchronized int[] runningThreads() {
int[] result = new int[runningThreads.length];
- for(int i=0; i<result.length; i++)
+ for(int i = 0; i < result.length; i++)
result[i] = runningThreads[i].size();
return result;
}
-
+
public synchronized int[] waitingThreads() {
int[] result = new int[waitingThreads.length];
- for(int i=0; i<result.length; i++)
+ for(int i = 0; i < result.length; i++)
result[i] = waitingThreads[i].size();
return result;
}
-
+
class MyThread extends NativeThread {
+
final String defaultName;
boolean alive = true;
Runnable nextJob;
final long threadNo;
-
+
public MyThread(String defaultName, long threadCounter, int prio, boolean dontCheckRenice) {
super(defaultName, prio, dontCheckRenice);
this.defaultName = defaultName;
@@ -140,53 +144,49 @@
int nativePriority = getNativePriority();
while(true) {
Runnable job;
-
+
synchronized(this) {
job = nextJob;
nextJob = null;
}
-
+
if(job == null) {
synchronized(PooledExecutor.this) {
- waitingThreads[nativePriority-1].add(this);
+ waitingThreads[nativePriority - 1].add(this);
}
synchronized(this) {
if(nextJob == null) {
this.setName(defaultName);
try {
wait(TIMEOUT);
- } catch (InterruptedException e) {
- // Ignore
+ } catch(InterruptedException e) {
+ // Ignore
}
}
job = nextJob;
nextJob = null;
- if(job == null) {
+ if(job == null)
alive = false;
- // execute() won't give us another job if alive = false
- }
}
synchronized(PooledExecutor.this) {
- waitingThreads[nativePriority-1].remove(this);
+ waitingThreads[nativePriority - 1].remove(this);
if(!alive) {
- runningThreads[nativePriority-1].remove(this);
+ runningThreads[nativePriority - 1].remove(this);
if(logMINOR)
- Logger.minor(this, "Exiting having executed "+ranJobs+" jobs : "+this);
+ Logger.minor(this, "Exiting having executed " + ranJobs + " jobs : " + this);
return;
}
}
}
-
+
// Run the job
try {
job.run();
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" running job "+job, t);
+ } catch(Throwable t) {
+ Logger.error(this, "Caught " + t + " running job " + job, t);
}
ranJobs++;
}
}
-
}
-
}