Update of /cvsroot/freenet/freenet/src/freenet/thread
In directory sc8-pr-cvs1:/tmp/cvs-serv25857/src/freenet/thread

Modified Files:
      Tag: stable
        FastThreadFactory.java QThreadFactory.java ThreadFactory.java 
Added Files:
      Tag: stable
        ThreadStatusSnapshot.java YThreadFactory.java 
Log Message:
5029: Merge from unstable after months of work. MASSIVE changes.
Highlights:
* Next Generation Routing, massive related changes
* Major changes to handling of messages and connections (PeerHandler and related 
changes)
* Even more non-blocking I/O
* Documentation improvements
* Lots of new diagnostics and config options
* Lots of bug fixes and performance tweaking
* Probably lots of new bugs too!


--- NEW FILE: ThreadStatusSnapshot.java ---
/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
/*
 * Created on Sep 28, 2003
 *
 */
package freenet.thread;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;


/**
 * @author Iakin
 * Takes a snapshot of the current thread hiearchy and allow the user to access 
various statuc information
 * as well as dumping some stats to HTML 
 *
 */
public class ThreadStatusSnapshot {
        Hashtable consumers = new Hashtable();
        PoolThreadCount tc = new PoolThreadCount();
        group root;
        public ThreadStatusSnapshot(){
                ThreadGroup tg = Thread.currentThread().getThreadGroup().getParent();
                ThreadGroup topMost = null;
                while(tg != null) {
                        topMost = tg;
                        tg = tg.getParent();
                }
                root = new group(topMost);
        }
        
        public Hashtable getPoolConsumers(){
                return consumers;
        }
        /* Returns the total number of/the number of available threads in freds 
threadPool*/
        public PoolThreadCount getPoolThreadCounts(){
                return tc;
        }
        private void countPooledThread(PooledThread t) { //TODO: Wouldn't it be better 
to just ask the threadpool for the information counted here?
                tc.totalPooled++;
                Runnable job = t.job();
                if (job != null)
                        countConsumer(job);
                else
                        tc.pooledAvailable++;
        }
                
        private void countConsumer(Runnable job){
                String type = job.toString();
                type = type.substring(0, type.indexOf("@"));
                Integer con = (Integer) consumers.get(type);
                if (con == null) {
                        con = new Integer(1);
                        consumers.put(type, con);
                }else
                        consumers.put(type, new Integer(con.intValue()+1)); //TODO: 
There must be a better way to do this...
        }
        public static class PoolThreadCount {
                int totalPooled = 0;
                int pooledAvailable = 0;
        }
        
        class group{
                String groupName;
                List lThreads = new LinkedList();
                List lSubGRoups = new LinkedList();
                
                group(ThreadGroup group){
                        groupName = group.getName();
                        Thread[] aThreads = new Thread[group.activeCount()];
                        int threads = group.enumerate(aThreads, false);
                        for(int j=0; j<threads; j++){
                                Thread t = aThreads[j];
                                if(t != null) //Yes.. sometimes this can happen /Iakin
                                        lThreads.add(new ThreadInfo(t));
                        }
                        ThreadGroup[] groups = new 
ThreadGroup[group.activeGroupCount()];
                        int groupcount=group.enumerate(groups, false);
                        for(int i=0; i<groupcount; i++)
                                lSubGRoups.add(new group(groups[i]));
                }
                
                void toHTML(StringBuffer buffer){
                        buffer.append("\n<li><b>" + groupName + "</b><ul>");
                        Iterator it = lThreads.iterator();
                        while(it.hasNext()){
                                ThreadInfo t = (ThreadInfo)it.next();
                                buffer.append("\n<li>" +t.name);
                                if(t.jobString != null)
                                        buffer.append(": " + t.jobString);
                        }
                        it = lSubGRoups.iterator();
                        while(it.hasNext())
                                ((group)it.next()).toHTML(buffer);
                        buffer.append("</ul>");
                }
                
                class ThreadInfo{
                        String name;
                        String jobString;
                        ThreadInfo(Thread t){
                                name = t.getName();
                                if (t instanceof PooledThread) {
                                        Runnable r = ((PooledThread)t).job();
                                        if(r != null)
                                                jobString = r.toString();
                                        countPooledThread((PooledThread)t);
                                }
                        }
                }
        }
        
        public String threadTreeToHTML(){
                StringBuffer buffer = new StringBuffer();
                root.toHTML(buffer);
                return buffer.toString();
        }
        
        public String threadStatusToHTML(){
                StringWriter ssw = new StringWriter(200);
                PrintWriter sw = new PrintWriter(ssw);
                sw.println("<table width=\"100%\">");
                sw.println("<tr><td>Total pooled threads</td><td align=right>" + 
tc.totalPooled + "</td></tr>");
                sw.println("<tr><td>Available pooled threads</td><td align=right>" + 
tc.pooledAvailable + "</td></tr>");
                sw.println("<tr><td>Pooled threads in use</td><td align=right>" + 
(tc.totalPooled - tc.pooledAvailable) + "</td></tr>");
                sw.println("</table>");
                return ssw.toString();
        }
        
        public String poolConsumersToHTML(){
                StringWriter ssw = new StringWriter(200);
                PrintWriter sw = new PrintWriter(ssw);
                sw.println("<table width=\"100%\">");
                sw.println("<tr><th align=\"left\">Class</th><th 
align=\"right\">Threads used</th>");
                Object[] types = getPoolConsumers().keySet().toArray();
                java.util.Arrays.sort(types);
                for (int x = 0; x < types.length; x++) {
                        sw.println
                                ("<tr><td>" + types[x] + 
                                 "</td><td align=\"right\">" +
                                 ((Integer) 
getPoolConsumers().get(types[x])).intValue() +
                                 "</td></tr>");
                }
                sw.println("</table>");
                return ssw.toString();
        }
}

--- NEW FILE: YThreadFactory.java ---
/* -*- Mode: java; c-basic-indent: 4; indent-tabs-mode: nil -*- */
package freenet.thread;

import freenet.Core;
import freenet.support.Irreversible;
import freenet.support.Logger;

/**
 * A derivative of QThreadFactory that queues jobs when it runs
 * out of threads.  All jobs go onto the queue, and are removed
 * by idle threads.
 *
 * Unnecessary creation of threads is avoided in two ways.
 * First, the thread deletion rate is limited.  This avoids
 * the overhead of deleting the thread and then creating
 * it again right away.  Second, thread creation is delayed
 * so long as the oldest job on the queue hasn't been there
 * more than a specified time.
 *
 * @author ejhuff
 */
public final class YThreadFactory implements ThreadFactory {
    
    /** 
     * Thread creation parameters:
     *
     * If available falls below either minimum, thread creation is
     * allowed.  Only one thread creates threads at a time, and that
     * thread waits until the oldest job on queue is old enough.
     *
     * The minimum available (total - active) threads.
     *
     * The minimum available/active ratio.
     *
     * The minimum age in millis of the oldest job on queue before a
     * new thread is actually created.
     *
     */

    private static final int threadCreationThreshold = 9;

    private static final double threadCreationRatio = 0.1;

    private final long tolerableQueueDelay;
    

    /** 
     * Thread deletion parameters:
     *
     * Both of these maximums must be exceeded for deletion to start:
     *
     * The maximum available (total - active) threads.
     *
     * The maximum available/active ratio.
     *
     * The minimum delay in millis between thread deletions.
     *
     */
    private static final int threadDeletionThreshold = 27;

    private static final double threadDeletionRatio = 1;

    private static final long threadDeletionDelay = 1000; // 1 per second
    
    private final ThreadGroup tg;
    
    private final JobQueue jobQueue = new JobQueue();
    
    private final boolean logDEBUG;
        
    private final int targetMaxThreads;

    private final String prefix;

    /**
     * @param tg                  ThreadGroup all created threads will belong to.
     * @param targetMaxThreads    Avoid creating more than this number of threads.
     * @param prefix              Threads are named as prefix + threadNumber.
     * @param tolerableQueueDelay Don't create new thread until oldest job
     *                            on queue is older than this (milliseconds).
     */
    public YThreadFactory(ThreadGroup tg, int targetMaxThreads, 
                          String prefix, int tolerableQueueDelay) {
        this.tg = tg;
        this.targetMaxThreads = targetMaxThreads;
        this.prefix = prefix;
        this.tolerableQueueDelay = (long) tolerableQueueDelay;
        this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
        new YThread();
    }
    
    /**
     * @return  the target maximum executing jobs.
     *          Caller may use this, together with
     *          activeThreads(), to determine load.
     *          This value is decreased by a thread
     *          which dies on outOfMemory exception.
     */
    public final int maximumThreads() {
        return targetMaxThreads;
    }
    
    /**
     * @return  the number of currently executing jobs
     */
    public final int activeThreads() {
        JobQueueSnapshot snap = new JobQueueSnapshot();
        jobQueue.snap(snap);
        return snap.active;
    }
    
    /**
     * @return  the instantaneous number of idle threads
     */
    public final int availableThreads() {
        JobQueueSnapshot snap = new JobQueueSnapshot();
        jobQueue.snap(snap);
        return snap.available;
    }
    
    /**
     * @param job  The job to be executed.
     * @return null
     */
    public final Thread getThread(Runnable job) {
        jobQueue.enqueue(job); 
        return null;
    }
    
    private final class YThread extends Thread implements PooledThread {
        
        private int jobsDone = 0;
        private long maxQueueDelay = 0;
        private long sumQueueDelay = 0;
        private Runnable job = null;

        public YThread() {
            super(tg, "YThread-unnamed-as-yet");
            super.start();
        }

        public Runnable job() {
            return job;
        }
        
        public final void run() {    
            jobQueue.newThreadStarting();
            JobQueueResult result = new JobQueueResult();
            while (true) {
                jobQueue.dequeue(result);
                if (result.job == null) {
                    if (result.exit) {
                        // An unneeded thread exits.
                        double avgQueueDelay = (double)sumQueueDelay;
                        Core.diagnostics.occurrenceContinuous
                            ("jobsPerYThread", jobsDone);
                        if (jobsDone > 0) {
                            avgQueueDelay /= jobsDone;
                            Core.diagnostics.occurrenceContinuous
                                ("maxQueueDelayThisYThread", maxQueueDelay);
                            Core.diagnostics.occurrenceContinuous
                                ("avgQueueDelayThisYThread", avgQueueDelay); //same
                        }
                        if (logDEBUG)
                            Core.logger.log(this, Thread.currentThread().getName() + " 
ended. " + 
                                            result.available + " threads available. " +
                                            result.active + " threads active. " +
                                            jobsDone + " jobs done. " +
                                            maxQueueDelay + " max queue delay. " +
                                            avgQueueDelay + " avg queue delay.",
                                            Core.logger.DEBUG);
                        return;
                    }
                    // (result.job == null && ! result.exit) means we
                    // should create and start one new thread, but
                    // only when the deadline has arrived.
                    while (true) {
                        long timeBeforeDeadline = jobQueue.timeBeforeDeadline();
                        if (timeBeforeDeadline <= 0) break;
                        try {
                            Thread.sleep(timeBeforeDeadline);
                        } catch (InterruptedException e) {}
                    }
                    new YThread();
                } else { // (result.job != null) means we have a job to do.
                    job = result.job;
                    sumQueueDelay += result.queueDelay;
                    if (maxQueueDelay < result.queueDelay) {
                        maxQueueDelay = result.queueDelay;
                    }
                    try {
                        
Core.diagnostics.occurrenceContinuous("jobQueueDelayAllYThreads", 
                                                              result.queueDelay);
                        Core.diagnostics.occurrenceCounting("jobsExecuted", 1);
                        job.run();
                    } catch (Throwable e) {
                        Core.logger.log(this, "Unhandled exception " + e + " in job " 
+ job,
                                        e, Core.logger.ERROR);
                        freenet.node.Main.dumpInterestingObjects();
                    } finally {
                        jobsDone++;
                        job = null;
                    }
                }
            }
        }

    }
    
    private final class JobQueueSnapshot {
        int active = 0;
        int available = 0;
    }

    private final class JobQueueResult {
        Runnable job = null;
        boolean exit = false;
        int active = 0;
        int available = 0;
        long queueDelay = 0;
    }
    
    private final class JobQueue {
        // information about threads
        private int total = 0;
        private int active = 0;
        private int available = 0; // total == active + available
        private int threadNumber = 0;
        private long nextThreadDeletionTime = 0;
        private boolean notCreatingThreads = true;

        // information about the job queue.
        private final JobQueueItem head = new JobQueueItem();
        private JobQueueItem tail = head;
        private JobQueueItem free = null;
        private int freeListLength = 0;

        private final class JobQueueItem {
            JobQueueItem next = null;
            Runnable job = null;
            long enqueueTime = 0;
        }

        // A new thread has started.  Account for it.
        private synchronized void newThreadStarting() {
            total++;
            active++; // active + available == total
            Thread.currentThread().setName(prefix + (threadNumber++));
            notCreatingThreads = true;
        }

        private synchronized void snap(JobQueueSnapshot snap) {
            snap.active = active;
            snap.available = available;
        }
        
        // called only from getThread()
        private synchronized void enqueue(Runnable job) {
            if (free == null) {
                free = new JobQueueItem();
                freeListLength++;
            }
            JobQueueItem mine = free;
            free = free.next;
            freeListLength--;
            mine.next = null;
            mine.job = job;
            mine.enqueueTime = System.currentTimeMillis();
            tail.next = mine;
            tail = mine;
            this.notify(); // wake a thread
        }

        // called only from YThread.run()
        private synchronized long timeBeforeDeadline() {
            if (head.next == null) return tolerableQueueDelay;
            return tolerableQueueDelay + head.next.enqueueTime -
                System.currentTimeMillis();
        }

        // called only from YThread.run()
        private synchronized void dequeue(JobQueueResult result) {
            active--;
            available++; // active + available == total
            while (true) {
                long now = System.currentTimeMillis();
                long deleteDelay = 0;
                // if we need more threads
                if ( ( available < threadCreationThreshold ) ||
                     ( available < active * threadCreationRatio )) {
                    // if not already creating a thread
                    if (notCreatingThreads) {
                        // then make one new thread.
                        notCreatingThreads = false;
                        active++;
                        available--; // active + available == total
                        result.active = active;
                        result.available = available;
                        result.exit = false;
                        result.job = null;
                        result.queueDelay = 0;
                        this.notify(); // wake another thread in case there is work
                        return;
                    }
                }
                if (head.next != null) {
                    active++;
                    available--; // active + available == total
                    JobQueueItem mine = head.next;
                    head.next = mine.next;
                    if (head.next == null) tail = head;
                    result.job = mine.job;
                    result.queueDelay = now - mine.enqueueTime;
                    mine.job = null;
                    mine.enqueueTime = 0;
                    mine.next = free;
                    free = mine;
                    freeListLength++;
                    while (freeListLength > 10000) {
                        mine = free;
                        free = free.next;
                        freeListLength--;
                        mine.next = null;
                        // mine will be garbage collected later
                    }
                    result.active = active;
                    result.available = available;
                    result.exit = false;
                    this.notify(); // wake the next thread
                    return;
                }
                // No jobs left on queue to run.
                // If thread deletion is allowed at this time
                // and if there are too many threads, delete one.
                if (available > threadDeletionThreshold) {
                    if (available > active * threadDeletionRatio) {
                        deleteDelay = nextThreadDeletionTime - now;
                        if (deleteDelay <= 0) {
                            total--;
                            available--; // active + available == total
                            nextThreadDeletionTime = now + threadDeletionDelay;
                            result.active = active;
                            result.available = available;
                            result.exit = true;
                            result.job = null;
                            result.queueDelay = 0;
                            this.notify();
                            return;
                        }
                    }
                }
                try {
                    // idle threads wait here
                    if (deleteDelay > 0) {
                        // wait until it is time to consider deleting
                        // threads or until more work comes in.
                        this.wait(deleteDelay);
                    } else {
                        this.wait(); // no thread deletion presently planned
                    }
                } catch ( InterruptedException ie ) {}
            }
        }

    }
}

Index: FastThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/FastThreadFactory.java,v
retrieving revision 1.18.2.2.2.1
retrieving revision 1.18.2.2.2.2
diff -u -w -r1.18.2.2.2.1 -r1.18.2.2.2.2
--- FastThreadFactory.java      4 Jul 2003 02:45:10 -0000       1.18.2.2.2.1
+++ FastThreadFactory.java      28 Oct 2003 20:20:47 -0000      1.18.2.2.2.2
@@ -71,6 +71,7 @@
         FThread(int num, Runnable job) {
             super(tg, "FThread-" + num);
             this.job = job;
+            super.setPriority(Thread.NORM_PRIORITY);
             super.start();
         }
 

Index: QThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/QThreadFactory.java,v
retrieving revision 1.19.2.2.2.5
retrieving revision 1.19.2.2.2.6
diff -u -w -r1.19.2.2.2.5 -r1.19.2.2.2.6
--- QThreadFactory.java 29 Jul 2003 21:08:09 -0000      1.19.2.2.2.5
+++ QThreadFactory.java 28 Oct 2003 20:20:47 -0000      1.19.2.2.2.6
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
 package freenet.thread;
 
 import freenet.Core;
@@ -11,6 +12,8 @@
  *
  * @author oskar
  * @author tavin
+ * @author lostlogic
+ * @author ejhuff
  */
 public final class QThreadFactory implements ThreadFactory, Runnable {
 
@@ -26,157 +29,166 @@
 
     private final ThreadGroup tg;
 
-    private int active = 0;
-    private int available = 0;
+    private final CountLock countLock = new CountLock();
 
-    private long threadNumber = 0;
+    private final NumLock numLock = new NumLock();
 
-    private QThread headerThread = null;
+    private final HeadLock headLock = new HeadLock();
+       
+       private final boolean logDEBUG;
+       
+    private final MaxLock maxLock = new MaxLock();
 
-    private final Object maxLock = new Object();
-    private int desiredMax;
     /**
      * @param tg     ThreadGroup all created threads will belong to
      */
     public QThreadFactory(ThreadGroup tg, int desiredMax) {
         this.tg = tg;
-        this.desiredMax = desiredMax;
+        this.maxLock.setDesiredMax(desiredMax);
+               this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
         (new Thread(this, "Thread creation thread.")).start();
     }
 
-    public void run() {
-        synchronized (this) {
+    public final void run() {
+               CountSnap snap = new CountSnap();
             while (true) {
                 Throwable lastEx = null;
                 try {
-
-                    if (available < MINIMUM_AVAILABLE_ABS || 
-                        available < active * MINIMUM_AVAILABLE_RATIO) {
-                        
-                        int desired = 
-                            Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
+                               boolean doLog = false;
+                int required;
+                               countLock.snap(snap);
+                               // start with a minimal required value.
+                               required = (int)(snap.active * 
MINIMUM_AVAILABLE_RATIO);
+                               if ( snap.available < MINIMUM_AVAILABLE_ABS || 
+                                        snap.available < required ) {
+                                       // if we fell below that, set target higher.
+                                       required =
+                                               Math.max((int) (snap.active * 
IDEAL_AVAILABLE_RATIO),
                                      2 * MINIMUM_AVAILABLE_ABS);
-                        while (available < desired) {
-                            createThread();
                         }
-                    } else if (available > (3 * MINIMUM_AVAILABLE_ABS) &&
-                               available > active * MAXIMUM_AVAILABLE_RATIO) {
-                        
-                        int desired = 
-                            Math.max((int) (active * IDEAL_AVAILABLE_RATIO),
+                while ( snap.available < required ) {
+                                       doLog = true;
+                                       headLock.push(new 
QThread(numLock.newThreadNumber()));
+                                       // getThread could sneak in here,
+                                       // while available is one too small.  That's 
ok.
+                                       countLock.snapFree(snap); // does available++
+                                       required = 
+                                               Math.min(required, // required must 
not increase.
+                                                                Math.max((int) 
(snap.active * IDEAL_AVAILABLE_RATIO),
+                                                                                 2 * 
MINIMUM_AVAILABLE_ABS));
+                } 
+                               if (logDEBUG && doLog)
+                                       Core.logger.log(this,"Thread creation thread 
past creation loop" + 
+                                                                       ", available: 
" + snap.available + 
+                                                                       ", required: " 
+ required + 
+                                                                       ", active: " + 
snap.active,
+                                                                       
Core.logger.DEBUG);
+                               doLog = false;
+                int allowed;
+                               countLock.snap(snap);
+                               // Start with a maximal allowed value.
+                               allowed = Math.max((int)(snap.active * 
MAXIMUM_AVAILABLE_RATIO),
                                      2 * MINIMUM_AVAILABLE_ABS);
+                               // Repeatedly remove a thread from the stack and 
signal it to die.
+                               // But if all of the threads have meanwhile started 
jobs, then
+                               // do nothing, and the loop will terminate with 
snap.available == 0
+                while ( snap.available > allowed ) {
+                                       doLog = true;
+                                       if (countLock.snapTake(snap)) {
+                                               // getThread can sneak in here 
repeatedly,
+                                               // but it can never make 
headLock.pop() be null.
+                                               headLock.pop().die();
+                                       }
+                    // Let there be up to relative maximum threads
+                    // sitting around.  This will reduce thread flux a
+                    // lot which should help CPU usage further.  But
+                    // don't let it go below 2 * absolute minimum.
+                                       allowed = 
+                                               Math.max((int) (snap.active * 
MAXIMUM_AVAILABLE_RATIO),
+                                                                Math.max(allowed, // 
allowed must not decrease.
+                                                                                 2 * 
MINIMUM_AVAILABLE_ABS));
+                               } 
+                               if (logDEBUG && doLog)
+                                       Core.logger.log(this,"Thread creation thread 
past destruction loop" + 
+                                                                       ", available: 
" + snap.available + 
+                                                                       ", allowed: " 
+ allowed + 
+                                                                       ", active: " + 
snap.active,
+                                                                       
Core.logger.DEBUG);
                         
-                        
-                        while (available > desired) {
-                            destroyThread();
-                        }
-                    }
-
-                    try {
-                        wait(1000);
-                    } catch (InterruptedException e) {
-                    }
                 } catch (Throwable e) {
-                    if (lastEx == null || !lastEx.getClass().equals(e.getClass()))
+                if (lastEx == null || !lastEx.getClass().equals(e.getClass())) {
+                                       countLock.snap(snap);
                         Core.logger.log(this, "Exception in QThreadFactory. "
-                                        + available + " thread available ," 
-                                        + active + " running. Top: " + headerThread, 
+                                                                       + 
snap.available + " threads available ," 
+                                                                       + snap.active 
+ " running. "
+                                                                       + "Top: " + 
headLock.headerThread, 
                                         e, Core.logger.ERROR);
+                               }
                     lastEx = e;
                     try {
-                        wait(20); // avoid runaway loop.
+                    Thread.currentThread().sleep(20); // avoid runaway loop.
                     } catch (InterruptedException e2) {}
                 }
-            }
+                       countLock.waitForNotify(1000); 
         }
     }
 
+    /**
+     * @return  the target maximum executing jobs.
+        *          Caller may use this, together with
+        *          activeThreads(), to determine load.
+        *          This value is decreased by a thread
+        *          which dies on outOfMemory exception.
+     */
     public final int maximumThreads() {
-        synchronized (maxLock) {
-            return desiredMax;
-        }
+               return maxLock.desiredMax();
     }
 
     /**
      * @return  the number of currently executing jobs
      */
     public final int activeThreads() {
-        return active;
+               CountSnap snap = new CountSnap();
+               countLock.snap(snap);
+        return snap.active;
     }
 
     /**
-     * @return  the instantaneous number of idle threads;
-     *          may return a negative number if the force option
-     *          was used with getThread()
+     * @return  the instantaneous number of idle threads
      */
     public final int availableThreads() {
-        return available;
+               CountSnap snap = new CountSnap();
+               countLock.snap(snap);
+        return snap.available;
     }
 
     /**
      * @param job       The job to be executed
+        * @return the thread which was popped or created.
      */
-    public synchronized final Thread getThread(Runnable job) {
-
-        // hopefully this will not happen often.
-        if (headerThread == null) {
-            createThread();
-        }
-
-        QThread thread = headerThread;
-        headerThread = headerThread.next;
+    public final Thread getThread(Runnable job) {
+        QThread thread = null;
+        if (countLock.makeItMyself()) {
+                       thread = new QThread(numLock.newThreadNumber());
+        } else {
+                       thread = headLock.pop(); // thread cannot be null.
+               }
+               thread.setPriority(Thread.NORM_PRIORITY);
+               // synchronize so that getThread can't sneak in between
+               // test for job == null and this.wait() in thread.run().
+               synchronized (thread) {
         thread.next = null;
         thread.job = job;
-        ++active;
-        --available;
-        thread.start();
-
-        awaken();
-       return thread;
-    }
-
-    /**
-     * Creates a thread and adds it to the stack.
-     */
-    private synchronized final void createThread() {
-        QThread newThread = new QThread(++threadNumber);
-        newThread.next = headerThread;
-        headerThread = newThread;
-        available++;
-    }
-
-    /**
-     * Removes a thread from the stack and signals it to die.
-     */
-    private synchronized final void destroyThread() {
-        QThread dyingThread = headerThread;
-        headerThread = headerThread.next;
-        dyingThread.die();
-        available--;
+                       thread.notify();
     }
 
-    /**
-     * Returns a thread to the stack when it is finished executing.
-     */
-    private synchronized final void returnThread(QThread returnThread) {
-        returnThread.next = headerThread;
-        headerThread = returnThread;
-        active--;
-        available++;
-    }
-
-    private synchronized final void awaken() {
-         if ( ( available < MINIMUM_AVAILABLE_ABS) ||
-              ( available < active * MINIMUM_AVAILABLE_RATIO) || 
-              ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
-                ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
-             notifyAll();
-         }
+               countLock.mightNotify();
+               return thread;
     }
 
     private final class QThread extends Thread implements PooledThread {
 
-        private QThread next;
+        private QThread next; // link for stack of available threads.
     
         private Runnable job = null;
 
@@ -193,16 +205,25 @@
             return job;
         }
 
-        public final synchronized void start() {
-            this.notify();
-        }
-
         public final void run() {
             while (alive.state()) {
                 synchronized (this) {
                     while (alive.state() && job == null) {
+                                               // If getThread didn't synchronize 
(thread),
+                                               // it could sneak in here and change 
job and
+                                               // issue it's notify after the test 
but before
+                                               // the wait.
                         try {
-                            this.wait(200);
+                            // There is no need to timeout this wait,
+                                                       // so long as getThread() 
synchronizes.
+                                                       // getThread() is the only way 
to get a
+                                                       // thread.  It synchronizes 
(thread), then
+                                                       // changes thread.job, and 
only then does
+                                                       // thread.notify().  
thread.die() also
+                                                       // synchronizes, then changes
+                                                       // alive.state(), and only 
then does
+                                                       // thread.notify().
+                            this.wait();
                         }
                         catch (InterruptedException e) {}
                     }
@@ -215,41 +236,153 @@
                     } catch (OutOfMemoryError e) {
                         Core.logger.log(this, "Got out of memory error, " +
                                         "decreasing maximum threads by 5 from ~"+
-                                       desiredMax+".", e, Core.logger.ERROR);
+                                                                               
maxLock.desiredMax()+".", 
+                                                                               e, 
Core.logger.ERROR);
                        freenet.node.Main.dumpInterestingObjects();
-                        synchronized(maxLock) {
-                            desiredMax = Math.max(desiredMax - 5, 0);
-                        }
+                        maxLock.decrementDesiredMax(5);
                     } catch (Throwable e) {
                         Core.logger.log(this, "Unhandled throw in job: "+e,
                                         e, Core.logger.ERROR);
                     } finally {
                         jobsDone++;
                         job = null;
-                        if (alive.state()) // this should always be true
-                            returnThread(this);
+                        if (alive.state()) {
+                                                       headLock.push(this); // first 
push it
+                                                       countLock.free(); // then 
announce it is there.
+                                               } // else the thread will exit now.
                     }
                 }
             }
 
             Core.diagnostics.occurrenceContinuous("jobsPerQThread", jobsDone);
             
-            Core.logger.log(this, getName() + " ended. " + available + 
-                            "threads available.", Core.logger.DEBUGGING);
+                       CountSnap snap = new CountSnap();
+                       countLock.snap(snap);
+                       if (logDEBUG)
+                               Core.logger.log(this, getName() + " ended. " + 
+                                                               snap.available + 
"threads available." +
+                                                               snap.active + "threads 
active.",
+                                                               Core.logger.DEBUG);
         }
 
         /**
-         * You should probably only call this when the QThread is idle.
+                *  Called only from QThreadFactory.run() in this context:
+                *  if (countLock.snapTake(snap)) {
+                *      headLock.pop().die();
+                *  }
          */
-        protected synchronized final void die() {
+        synchronized final void die() {
             if (alive.state()) {
                 alive.change();
                 this.notify();
             }
         }
     }
+       
+       private final class MaxLock {
+               private int desiredMax;
+               synchronized void setDesiredMax(int desiredMax) {
+                       this.desiredMax = desiredMax;
+               }
+               synchronized int desiredMax() {
+                       return this.desiredMax;
+               }
+               synchronized void decrementDesiredMax(int decr) {
+                       this.desiredMax = Math.max(this.desiredMax - decr, 0);
+               }
 }
 
+    private final class NumLock {
+               private int threadNumber = 0;
+               synchronized int newThreadNumber() {
+                       return threadNumber++;
+               }
+       }
 
+    private final class HeadLock {
+               QThread headerThread = null;
+               synchronized void push(QThread thread) {
+                       thread.next = headerThread;
+                       headerThread = thread;
+               }
+               synchronized QThread pop() {
+                       QThread thread = headerThread;
+                       headerThread = thread.next;
+                       return thread;
+               }
+       }
 
+       private final class CountSnap {
+               int active = 0;
+               int available = 0;
+       }
 
+    private final class CountLock {
+               private int active = 0;
+               // available is always either the number of threads on 
headLock.headerThread,
+               // or else one fewer, in the case where it has been decremented but the
+               // thread hasn't been removed yet.
+               private int available = 0;
+               
+               // take a snapshot
+               synchronized void snap(CountSnap snap) {
+                       snap.active = active;
+                       snap.available = available;
+               }
+               
+               // free a thread, then take a snapshot.
+               synchronized void snapFree(CountSnap snap) {
+                       available++; // thread was already pushed onto stack.
+                       snap.active = active;
+                       snap.available = available;
+               }
+               
+               // maybe take a thread, then take a snapshot.
+               // return true if got a thread.
+               synchronized boolean snapTake(CountSnap snap) {
+                       boolean gotOne = false;
+                       if (available >= 1) { // There is at least one on stack.
+                               available--;      // Make sure it stays that way,
+                               gotOne = true;    // even if getThread sneaks in.
+                       }
+                       snap.active = active;
+                       snap.available = available;
+                       return gotOne;
+               }
+               
+               // Before calling free, available is one too small
+               // compared to the actual state of headLock.headerThread.
+        // QThreadFactory.run() or getThread() could sneak in before this.
+               synchronized void free() {
+            active--;
+            available++; // thread is already on the stack
+               }
+               
+               synchronized boolean makeItMyself() {
+                       boolean makeItMyself = false;
+            active++;
+                       if (available > 0) {
+                               available--; 
+                       } else {
+                               makeItMyself = true;
+                       }
+                       return makeItMyself;
+               }
+               
+        synchronized void mightNotify() { 
+            if ( ( available < MINIMUM_AVAILABLE_ABS) ||
+                 ( available < active * MINIMUM_AVAILABLE_RATIO) || 
+                 ( ( available > (3 * MINIMUM_AVAILABLE_ABS)) &&
+                                  ( available > active * MAXIMUM_AVAILABLE_RATIO))) {
+                               this.notifyAll();
+            }
+        } 
+               
+               synchronized void waitForNotify(int maxDelayMillis) {
+                       try { 
+                               this.wait(maxDelayMillis); 
+                       } catch ( InterruptedException ie ) {}
+               }
+               
+       }
+}

Index: ThreadFactory.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/thread/ThreadFactory.java,v
retrieving revision 1.7.2.1
retrieving revision 1.7.2.1.2.1
diff -u -w -r1.7.2.1 -r1.7.2.1.2.1
--- ThreadFactory.java  15 Feb 2003 04:13:01 -0000      1.7.2.1
+++ ThreadFactory.java  28 Oct 2003 20:20:47 -0000      1.7.2.1.2.1
@@ -1,3 +1,4 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
 package freenet.thread;
 import java.io.PrintWriter;
 
@@ -26,6 +27,8 @@
 
     /**
      * @param r      the job to run
+     * @return  the thread executing the job, or null if the
+     *          job was queued.
      */
     Thread getThread(Runnable r);
 

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to