Update of /cvsroot/freenet/freenet/src/freenet/thread
In directory sc8-pr-cvs1:/tmp/cvs-serv7151
Modified Files:
QThreadFactory.java
Log Message:
Fixed several bugs. Moved all of the synchronized() {} blocks
into methods of the lock classes, and made the guarded variables
private to those classes, so that javac flags any attempt to
reference a synchronized variable outside of a synchronized block.
Index: QThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/QThreadFactory.java,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -r1.35 -r1.36
--- QThreadFactory.java 27 Sep 2003 07:37:16 -0000 1.35
+++ QThreadFactory.java 28 Sep 2003 10:07:19 -0000 1.36
@@ -12,6 +12,8 @@
*
* @author oskar
* @author tavin
+ * @author lostlogic
+ * @author ejhuff
*/
public final class QThreadFactory implements ThreadFactory, Runnable {
@@ -28,280 +30,160 @@
private final ThreadGroup tg;
private final CountLock countLock = new CountLock();
- private int active = 0;
- private int available = 0; // never more than number of threads on stack.
private final NumLock numLock = new NumLock();
- private long threadNumber = 0;
private final HeadLock headLock = new HeadLock();
- private QThread headerThread = null;
- private boolean logDEBUG = false;
+ private final boolean logDEBUG;
+
+ private final MaxLock maxLock = new MaxLock();
- private final Object maxLock = new Object();
- private int desiredMax;
/**
* @param tg ThreadGroup all created threads will belong to
*/
public QThreadFactory(ThreadGroup tg, int desiredMax) {
this.tg = tg;
- this.desiredMax = desiredMax;
+ this.maxLock.setDesiredMax(desiredMax);
this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
(new Thread(this, "Thread creation thread.")).start();
}
- public void run() {
+ public final void run() {
+ CountSnap snap = new CountSnap();
while (true) {
Throwable lastEx = null;
- int logAvailable = 0, logActive = 0;
- boolean doLog = false;
try {
+ boolean doLog = false;
int required;
- synchronized(countLock) {
- // start with a minimal required value.
- required = (int)(active * MINIMUM_AVAILABLE_RATIO);
- if ( available < MINIMUM_AVAILABLE_ABS ||
- available < required ) {
- // if we fell below that, set target higher.
- required =
- Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
- 2 * MINIMUM_AVAILABLE_ABS);
- }
- logAvailable = available;
- logActive = active;
+ countLock.snap(snap);
+ // start with a minimal required value.
+ required = (int)(snap.active * MINIMUM_AVAILABLE_RATIO);
+ if ( snap.available < MINIMUM_AVAILABLE_ABS ||
+ snap.available < required ) {
+ // if we fell below that, set target higher.
+ required =
+ Math.max((int) (snap.active * IDEAL_AVAILABLE_RATIO),
+ 2 * MINIMUM_AVAILABLE_ABS);
}
- while ( available < required ) {
+ while ( snap.available < required ) {
doLog = true;
- createThread();
- synchronized(countLock) {
- // required must not increase.
- required =
- Math.min(required,
-
Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
-
2 * MINIMUM_AVAILABLE_ABS));
- if ( available > required &&
- available > active *
MINIMUM_AVAILABLE_RATIO ) {
- try {
- countLock.wait(500);
- } catch ( InterruptedException
ie ) {
- // active might have
changed.
- // required must not
increase.
- required =
-
Math.min(required,
-
Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
-
2 * MINIMUM_AVAILABLE_ABS));
- }
- }
- logAvailable = available;
- logActive = active;
- }
+ headLock.push(new QThread(numLock.newThreadNumber()));
+ // getThread could sneak in here,
+ // while available is one too small. That's ok.
+ countLock.snapFree(snap); // does available++
+ required =
+ Math.min(required, // required must not increase.
+ Math.max((int) (snap.active * IDEAL_AVAILABLE_RATIO),
+ 2 * MINIMUM_AVAILABLE_ABS));
}
if (logDEBUG && doLog)
Core.logger.log(this,"Thread creation thread
past creation loop" +
- ", available:
" + logAvailable +
+ ", available:
" + snap.available +
", required: "
+ required +
- ", active: " +
logActive,
+ ", active: " +
snap.active,
Core.logger.DEBUG);
doLog = false;
int allowed;
- synchronized(countLock) {
- // start with a maximal allowed value.
- allowed = (int)(active *
MAXIMUM_AVAILABLE_RATIO);
- if ( available > 3 * MINIMUM_AVAILABLE_ABS &&
- available > allowed ) {
- // if we are above that, set target
lower.
- allowed =
- Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
- 2 * MINIMUM_AVAILABLE_ABS);
- }
- logAvailable = available;
- logActive = active;
+ countLock.snap(snap);
+ // start with a maximal allowed value.
+ allowed = (int)(snap.active * MAXIMUM_AVAILABLE_RATIO);
+ if ( snap.available > 3 * MINIMUM_AVAILABLE_ABS &&
+ snap.available > allowed ) {
+ // if we are above that, and above an absolute threshold,
+ // set target lower, but not too low.
+ allowed =
+ Math.max((int) (snap.active * IDEAL_AVAILABLE_RATIO),
+ 2 * MINIMUM_AVAILABLE_ABS);
}
- while ( available > allowed ) {
+ // Repeatedly remove a thread from the stack and signal it to die.
+ // But if all of the threads have meanwhile started jobs, then
+ // do nothing, and the loop will terminate with snap.available == 0
+ while ( snap.available > allowed ) {
doLog = true;
- destroyThread();
- synchronized(countLock) {
- // allowed must not decrease.
- allowed =
- Math.max((int) (active *
IDEAL_AVAILABLE_RATIO),
-
Math.max(allowed,
-
2 * MINIMUM_AVAILABLE_ABS));
- if ( available < allowed &&
- available < active *
MAXIMUM_AVAILABLE_RATIO ) {
- try {
- countLock.wait(500);
- } catch ( InterruptedException
ie ) {
- // active might have
changed.
- // allowed must not
decrease.
- allowed =
- Math.max((int)
(active * IDEAL_AVAILABLE_RATIO),
-
Math.max(allowed,
-
2 * MINIMUM_AVAILABLE_ABS));
- }
- }
- logAvailable = available;
- logActive = active;
- }
- }
+ if (countLock.snapTake(snap)) {
+ // getThread can sneak in here repeatedly,
+ // but it can never make headLock.pop() be null.
+ headLock.pop().die();
+ }
+ allowed =
+ Math.max((int) (snap.active * IDEAL_AVAILABLE_RATIO),
+ Math.max(allowed, // allowed must not decrease.
+ 2 * MINIMUM_AVAILABLE_ABS));
+ }
if (logDEBUG && doLog)
Core.logger.log(this,"Thread creation thread
past destruction loop" +
- ", available:
" + logAvailable +
+ ", available:
" + snap.available +
", allowed: "
+ allowed +
- ", active: " +
logActive,
+ ", active: " +
snap.active,
Core.logger.DEBUG);
-
+
} catch (Throwable e) {
- if (lastEx == null || !lastEx.getClass().equals(e.getClass()))
- synchronized(countLock) {
- logAvailable = available;
- logActive = active;
- }
- Core.logger.log(this, "Exception in QThreadFactory. "
- + logAvailable + " threads available ,"
- + logActive + " running. Top: " + headerThread,
+ if (lastEx == null || !lastEx.getClass().equals(e.getClass())) {
+ countLock.snap(snap);
+ Core.logger.log(this, "Exception in
QThreadFactory. "
+ +
snap.available + " threads available ,"
+ + snap.active
+ " running. "
+ + "Top: " +
headLock.headerThread,
e,
Core.logger.ERROR);
+ }
lastEx = e;
try {
Thread.currentThread().sleep(20); // avoid runaway loop.
} catch (InterruptedException e2) {}
}
- synchronized(countLock) {
- try {
- countLock.wait(500);
- } catch ( InterruptedException ie ) {}
- }
+ countLock.waitForNotify(1000);
}
}
/**
- * @return the target maximum executing jobs
+ * @return the target maximum executing jobs.
* Caller may use this, together with
* activeThreads(), to determine load.
* This value is decreased by a thread
- * which dies on out of memory exception.
+ * which dies on outOfMemory exception.
*/
public final int maximumThreads() {
- synchronized (maxLock) {
- return desiredMax;
- }
+ return maxLock.desiredMax();
}
/**
* @return the number of currently executing jobs
*/
public final int activeThreads() {
- return active;
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
+ return snap.active;
}
/**
- * @return the instantaneous number of idle threads;
- * may return an underestimate, even < 0.
+ * @return the instantaneous number of idle threads
*/
public final int availableThreads() {
- return available;
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
+ return snap.available;
}
/**
* @param job The job to be executed
+ * @return the thread which was popped or created.
*/
public final Thread getThread(Runnable job) {
- boolean gotHead;
QThread thread = null;
- synchronized(countLock) {
- active++;
- available--; // never an overestimate
- }
- synchronized(headLock) {
- // hopefully this will not happen often.
- if (headerThread == null) {
- gotHead = false;
- } else {
- gotHead = true;
- thread = headerThread;
- headerThread = headerThread.next;
- }
- }
- if (!gotHead) {
- long newThreadNumber;
- synchronized(numLock) {
- newThreadNumber = ++threadNumber;
- }
- synchronized(countLock) {
- available++; // correct the underestimate.
- }
- thread = new QThread(newThreadNumber);
- }
+ if (countLock.makeItMyself()) {
+ thread = new QThread(numLock.newThreadNumber());
+ } else {
+ thread = headLock.pop(); // thread cannot be null.
+ }
thread.next = null;
thread.job = job;
thread.start();
- awaken();
+ countLock.mightNotify();
return thread;
}
- /**
- * Creates a thread and adds it to the stack.
- */
- private final void createThread() {
- QThread newThread;
- long newThreadNumber;
- synchronized(numLock) {
- newThreadNumber = ++threadNumber;
- }
- newThread = new QThread(newThreadNumber);
- synchronized(headLock) {
- newThread.next = headerThread;
- headerThread = newThread;
- }
- synchronized(countLock) {
- available++; // never an overestimate.
- }
- }
-
- /**
- * Removes a thread from the stack and signals it to die.
- * But if all of the threads have meanwhile started jobs,
- * then does nothing.
- */
- private final void destroyThread() {
- QThread dyingThread;
- synchronized(countLock) {
- if (available < 1) return;
- available--; // never an overestimate.
- }
- synchronized(headLock) {
- dyingThread = headerThread;
- headerThread = headerThread.next;
- }
- dyingThread.die();
- }
-
- /**
- * Returns a thread to the stack when it is finished executing.
- */
- private final void returnThread(QThread returnThread) {
- synchronized(headLock) {
- returnThread.next = headerThread;
- headerThread = returnThread;
- }
- synchronized(countLock) {
- active--;
- available++; // never an overestimate
- }
- }
-
- private final void awaken() {
- synchronized(countLock) {
- if ( ( available < MINIMUM_AVAILABLE_ABS) ||
- ( available < active * MINIMUM_AVAILABLE_RATIO) ||
- ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
- ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
- countLock.notifyAll();
- }
- }
- }
-
private final class QThread extends Thread implements PooledThread {
private QThread next; // link for stack of available threads.
@@ -343,46 +225,149 @@
} catch (OutOfMemoryError e) {
Core.logger.log(this, "Got out of memory error, " +
"decreasing maximum threads by 5 from ~"+
- desiredMax+".", e, Core.logger.ERROR);
+ maxLock.desiredMax()+".",
+ e, Core.logger.ERROR);
freenet.node.Main.dumpInterestingObjects();
- synchronized(maxLock) {
- desiredMax = Math.max(desiredMax - 5, 0);
- }
+ maxLock.decrementDesiredMax(5);
} catch (Throwable e) {
Core.logger.log(this, "Unhandled throw in job: "+e,
e, Core.logger.ERROR);
} finally {
jobsDone++;
job = null;
- if (alive.state()) // this should always be true
- returnThread(this);
+ if (alive.state()) {
+ headLock.push(this); // first push it
+ countLock.free(); // then announce it is there.
+ } // else the thread will exit now.
}
}
}
Core.diagnostics.occurrenceContinuous("jobsPerQThread", jobsDone);
+ CountSnap snap = new CountSnap();
+ countLock.snap(snap);
if (logDEBUG)
- Core.logger.log(this, getName() + " ended. " + available +
- "threads available.", Core.logger.DEBUG);
+ Core.logger.log(this, getName() + " ended. " +
+ snap.available + "threads available." +
+ snap.active + "threads active.",
+ Core.logger.DEBUG);
}
/**
- * You should probably only call this when the QThread is idle.
+ * Called only from QThreadFactory.run() in this context:
+ * if (countLock.snapTake(snap)) {
+ * headLock.pop().die();
+ * }
*/
- protected synchronized final void die() {
+ synchronized final void die() {
if (alive.state()) {
alive.change();
this.notify();
}
}
}
-
- private class NumLock {}
- private class HeadLock {}
- private class CountLock {}
-}
+ private final class MaxLock {
+ private int desiredMax;
+ synchronized void setDesiredMax(int desiredMax) {
+ this.desiredMax = desiredMax;
+ }
+ synchronized int desiredMax() {
+ return this.desiredMax;
+ }
+ synchronized void decrementDesiredMax(int decr) {
+ this.desiredMax = Math.max(this.desiredMax - decr, 0);
+ }
+ }
+
+ private final class NumLock {
+ private int threadNumber = 0;
+ synchronized int newThreadNumber() {
+ return threadNumber++;
+ }
+ }
+
+ private final class HeadLock {
+ QThread headerThread = null;
+ synchronized void push(QThread thread) {
+ thread.next = headerThread;
+ headerThread = thread;
+ }
+ synchronized QThread pop() {
+ QThread thread = headerThread;
+ headerThread = thread.next;
+ return thread;
+ }
+ }
+
+ private final class CountSnap {
+ int active = 0;
+ int available = 0;
+ }
+
+ private final class CountLock {
+ private int active = 0;
+ // available is always either the number of threads on headLock.headerThread,
+ // or else one fewer, in the case where it has been decremented but the
+ // thread hasn't been removed yet.
+ private int available = 0;
+
+ synchronized void snap(CountSnap snap) {
+ snap.active = active;
+ snap.available = available;
+ }
+
+ synchronized void snapFree(CountSnap snap) {
+ available++; // thread was already pushed onto stack.
+ snap.active = active;
+ snap.available = available;
+ }
+ synchronized boolean snapTake(CountSnap snap) {
+ boolean gotOne = false;
+ if (available >= 1) { // There is at least one on stack.
+ available--; // Make sure it stays that way,
+ gotOne = true; // even if getThread sneaks in.
+ }
+ snap.active = active;
+ snap.available = available;
+ return gotOne;
+ }
+
+ // Before calling free, available is one too small
+ // compared to the actual state of headLock.headerThread.
+ // QThreadFactory.run() or getThread() could sneak in before this.
+ synchronized void free() {
+ active--;
+ available++; // thread is already on the stack
+ }
+ synchronized boolean makeItMyself() {
+ boolean makeItMyself = false;
+ active++;
+ if (available > 0) {
+ available--;
+ } else {
+ makeItMyself = true;
+ }
+ return makeItMyself;
+ }
+ synchronized void mightNotify() {
+ if ( ( available < MINIMUM_AVAILABLE_ABS) ||
+ ( available < active * MINIMUM_AVAILABLE_RATIO) ||
+ ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
+ ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
+ countLock.notifyAll();
+ }
+ }
+
+ synchronized void waitForNotify(int maxDelayMillis) {
+ try {
+ countLock.wait(maxDelayMillis);
+ } catch ( InterruptedException ie ) {}
+ }
+
+ }
+}
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs