Update of /cvsroot/freenet/freenet/src/freenet/thread
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/thread
Modified Files:
Tag: stable
FastThreadFactory.java QThreadFactory.java ThreadFactory.java
Added Files:
Tag: stable
ThreadStatusSnapshot.java YThreadFactory.java
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related
changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!
--- NEW FILE: ThreadStatusSnapshot.java ---
/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
/*
* Created on Sep 28, 2003
*
*/
package freenet.thread;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* @author Iakin
* Takes a snapshot of the current thread hiearchy and allow the user to access
various statuc information
* as well as dumping some stats to HTML
*
*/
public class ThreadStatusSnapshot {
Hashtable consumers = new Hashtable();
PoolThreadCount tc = new PoolThreadCount();
group root;
public ThreadStatusSnapshot(){
ThreadGroup tg = Thread.currentThread().getThreadGroup().getParent();
ThreadGroup topMost = null;
while(tg != null) {
topMost = tg;
tg = tg.getParent();
}
root = new group(topMost);
}
public Hashtable getPoolConsumers(){
return consumers;
}
/* Returns the total number of/the number of available threads in freds
threadPool*/
public PoolThreadCount getPoolThreadCounts(){
return tc;
}
private void countPooledThread(PooledThread t) { //TODO: Wouldn't it be better
to just ask the threadpool for the information counted here?
tc.totalPooled++;
Runnable job = t.job();
if (job != null)
countConsumer(job);
else
tc.pooledAvailable++;
}
private void countConsumer(Runnable job){
String type = job.toString();
type = type.substring(0, type.indexOf("@"));
Integer con = (Integer) consumers.get(type);
if (con == null) {
con = new Integer(1);
consumers.put(type, con);
}else
consumers.put(type, new Integer(con.intValue()+1)); //TODO:
There must be a better way to do this...
}
public static class PoolThreadCount {
int totalPooled = 0;
int pooledAvailable = 0;
}
class group{
String groupName;
List lThreads = new LinkedList();
List lSubGRoups = new LinkedList();
group(ThreadGroup group){
groupName = group.getName();
Thread[] aThreads = new Thread[group.activeCount()];
int threads = group.enumerate(aThreads, false);
for(int j=0; j<threads; j++){
Thread t = aThreads[j];
if(t != null) //Yes.. sometimes this can happen /Iakin
lThreads.add(new ThreadInfo(t));
}
ThreadGroup[] groups = new
ThreadGroup[group.activeGroupCount()];
int groupcount=group.enumerate(groups, false);
for(int i=0; i<groupcount; i++)
lSubGRoups.add(new group(groups[i]));
}
void toHTML(StringBuffer buffer){
buffer.append("\n<li><b>" + groupName + "</b><ul>");
Iterator it = lThreads.iterator();
while(it.hasNext()){
ThreadInfo t = (ThreadInfo)it.next();
buffer.append("\n<li>" +t.name);
if(t.jobString != null)
buffer.append(": " + t.jobString);
}
it = lSubGRoups.iterator();
while(it.hasNext())
((group)it.next()).toHTML(buffer);
buffer.append("</ul>");
}
class ThreadInfo{
String name;
String jobString;
ThreadInfo(Thread t){
name = t.getName();
if (t instanceof PooledThread) {
Runnable r = ((PooledThread)t).job();
if(r != null)
jobString = r.toString();
countPooledThread((PooledThread)t);
}
}
}
}
public String threadTreeToHTML(){
StringBuffer buffer = new StringBuffer();
root.toHTML(buffer);
return buffer.toString();
}
public String threadStatusToHTML(){
StringWriter ssw = new StringWriter(200);
PrintWriter sw = new PrintWriter(ssw);
sw.println("<table width=\"100%\">");
sw.println("<tr><td>Total pooled threads</td><td align=right>" +
tc.totalPooled + "</td></tr>");
sw.println("<tr><td>Available pooled threads</td><td align=right>" +
tc.pooledAvailable + "</td></tr>");
sw.println("<tr><td>Pooled threads in use</td><td align=right>" +
(tc.totalPooled - tc.pooledAvailable) + "</td></tr>");
sw.println("</table>");
return ssw.toString();
}
public String poolConsumersToHTML(){
StringWriter ssw = new StringWriter(200);
PrintWriter sw = new PrintWriter(ssw);
sw.println("<table width=\"100%\">");
sw.println("<tr><th align=\"left\">Class</th><th
align=\"right\">Threads used</th>");
Object[] types = getPoolConsumers().keySet().toArray();
java.util.Arrays.sort(types);
for (int x = 0; x < types.length; x++) {
sw.println
("<tr><td>" + types[x] +
"</td><td align=\"right\">" +
((Integer)
getPoolConsumers().get(types[x])).intValue() +
"</td></tr>");
}
sw.println("</table>");
return ssw.toString();
}
}
--- NEW FILE: YThreadFactory.java ---
/* -*- Mode: java; c-basic-indent: 4; indent-tabs-mode: nil -*- */
package freenet.thread;
import freenet.Core;
import freenet.support.Irreversible;
import freenet.support.Logger;
/**
* A derivative of QThreadFactory that queues jobs when it runs
* out of threads. All jobs go onto the queue, and are removed
* by idle threads.
*
* Unnecessary creation of threads is avoided in two ways.
* First, the thread deletion rate is limited. This avoids
* the overhead of deleting the thread and then creating
* it again right away. Second, thread creation is delayed
* so long as the oldest job on the queue hasn't been there
* more than a specified time.
*
* @author ejhuff
*/
public final class YThreadFactory implements ThreadFactory {
/**
* Thread creation parameters:
*
* If available falls below either minimum, thread creation is
* allowed. Only one thread creates threads at a time, and that
* thread waits until the oldest job on queue is old enough.
*
* The minimum available (total - active) threads.
*
* The minimum available/active ratio.
*
* The minimum age in millis of the oldest job on queue before a
* new thread is actually created.
*
*/
private static final int threadCreationThreshold = 9;
private static final double threadCreationRatio = 0.1;
private final long tolerableQueueDelay;
/**
* Thread deletion parameters:
*
* Both of these maximums must be exceeded for deletion to start:
*
* The maximum available (total - active) threads.
*
* The maximum available/active ratio.
*
* The minimum delay in millis between thread deletions.
*
*/
private static final int threadDeletionThreshold = 27;
private static final double threadDeletionRatio = 1;
private static final long threadDeletionDelay = 1000; // 1 per second
private final ThreadGroup tg;
private final JobQueue jobQueue = new JobQueue();
private final boolean logDEBUG;
private final int targetMaxThreads;
private final String prefix;
/**
* @param tg ThreadGroup all created threads will belong to.
* @param targetMaxThreads Avoid creating more than this number of threads.
* @param prefix Threads are named as prefix + threadNumber.
* @param tolerableQueueDelay Don't create new thread until oldest job
* on queue is older than this (milliseconds).
*/
public YThreadFactory(ThreadGroup tg, int targetMaxThreads,
String prefix, int tolerableQueueDelay) {
this.tg = tg;
this.targetMaxThreads = targetMaxThreads;
this.prefix = prefix;
this.tolerableQueueDelay = (long) tolerableQueueDelay;
this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
new YThread();
}
/**
* @return the target maximum executing jobs.
* Caller may use this, together with
* activeThreads(), to determine load.
* This value is decreased by a thread
* which dies on outOfMemory exception.
*/
public final int maximumThreads() {
return targetMaxThreads;
}
/**
* @return the number of currently executing jobs
*/
public final int activeThreads() {
JobQueueSnapshot snap = new JobQueueSnapshot();
jobQueue.snap(snap);
return snap.active;
}
/**
* @return the instantaneous number of idle threads
*/
public final int availableThreads() {
JobQueueSnapshot snap = new JobQueueSnapshot();
jobQueue.snap(snap);
return snap.available;
}
/**
* @param job The job to be executed.
* @return null
*/
public final Thread getThread(Runnable job) {
jobQueue.enqueue(job);
return null;
}
private final class YThread extends Thread implements PooledThread {
private int jobsDone = 0;
private long maxQueueDelay = 0;
private long sumQueueDelay = 0;
private Runnable job = null;
public YThread() {
super(tg, "YThread-unnamed-as-yet");
super.start();
}
public Runnable job() {
return job;
}
public final void run() {
jobQueue.newThreadStarting();
JobQueueResult result = new JobQueueResult();
while (true) {
jobQueue.dequeue(result);
if (result.job == null) {
if (result.exit) {
// An unneeded thread exits.
double avgQueueDelay = (double)sumQueueDelay;
Core.diagnostics.occurrenceContinuous
("jobsPerYThread", jobsDone);
if (jobsDone > 0) {
avgQueueDelay /= jobsDone;
Core.diagnostics.occurrenceContinuous
("maxQueueDelayThisYThread", maxQueueDelay);
Core.diagnostics.occurrenceContinuous
("avgQueueDelayThisYThread", avgQueueDelay); //same
}
if (logDEBUG)
Core.logger.log(this, Thread.currentThread().getName() + "
ended. " +
result.available + " threads available. " +
result.active + " threads active. " +
jobsDone + " jobs done. " +
maxQueueDelay + " max queue delay. " +
avgQueueDelay + " avg queue delay.",
Core.logger.DEBUG);
return;
}
// (result.job == null && ! result.exit) means we
// should create and start one new thread, but
// only when the deadline has arrived.
while (true) {
long timeBeforeDeadline = jobQueue.timeBeforeDeadline();
if (timeBeforeDeadline <= 0) break;
try {
Thread.sleep(timeBeforeDeadline);
} catch (InterruptedException e) {}
}
new YThread();
} else { // (result.job != null) means we have a job to do.
job = result.job;
sumQueueDelay += result.queueDelay;
if (maxQueueDelay < result.queueDelay) {
maxQueueDelay = result.queueDelay;
}
try {
Core.diagnostics.occurrenceContinuous("jobQueueDelayAllYThreads",
result.queueDelay);
Core.diagnostics.occurrenceCounting("jobsExecuted", 1);
job.run();
} catch (Throwable e) {
Core.logger.log(this, "Unhandled exception " + e + " in job "
+ job,
e, Core.logger.ERROR);
freenet.node.Main.dumpInterestingObjects();
} finally {
jobsDone++;
job = null;
}
}
}
}
}
private final class JobQueueSnapshot {
int active = 0;
int available = 0;
}
private final class JobQueueResult {
Runnable job = null;
boolean exit = false;
int active = 0;
int available = 0;
long queueDelay = 0;
}
private final class JobQueue {
// information about threads
private int total = 0;
private int active = 0;
private int available = 0; // total == active + available
private int threadNumber = 0;
private long nextThreadDeletionTime = 0;
private boolean notCreatingThreads = true;
// information about the job queue.
private final JobQueueItem head = new JobQueueItem();
private JobQueueItem tail = head;
private JobQueueItem free = null;
private int freeListLength = 0;
private final class JobQueueItem {
JobQueueItem next = null;
Runnable job = null;
long enqueueTime = 0;
}
// A new thread has started. Account for it.
private synchronized void newThreadStarting() {
total++;
active++; // active + available == total
Thread.currentThread().setName(prefix + (threadNumber++));
notCreatingThreads = true;
}
private synchronized void snap(JobQueueSnapshot snap) {
snap.active = active;
snap.available = available;
}
// called only from getThread()
private synchronized void enqueue(Runnable job) {
if (free == null) {
free = new JobQueueItem();
freeListLength++;
}
JobQueueItem mine = free;
free = free.next;
freeListLength--;
mine.next = null;
mine.job = job;
mine.enqueueTime = System.currentTimeMillis();
tail.next = mine;
tail = mine;
this.notify(); // wake a thread
}
// called only from YThread.run()
private synchronized long timeBeforeDeadline() {
if (head.next == null) return tolerableQueueDelay;
return tolerableQueueDelay + head.next.enqueueTime -
System.currentTimeMillis();
}
// called only from YThread.run()
private synchronized void dequeue(JobQueueResult result) {
active--;
available++; // active + available == total
while (true) {
long now = System.currentTimeMillis();
long deleteDelay = 0;
// if we need more threads
if ( ( available < threadCreationThreshold ) ||
( available < active * threadCreationRatio )) {
// if not already creating a thread
if (notCreatingThreads) {
// then make one new thread.
notCreatingThreads = false;
active++;
available--; // active + available == total
result.active = active;
result.available = available;
result.exit = false;
result.job = null;
result.queueDelay = 0;
this.notify(); // wake another thread in case there is work
return;
}
}
if (head.next != null) {
active++;
available--; // active + available == total
JobQueueItem mine = head.next;
head.next = mine.next;
if (head.next == null) tail = head;
result.job = mine.job;
result.queueDelay = now - mine.enqueueTime;
mine.job = null;
mine.enqueueTime = 0;
mine.next = free;
free = mine;
freeListLength++;
while (freeListLength > 10000) {
mine = free;
free = free.next;
freeListLength--;
mine.next = null;
// mine will be garbage collected later
}
result.active = active;
result.available = available;
result.exit = false;
this.notify(); // wake the next thread
return;
}
// No jobs left on queue to run.
// If thread deletion is allowed at this time
// and if there are too many threads, delete one.
if (available > threadDeletionThreshold) {
if (available > active * threadDeletionRatio) {
deleteDelay = nextThreadDeletionTime - now;
if (deleteDelay <= 0) {
total--;
available--; // active + available == total
nextThreadDeletionTime = now + threadDeletionDelay;
result.active = active;
result.available = available;
result.exit = true;
result.job = null;
result.queueDelay = 0;
this.notify();
return;
}
}
}
try {
// idle threads wait here
if (deleteDelay > 0) {
// wait until it is time to consider deleting
// threads or until more work comes in.
this.wait(deleteDelay);
} else {
this.wait(); // no thread deletion presently planned
}
} catch ( InterruptedException ie ) {}
}
}
}
}
Index: FastThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/FastThreadFactory.java,v
retrieving revision 1.18.2.2.2.1
retrieving revision 1.18.2.2.2.2
diff -u -w -r1.18.2.2.2.1 -r1.18.2.2.2.2
--- FastThreadFactory.java 4 Jul 2003 02:45:10 -0000 1.18.2.2.2.1
+++ FastThreadFactory.java 28 Oct 2003 20:20:47 -0000 1.18.2.2.2.2
@@ -71,6 +71,7 @@
FThread(int num, Runnable job) {
super(tg, "FThread-" + num);
this.job = job;
+ super.setPriority(Thread.NORM_PRIORITY);
super.start();
}
Index: QThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/QThreadFactory.java,v
retrieving revision 1.19.2.2.2.5
retrieving revision 1.19.2.2.2.6
diff -u -w -r1.19.2.2.2.5 -r1.19.2.2.2.6
--- QThreadFactory.java 29 Jul 2003 21:08:09 -0000 1.19.2.2.2.5
+++ QThreadFactory.java 28 Oct 2003 20:20:47 -0000 1.19.2.2.2.6
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.thread;
import freenet.Core;
@@ -11,6 +12,8 @@
*
* @author oskar
* @author tavin
+ * @author lostlogic
+ * @author ejhuff
*/
public final class QThreadFactory implements ThreadFactory, Runnable {
@@ -26,157 +29,166 @@
private final ThreadGroup tg;
- private int active = 0;
- private int available = 0;
+ private final CountLock countLock = new CountLock();
- private long threadNumber = 0;
+ private final NumLock numLock = new NumLock();
- private QThread headerThread = null;
+ private final HeadLock headLock = new HeadLock();
+
+ private final boolean logDEBUG;
+
+ private final MaxLock maxLock = new MaxLock();
- private final Object maxLock = new Object();
- private int desiredMax;
/**
* @param tg ThreadGroup all created threads will belong to
*/
public QThreadFactory(ThreadGroup tg, int desiredMax) {
this.tg = tg;
- this.desiredMax = desiredMax;
+ this.maxLock.setDesiredMax(desiredMax);
+ this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
(new Thread(this, "Thread creation thread.")).start();
}
- public void run() {
- synchronized (this) {
+ public final void run() {
+ CountSnap snap = new CountSnap();
while (true) {
Throwable lastEx = null;
try {
-
- if (available < MINIMUM_AVAILABLE_ABS ||
- available < active * MINIMUM_AVAILABLE_RATIO) {
-
- int desired =
- Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
+ boolean doLog = false;
+ int required;
+ countLock.snap(snap);
+ // start with a minimal required value.
+ required = (int)(snap.active *
MINIMUM_AVAILABLE_RATIO);
+ if ( snap.available < MINIMUM_AVAILABLE_ABS ||
+ snap.available < required ) {
+ // if we fell below that, set target higher.
+ required =
+ Math.max((int) (snap.active *
IDEAL_AVAILABLE_RATIO),
2 * MINIMUM_AVAILABLE_ABS);
- while (available < desired) {
- createThread();
}
- } else if (available > (3 * MINIMUM_AVAILABLE_ABS) &&
- available > active * MAXIMUM_AVAILABLE_RATIO) {
-
- int desired =
- Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
+ while ( snap.available < required ) {
+ doLog = true;
+ headLock.push(new
QThread(numLock.newThreadNumber()));
+ // getThread could sneak in here,
+ // while available is one too small. That's
ok.
+ countLock.snapFree(snap); // does available++
+ required =
+ Math.min(required, // required must
not increase.
+ Math.max((int)
(snap.active * IDEAL_AVAILABLE_RATIO),
+ 2 *
MINIMUM_AVAILABLE_ABS));
+ }
+ if (logDEBUG && doLog)
+ Core.logger.log(this,"Thread creation thread
past creation loop" +
+ ", available:
" + snap.available +
+ ", required: "
+ required +
+ ", active: " +
snap.active,
+
Core.logger.DEBUG);
+ doLog = false;
+ int allowed;
+ countLock.snap(snap);
+ // Start with a maximal allowed value.
+ allowed = Math.max((int)(snap.active *
MAXIMUM_AVAILABLE_RATIO),
2 * MINIMUM_AVAILABLE_ABS);
+ // Repeatedly remove a thread from the stack and
signal it to die.
+ // But if all of the threads have meanwhile started
jobs, then
+ // do nothing, and the loop will terminate with
snap.available == 0
+ while ( snap.available > allowed ) {
+ doLog = true;
+ if (countLock.snapTake(snap)) {
+ // getThread can sneak in here
repeatedly,
+ // but it can never make
headLock.pop() be null.
+ headLock.pop().die();
+ }
+ // Let there be up to relative maximum threads
+ // sitting around. This will reduce thread flux a
+ // lot which should help CPU usage further. But
+ // don't let it go below 2 * absolute minimum.
+ allowed =
+ Math.max((int) (snap.active *
MAXIMUM_AVAILABLE_RATIO),
+ Math.max(allowed, //
allowed must not decrease.
+ 2 *
MINIMUM_AVAILABLE_ABS));
+ }
+ if (logDEBUG && doLog)
+ Core.logger.log(this,"Thread creation thread
past destruction loop" +
+ ", available:
" + snap.available +
+ ", allowed: "
+ allowed +
+ ", active: " +
snap.active,
+
Core.logger.DEBUG);
-
- while (available > desired) {
- destroyThread();
- }
- }
-
- try {
- wait(1000);
- } catch (InterruptedException e) {
- }
} catch (Throwable e) {
- if (lastEx == null || !lastEx.getClass().equals(e.getClass()))
+ if (lastEx == null || !lastEx.getClass().equals(e.getClass())) {
+ countLock.snap(snap);
Core.logger.log(this, "Exception in QThreadFactory. "
- + available + " thread available ,"
- + active + " running. Top: " + headerThread,
+ +
snap.available + " threads available ,"
+ + snap.active
+ " running. "
+ + "Top: " +
headLock.headerThread,
e, Core.logger.ERROR);
+ }
lastEx = e;
try {
- wait(20); // avoid runaway loop.
+ Thread.currentThread().sleep(20); // avoid runaway loop.
} catch (InterruptedException e2) {}
}
- }
+ countLock.waitForNotify(1000);
}
}
+ /**
+ * @return the target maximum executing jobs.
+ * Caller may use this, together with
+ * activeThreads(), to determine load.
+ * This value is decreased by a thread
+ * which dies on outOfMemory exception.
+ */
public final int maximumThreads() {
- synchronized (maxLock) {
- return desiredMax;
- }
+ return maxLock.desiredMax();
}
/**
* @return the number of currently executing jobs
*/
public final int activeThreads() {
- return active;
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
+ return snap.active;
}
/**
- * @return the instantaneous number of idle threads;
- * may return a negative number if the force option
- * was used with getThread()
+ * @return the instantaneous number of idle threads
*/
public final int availableThreads() {
- return available;
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
+ return snap.available;
}
/**
* @param job The job to be executed
+ * @return the thread which was popped or created.
*/
- public synchronized final Thread getThread(Runnable job) {
-
- // hopefully this will not happen often.
- if (headerThread == null) {
- createThread();
- }
-
- QThread thread = headerThread;
- headerThread = headerThread.next;
+ public final Thread getThread(Runnable job) {
+ QThread thread = null;
+ if (countLock.makeItMyself()) {
+ thread = new QThread(numLock.newThreadNumber());
+ } else {
+ thread = headLock.pop(); // thread cannot be null.
+ }
+ thread.setPriority(Thread.NORM_PRIORITY);
+ // synchronize so that getThread can't sneak in between
+ // test for job == null and this.wait() in thread.run().
+ synchronized (thread) {
thread.next = null;
thread.job = job;
- ++active;
- --available;
- thread.start();
-
- awaken();
- return thread;
- }
-
- /**
- * Creates a thread and adds it to the stack.
- */
- private synchronized final void createThread() {
- QThread newThread = new QThread(++threadNumber);
- newThread.next = headerThread;
- headerThread = newThread;
- available++;
- }
-
- /**
- * Removes a thread from the stack and signals it to die.
- */
- private synchronized final void destroyThread() {
- QThread dyingThread = headerThread;
- headerThread = headerThread.next;
- dyingThread.die();
- available--;
+ thread.notify();
}
- /**
- * Returns a thread to the stack when it is finished executing.
- */
- private synchronized final void returnThread(QThread returnThread) {
- returnThread.next = headerThread;
- headerThread = returnThread;
- active--;
- available++;
- }
-
- private synchronized final void awaken() {
- if ( ( available < MINIMUM_AVAILABLE_ABS) ||
- ( available < active * MINIMUM_AVAILABLE_RATIO) ||
- ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
- ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
- notifyAll();
- }
+ countLock.mightNotify();
+ return thread;
}
private final class QThread extends Thread implements PooledThread {
- private QThread next;
+ private QThread next; // link for stack of available threads.
private Runnable job = null;
@@ -193,16 +205,25 @@
return job;
}
- public final synchronized void start() {
- this.notify();
- }
-
public final void run() {
while (alive.state()) {
synchronized (this) {
while (alive.state() && job == null) {
+ // If getThread didn't synchronize
(thread),
+ // it could sneak in here and change
job and
+ // issue it's notify after the test
but before
+ // the wait.
try {
- this.wait(200);
+ // There is no need to timeout this wait,
+ // so long as getThread()
synchronizes.
+ // getThread() is the only way
to get a
+ // thread. It synchronizes
(thread), then
+ // changes thread.job, and
only then does
+ // thread.notify().
thread.die() also
+ // synchronizes, then changes
+ // alive.state(), and only
then does
+ // thread.notify().
+ this.wait();
}
catch (InterruptedException e) {}
}
@@ -215,41 +236,153 @@
} catch (OutOfMemoryError e) {
Core.logger.log(this, "Got out of memory error, " +
"decreasing maximum threads by 5 from ~"+
- desiredMax+".", e, Core.logger.ERROR);
+
maxLock.desiredMax()+".",
+ e,
Core.logger.ERROR);
freenet.node.Main.dumpInterestingObjects();
- synchronized(maxLock) {
- desiredMax = Math.max(desiredMax - 5, 0);
- }
+ maxLock.decrementDesiredMax(5);
} catch (Throwable e) {
Core.logger.log(this, "Unhandled throw in job: "+e,
e, Core.logger.ERROR);
} finally {
jobsDone++;
job = null;
- if (alive.state()) // this should always be true
- returnThread(this);
+ if (alive.state()) {
+ headLock.push(this); // first
push it
+ countLock.free(); // then
announce it is there.
+ } // else the thread will exit now.
}
}
}
Core.diagnostics.occurrenceContinuous("jobsPerQThread", jobsDone);
- Core.logger.log(this, getName() + " ended. " + available +
- "threads available.", Core.logger.DEBUGGING);
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
+ if (logDEBUG)
+ Core.logger.log(this, getName() + " ended. " +
+ snap.available +
"threads available." +
+ snap.active + "threads
active.",
+ Core.logger.DEBUG);
}
/**
- * You should probably only call this when the QThread is idle.
+ * Called only from QThreadFactory.run() in this context:
+ * if (countLock.snapTake(snap)) {
+ * headLock.pop().die();
+ * }
*/
- protected synchronized final void die() {
+ synchronized final void die() {
if (alive.state()) {
alive.change();
this.notify();
}
}
}
+
+ private final class MaxLock {
+ private int desiredMax;
+ synchronized void setDesiredMax(int desiredMax) {
+ this.desiredMax = desiredMax;
+ }
+ synchronized int desiredMax() {
+ return this.desiredMax;
+ }
+ synchronized void decrementDesiredMax(int decr) {
+ this.desiredMax = Math.max(this.desiredMax - decr, 0);
+ }
}
+ private final class NumLock {
+ private int threadNumber = 0;
+ synchronized int newThreadNumber() {
+ return threadNumber++;
+ }
+ }
+ private final class HeadLock {
+ QThread headerThread = null;
+ synchronized void push(QThread thread) {
+ thread.next = headerThread;
+ headerThread = thread;
+ }
+ synchronized QThread pop() {
+ QThread thread = headerThread;
+ headerThread = thread.next;
+ return thread;
+ }
+ }
+ private final class CountSnap {
+ int active = 0;
+ int available = 0;
+ }
+ private final class CountLock {
+ private int active = 0;
+ // available is always either the number of threads on
headLock.headerThread,
+ // or else one fewer, in the case where it has been decremented but the
+ // thread hasn't been removed yet.
+ private int available = 0;
+
+ // take a snapshot
+ synchronized void snap(CountSnap snap) {
+ snap.active = active;
+ snap.available = available;
+ }
+
+ // free a thread, then take a snapshot.
+ synchronized void snapFree(CountSnap snap) {
+ available++; // thread was already pushed onto stack.
+ snap.active = active;
+ snap.available = available;
+ }
+
+ // maybe take a thread, then take a snapshot.
+ // return true if got a thread.
+ synchronized boolean snapTake(CountSnap snap) {
+ boolean gotOne = false;
+ if (available >= 1) { // There is at least one on stack.
+ available--; // Make sure it stays that way,
+ gotOne = true; // even if getThread sneaks in.
+ }
+ snap.active = active;
+ snap.available = available;
+ return gotOne;
+ }
+
+ // Before calling free, available is one too small
+ // compared to the actual state of headLock.headerThread.
+ // QThreadFactory.run() or getThread() could sneak in before this.
+ synchronized void free() {
+ active--;
+ available++; // thread is already on the stack
+ }
+
+ synchronized boolean makeItMyself() {
+ boolean makeItMyself = false;
+ active++;
+ if (available > 0) {
+ available--;
+ } else {
+ makeItMyself = true;
+ }
+ return makeItMyself;
+ }
+
+ synchronized void mightNotify() {
+ if ( ( available < MINIMUM_AVAILABLE_ABS) ||
+ ( available < active * MINIMUM_AVAILABLE_RATIO) ||
+ ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
+ ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
+ this.notifyAll();
+ }
+ }
+
+ synchronized void waitForNotify(int maxDelayMillis) {
+ try {
+ this.wait(maxDelayMillis);
+ } catch ( InterruptedException ie ) {}
+ }
+
+ }
+}
Index: ThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/ThreadFactory.java,v
retrieving revision 1.7.2.1
retrieving revision 1.7.2.1.2.1
diff -u -w -r1.7.2.1 -r1.7.2.1.2.1
--- ThreadFactory.java 15 Feb 2003 04:13:01 -0000 1.7.2.1
+++ ThreadFactory.java 28 Oct 2003 20:20:47 -0000 1.7.2.1.2.1
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.thread;
import java.io.PrintWriter;
@@ -26,6 +27,8 @@
/**
* @param r the job to run
+ * @return the thread executing the job, or null if the
+ * job was queued.
*/
Thread getThread(Runnable r);
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs