User: sparre  
  Date: 01/10/05 12:34:14

  Added:       src/main/org/jboss/mq/threadpool ThreadPool.java Work.java
  Log:
  Created a thread pool with an embedded work queue for JBossMQ.
  And made the message pushers use it.
  
  Revision  Changes    Path
  1.1                  jbossmq/src/main/org/jboss/mq/threadpool/ThreadPool.java
  
  Index: ThreadPool.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.mq.threadpool;
  
  import java.lang.Thread;
  import java.lang.ThreadGroup;
  
  import java.util.ArrayList;
  import java.util.LinkedList;
  
  
  /**
   *  This is an implementation of a simple thread pool with
   *  an embedded work queue.
   *
   * @author    Ole Husgaard ([EMAIL PROTECTED])
   * @version   $Revision: 1.1 $
   */
  public class ThreadPool
  {
     /**
      *  The name of this thread pool
      */
     private String name;
  
     /**
      *  Flags that worker threads should be created as daemon threads.
      */
     private boolean daemon;
  
     /**
      *  The ThreadGroup of threads in this pool.
      */
     private ThreadGroup threadGroup;
  
     /**
      *  The worker threads.
      */
     private ArrayList workers;
  
     /**
      *  Maximum number of worker threads.
      */
     private int maxWorkers;
  
     /**
      *  Count of idle worker threads.
      *  Synchronized on the [@link #workers} field.
      */
     private int idleWorkers;
  
     /**
      *  Flags that we are shutting down the pool.
      *  Synchronized on the [@link #workers} field.
      */
     private volatile boolean stopping;
  
     /**
      *  The work queue.
      */
     private LinkedList queue;
  
  
     /**
      *  Create a new thread pool instance.
      *
      *  @param Name The name of this thread pool. This is used for naming its
      *         worker threads.
      *  @param threadGroup The <code>ThreadGroup</code> that worker threads
      *         in this pool should belong to.
      *  @param maxWorkers The maximum number of worker threads in this pool.
      *  @param daemon If <code>true</code>, worker threads will be created as
      *         daemon threads.
      */
     public ThreadPool(String name, ThreadGroup threadGroup, int maxWorkers,
                       boolean daemon)
     {
        if (name == null || threadGroup == null || maxWorkers <= 0)
           throw new IllegalArgumentException();
  
        this.name = name;
        this.daemon = daemon;
        this.threadGroup = threadGroup;
        workers = new ArrayList();
        this.maxWorkers = maxWorkers;
        idleWorkers = 0;
        stopping = false;
        queue = new LinkedList();
     }
  
     /**
      *  Shutdown this thread pool.
      *  This will not return until all enqueued work has been cancelled,
      *  and all worker threads have done any work they started and have
      *  died.
      */
      public void shutdown()
      {
         stopping = true;
  
         synchronized (queue) {
            // Remove all queued work
            queue.clear();
            // Notify all waiting threads
            queue.notifyAll();
         }
  
         // wait for all worker threads to die.
         synchronized (workers) {
            while (workers.size() > 0) {
               try {
                  // wait for some worker threads to die.
                  workers.wait();
               } catch (InterruptedException ex) {
                  // ignore
               }
            }
         }
      }
  
  
     /**
      *  Enqueue a piece of work for this thread to handle.
      *  As soon as a thread becomes available, it will call
      *  {@link Work#doWork} of the argument.
      *  If the pool is shutting down, this method will not enqueue the
      *  work, but instead simply return.
      *
      *  @param work The piece of work to be enqueued.
      */
     public void enqueueWork(Work work)
     {
  //System.err.println("ThreadPool("+name+"): enqueueWork() entered.");
        // We may want to start a worker thread
        synchronized (workers) {
  //System.err.println("ThreadPool("+name+"): enqueueWork(): 
idleWorkers="+idleWorkers+" stopping="+stopping+".");
  //System.err.println("ThreadPool("+name+"): enqueueWork(): 
workers.size()="+workers.size()+" maxWorkers="+maxWorkers+".");
           if (idleWorkers == 0 && !stopping && workers.size() < maxWorkers)
  {
              new WorkerThread(name + "-" + (workers.size() + 1)).start();
  //System.err.println("ThreadPool("+name+"): started new WorkerThread.");
  }
        }
  
        synchronized (queue) {
           if (stopping)
              return; // we are shutting down, cannot take new work.
  
           queue.addLast(work);
  //System.err.println("ThreadPool("+name+"): enqueueWork(): enqueued work..");
           queue.notify();
        }
     }
  
     /**
      *  Cancel a piece of enqueued work.
      *
      *  @param work The piece of work to be cancel.
      */
     public void cancelWork(Work work)
     {
        synchronized (queue) {
           // It may be enqueued several times.
           while (queue.remove(work))
              ;
        }
     }
  
  
     /**
      *  The threads that do the actual work.
      */
     private class WorkerThread
        extends Thread
     {
        /**
         *  Create a new WorkerThread.
         *  This must be called when holding the workers monitor.
         */
        WorkerThread(String name)
        {
           super(threadGroup, name);
           setDaemon(daemon);
           workers.add(this);
  //System.err.println("ThreadPool("+name+"): " + getName() + " created.");
        }
  
        /**
         *  Wait for work do to.
         *  This must be called when holding the queue monitor.
         *  This will temporarily increment the count of idle workers
         *  while waiting.
         */
        private void idle()
        {
           try {
              synchronized (workers) {
                ++idleWorkers;
              }
              //System.err.println("ThreadPool("+name+"): " + getName() + " starting 
to wait.");
              queue.wait();
           } catch (InterruptedException ex) {
              // ignore
           } finally {
              //System.err.println("ThreadPool("+name+"): " + getName() + " done 
waiting.");
              synchronized (workers) {
                --idleWorkers;
              }
           }
        }
  
        public void run()
        {
  //System.err.println("ThreadPool("+name+"): " + getName() + " started to run.");
           while (!stopping) {
              Work work = null;
  
              synchronized (queue) {
                 if (queue.size() == 0)
                    idle();
                 if (!stopping && queue.size() > 0)
                    work = (Work)queue.removeFirst();
              }
  
              if (work != null)
                 work.doWork();
           }
           synchronized (workers) {
              workers.remove(this);
              // Notify the shutdown thread.
              workers.notify();
           }
        }
     }
  }
  
  
  
  1.1                  jbossmq/src/main/org/jboss/mq/threadpool/Work.java
  
  Index: Work.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.mq.threadpool;
  
  /**
   *  Contract for a piece of work that the thread pool can carry out.
   *
   *  An object implementing this interface is enqueued with the thread
   *  pool, and one of the threads in the pool then calls back the
   *  method declared here.
   *
   * @author    Ole Husgaard ([EMAIL PROTECTED])
   * @version   $Revision: 1.1 $
   */
  public interface Work
  {
     /**
      *  Invoked by a pool thread to perform this piece of work.
      */
     void doWork();
  }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to