Update of /cvsroot/freenet/freenet/src/freenet/thread
In directory sc8-pr-cvs1:/tmp/cvs-serv3267
Added Files:
YThreadFactory.java
Log Message:
Yet another thread factory
--- NEW FILE: YThreadFactory.java ---
/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.thread;
import freenet.Core;
import freenet.support.Irreversible;
import freenet.support.Logger;
/**
* A derivative of QThreadFactory that queues jobs when it runs
* out of threads. All jobs go onto the queue, and are removed
* by idle threads.
*
* Unnecessary creation of threads is avoided in two ways.
* First, the thread deletion rate is limited. This avoids
* the overhead of deleting the thread and then creating
* it again right away. Second, thread creation is delayed
* so long as the oldest job on the queue hasn't been there
* more than a specified time.
*
* @author ejhuff
*/
public final class YThreadFactory implements ThreadFactory {
/**
* Thread creation parameters:
*
* If available falls below either minimum, thread creation is
* allowed. Only one thread creates threads at a time, and that
* thread waits until the oldest job on queue is old enough.
*
* The minimum available (total - active) threads.
*
* The minimum available/active ratio.
*
* The minimum age in millis of the oldest job on queue before a
* new thread is actually created.
*
*/
private static final int threadCreationThreshold = 9;
private static final double threadCreationRatio = 0.1;
private static final long tolerableQueueDelay = 100;
/**
* Thread deletion parameters:
*
* Both of these maximums must be exceeded for deletion to start:
*
* The maximum available (total - active) threads.
*
* The maximum available/active ratio.
*
* The minimum delay in millis between thread deletions.
*
*/
private static final int threadDeletionThreshold = 27;
private static final double threadDeletionRatio = 1;
private static final long threadDeletionDelay = 1000; // 1 per second
private final ThreadGroup tg;
private final JobQueue jobQueue = new JobQueue();
private final boolean logDEBUG;
private final int targetMaxThreads;
private final String prefix;
/**
* @param tg ThreadGroup all created threads will belong to
* @param targetMaxThreads avoid creating more than this number of threads
*/
public YThreadFactory(ThreadGroup tg, int targetMaxThreads, String prefix) {
this.tg = tg;
this.targetMaxThreads = targetMaxThreads;
this.prefix = prefix;
this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
new YThread();
}
/**
* @return the target maximum executing jobs.
* Caller may use this, together with
* activeThreads(), to determine load.
* This value is decreased by a thread
* which dies on outOfMemory exception.
*/
public final int maximumThreads() {
return targetMaxThreads;
}
/**
* @return the number of currently executing jobs
*/
public final int activeThreads() {
JobQueueSnapshot snap = new JobQueueSnapshot();
jobQueue.snap(snap);
return snap.active;
}
/**
* @return the instantaneous number of idle threads
*/
public final int availableThreads() {
JobQueueSnapshot snap = new JobQueueSnapshot();
jobQueue.snap(snap);
return snap.available;
}
/**
* @param job The job to be executed.
* @return null
*/
public final Thread getThread(Runnable job) {
jobQueue.enqueue(job);
return null;
}
private final class YThread extends Thread implements PooledThread {
private int jobsDone = 0;
private long maxQueueDelay = 0;
private long sumQueueDelay = 0;
private Runnable job = null;
public YThread() {
super(tg, "YThread-unnamed-as-yet");
super.start();
}
public Runnable job() {
return job;
}
public final void run() {
jobQueue.newThreadStarting();
JobQueueResult result = new JobQueueResult();
while (true) {
jobQueue.dequeue(result);
if (result.job == null) {
if (result.exit) {
// An unneeded thread exits.
double avgQueueDelay =
(double)sumQueueDelay;
Core.diagnostics.occurrenceContinuous
("jobsPerYThread", jobsDone);
if (jobsDone > 0) {
avgQueueDelay /= jobsDone;
Core.diagnostics.occurrenceContinuous
("maxQueueDelayThisYThread", maxQueueDelay);
Core.diagnostics.occurrenceContinuous
("avgQueueDelayThisYThread", avgQueueDelay); //same
}
if (logDEBUG)
Core.logger.log(this,
Thread.currentThread().getName() + " ended. " +
result.available + " threads available. " +
result.active + " threads active. " +
jobsDone + " jobs done. " +
maxQueueDelay + " max queue delay. " +
avgQueueDelay + " avg queue delay.",
Core.logger.DEBUG);
return;
}
// (result.job == null && ! result.exit) means
we
// should create and start one new thread, but
// only when the deadline has arrived.
while (true) {
long timeBeforeDeadline =
jobQueue.timeBeforeDeadline();
if (timeBeforeDeadline <= 0) break;
try {
Thread.sleep(timeBeforeDeadline);
} catch (InterruptedException e) {}
}
new YThread();
} else { // (result.job != null) means we have a job
to do.
job = result.job;
sumQueueDelay += result.queueDelay;
if (maxQueueDelay < result.queueDelay) {
maxQueueDelay = result.queueDelay;
}
try {
Core.diagnostics.occurrenceContinuous("jobQueueDelayAllYThreads",
result.queueDelay);
//
Core.diagnostics.occurrenceCounting("jobsExecuted", 1);
job.run();
} catch (Throwable e) {
Core.logger.log(this, "Unhandled
exception " + e + " in job " + job,
e,
Core.logger.ERROR);
freenet.node.Main.dumpInterestingObjects();
} finally {
jobsDone++;
job = null;
}
}
}
}
}
private final class JobQueueSnapshot {
int active = 0;
int available = 0;
}
private final class JobQueueResult {
Runnable job = null;
boolean exit = false;
int active = 0;
int available = 0;
long queueDelay = 0;
}
private final class JobQueue {
// information about threads
private int total = 0;
private int active = 0;
private int available = 0; // total == active + available
private int threadNumber = 0;
private long nextThreadDeletionTime = 0;
private boolean notCreatingThreads = true;
// information about the job queue.
private final JobQueueItem head = new JobQueueItem();
private JobQueueItem tail = head;
private JobQueueItem free = null;
private int freeListLength = 0;
private final class JobQueueItem {
JobQueueItem next = null;
Runnable job = null;
long enqueueTime = 0;
}
// A new thread has started. Account for it.
private synchronized void newThreadStarting() {
total++;
active++; // active + available == total
Thread.currentThread().setName(prefix + (threadNumber++));
notCreatingThreads = true;
}
private synchronized void snap(JobQueueSnapshot snap) {
snap.active = active;
snap.available = available;
}
// called only from getThread()
private synchronized void enqueue(Runnable job) {
if (free == null) {
free = new JobQueueItem();
freeListLength++;
}
JobQueueItem mine = free;
free = free.next;
freeListLength--;
mine.next = null;
mine.job = job;
mine.enqueueTime = System.currentTimeMillis();
tail.next = mine;
tail = mine;
this.notify(); // wake a thread
}
// called only from YThread.run()
private synchronized long timeBeforeDeadline() {
if (head.next == null) return tolerableQueueDelay;
return tolerableQueueDelay + head.next.enqueueTime -
System.currentTimeMillis();
}
// called only from YThread.run()
private synchronized void dequeue(JobQueueResult result) {
active--;
available++; // active + available == total
while (true) {
long now = System.currentTimeMillis();
long deleteDelay = 0;
// if we need more threads
if ( ( available < threadCreationThreshold ) ||
( available < active * threadCreationRatio ))
{
// if not already creating a thread
if (notCreatingThreads) {
// then make one new thread.
notCreatingThreads = false;
active++;
available--; // active + available ==
total
result.active = active;
result.available = available;
result.exit = false;
result.job = null;
result.queueDelay = 0;
this.notify(); // wake another thread
in case there is work
return;
}
}
if (head.next != null) {
active++;
available--; // active + available == total
JobQueueItem mine = head.next;
head.next = mine.next;
if (head.next == null) tail = head;
result.job = mine.job;
result.queueDelay = now - mine.enqueueTime;
mine.job = null;
mine.enqueueTime = 0;
mine.next = free;
free = mine;
freeListLength++;
while (freeListLength > 10000) {
mine = free;
free = free.next;
freeListLength--;
mine.next = null;
// mine will be garbage collected later
}
result.active = active;
result.available = available;
result.exit = false;
this.notify(); // wake the next thread
return;
}
// No jobs left on queue to run.
// If thread deletion is allowed at this time
// and if there are too many threads, delete one.
if (available > threadDeletionThreshold) {
if (available > active * threadDeletionRatio) {
deleteDelay = nextThreadDeletionTime -
now;
if (deleteDelay <= 0) {
total--;
available--; // active +
available == total
nextThreadDeletionTime = now +
threadDeletionDelay;
result.active = active;
result.available = available;
result.exit = true;
result.job = null;
result.queueDelay = 0;
this.notify();
return;
}
}
}
try {
// idle threads wait here
if (deleteDelay > 0) {
// wait until it is time to consider
deleting
// threads or until more work comes in.
this.wait(deleteDelay);
} else {
this.wait(); // no thread deletion
presently planned
}
} catch ( InterruptedException ie ) {}
}
}
}
}
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs