A common problem: networked applications that accept multiple
connections in individual threads, but cannot control the
parallelism of socket accept() (such as the JDK 1.4 ORB), need to
restrict the resources somewhere else.

For this purpose we designed a ParalellThreadGuard class that
can be used as follows:

  private ParalellThreadGuard guard =
    new ParalellThreadGuard(THREADS_ACTIVE, THREADS_ENQUEUE, THREADS_TIMEOUT);
...
  public void guardedMethod() throws RejectedException
  {
    guard.register(); // might throw a RejectedException
    try
    {
      ... // do something with your limited resources here
    }
    finally
    {
      guard.release();
    }
  }


This class seems to fit into the [lang] area, but might also fit within the scope of the sandboxed [ThreadPool] package.

I can contribute it to Apache with the proper licence header
if someone considers it a good fit.

Cheers,
Christoph Reck
//--------------------------------------------------------------------
/*
 *  $RCSfile: ParalellThreadGuard.java $
 *
 *  $Revision: 1.0 $
 *  $Date: 2003/10/16 $
 *  author: [EMAIL PROTECTED]
 *
 * Copyright: Deutsches Zentrum fuer Luft und Raumfahrt (DLR) 2003
 */
//--------------------------------------------------------------------

import java.util.LinkedList;

/**
 * This guard provides two methods <code>{@link #register()}</code> and
 * <code>{@link #release()}</code> to limit the amount of threads
 * accessing a set of guarded methods. Usage:<p>
 * <pre>...
 * ParalellThreadGuard guard = new ParalellThreadGuard(4, 10, 0);
 * ...
 * public ReturnValue guardedMethod(...) throws RejectedException
 * {
 *    guard.register(); // might throw a RejectedException
 *    try
 *    {
 *      ... // do something with your limited resources here
 *    }
 *    finally
 *    {
 *      guard.release();
 *    }
 * }
 * ...
 * </pre>
 *
 * <p>Thread-safety: all mutable state (the active counter and the queue of
 * waiting callers) is read and written only while holding the monitor of
 * the internal queue. Waiting callers are served in FIFO order; a slot is
 * handed to a waiter by <code>release()</code> itself, so a waiter that
 * times out at the same moment either keeps the slot or leaves the queue,
 * never both.
 *
 * @author [EMAIL PROTECTED]
 */
public class ParalellThreadGuard
{
  private final static boolean WITH_TRACE = false;

  private final int        allowedThreads;
  private final int        queueLimit;
  private final int        queueTimeout;
  /** FIFO of {@link Waiter} tickets; its monitor guards all mutable state. */
  private final LinkedList threadQueue;
  /** Number of callers currently between register() and release(). */
  private int              activeThreads;

  static final String CLASS_ID = "@(#) $Id: $, Copyright DLR 2003";

  /** Ticket of one enqueued caller; <code>granted</code> is set by release(). */
  private static final class Waiter
  {
    boolean granted;
  }

  /**
   * Constructor.
   * @param _allowedThreads The number of threads that are allowed to pass
   *                        registration without enqueing.
   * @param _queueLimit     The maximum threads allowed in the queue before
   *                        throwing a <code>RejectedException</code>.
   * @param _queueTimeout   The maximum time in milliseconds a thread may be
   *                        stopped in <code>register</code> before throwing a
   *                        <code>RejectedException</code>, a value of 0 (or
   *                        less) will allow waiting indefinitely.
   */
  public ParalellThreadGuard( int _allowedThreads, int _queueLimit,
                                                   int _queueTimeout )
  {
    allowedThreads = _allowedThreads;
    queueLimit     = _queueLimit;
    queueTimeout   = _queueTimeout;
    threadQueue    = new LinkedList();
    activeThreads  = 0;
  }

  /**
   * Returns <code>true</code> if no threads are active or pending.
   */
  public boolean isIdle()
  {
    synchronized(threadQueue)
    {
      return (activeThreads + threadQueue.size()) == 0;
    }
  }

  /**
   * Returns the currently active thread count.
   */
  public int getActiveThreads()
  {
    synchronized(threadQueue)
    {
      return activeThreads;
    }
  }

  /**
   * Returns the amount of threads pending for execution.
   */
  public int getPendingThreads()
  {
    synchronized(threadQueue)
    {
      return threadQueue.size();
    }
  }

  /**
   * Register a thread in this guard, use {@link #release} to release the
   * guard and possibly re-start an enqueued thread.
   *
   * <p>If a slot is free the call returns immediately. Otherwise the caller
   * is enqueued (provided the queue is not full) and blocks until a slot is
   * handed over by <code>release()</code> or the timeout elapses.
   *
   * <p>If the calling thread is interrupted while waiting, it leaves the
   * queue, its interrupt status is restored and a
   * <code>RejectedException</code> is thrown. If the slot had already been
   * granted when the interrupt arrived, the call succeeds and only the
   * interrupt status is restored.
   *
   * @throws RejectedException Either because too many threads are already
   *         pending, or this thread was enqueued but exceeded the timeout,
   *         or this thread was interrupted while waiting.
   */
  public void register() throws RejectedException
  {
    if( WITH_TRACE ) traceMessage( "testing " );
    synchronized(threadQueue)
    {
      int pending = threadQueue.size();

      // Fast path: a slot is free and nobody is waiting ahead of us.
      // This is checked BEFORE the queue limit so that a queueLimit of 0
      // means "never wait", not "reject everything".
      if( pending == 0 && activeThreads < allowedThreads )
      {
        ++activeThreads;
        if( WITH_TRACE ) traceMessage( "active  " );
        return;
      }

      // check resource limit to avoid excessive queue size
      if( pending >= queueLimit )
      {
        throw( new RejectedException(
            "Too many paralell calls ("
            + (pending + activeThreads + 1) + ">"
            + (queueLimit + activeThreads)
            + ") to guardedMethod()" ) );
      }

      // enqueue the caller and wait for release() to grant the ticket
      Waiter waiter = new Waiter();
      threadQueue.add( waiter );
      if( WITH_TRACE ) traceMessage( "enqueued" );

      final boolean timed = queueTimeout > 0;
      final long deadline = timed
          ? System.currentTimeMillis() + queueTimeout
          : 0L;
      try
      {
        // loop guards against spurious wake-ups and notifyAll() meant
        // for another waiter
        while( !waiter.granted )
        {
          if( !timed )
          {
            threadQueue.wait();
            continue;
          }
          long remaining = deadline - System.currentTimeMillis();
          if( remaining <= 0 )
          {
            // TIMEOUT: still holding the monitor, so release() cannot
            // grant this ticket concurrently
            threadQueue.remove( waiter );
            throw( new RejectedException(
                "Pending call to guardedMethod() timed out after "
                + queueTimeout + " ms." ) );
          }
          threadQueue.wait( remaining );
        }
      }
      catch( InterruptedException _ex )
      {
        // keep the interrupt visible to the caller
        Thread.currentThread().interrupt();
        if( !waiter.granted )
        {
          threadQueue.remove( waiter );
          throw( new RejectedException(
              "Pending call to guardedMethod() was interrupted while waiting." ) );
        }
        // else: the slot was already handed over, so proceed as active
      }
      // activeThreads was already incremented by release() on our behalf
      if( WITH_TRACE ) traceMessage( "active  " );
    }
  }

  /**
   * Release the guarded resources for this thread, possibly re-starting
   * an enqueued thread.
   *
   * <p>The freed slot is handed to the longest-waiting caller, if any.
   * A call without a matching <code>register()</code> never drives the
   * active counter below zero.
   *
   * @see #register
   */
  public void release()
  {
    synchronized(threadQueue)
    {
      if( activeThreads > 0 )
      {
        --activeThreads;
      }
      boolean granted = false;
      while( !threadQueue.isEmpty() && activeThreads < allowedThreads )
      {
        Waiter next = (Waiter) threadQueue.removeFirst();
        next.granted = true;
        // count the slot for the waiter right away so that no newcomer
        // can steal it before the waiter wakes up
        ++activeThreads;
        granted = true;
      }
      if( granted )
      {
        threadQueue.notifyAll();
      }
      if( WITH_TRACE ) traceMessage( "released" );
    }
  }

  /** Prints the current thread name, the message and the guard counters. */
  private void traceMessage(String _msg)
  {
    synchronized(threadQueue)
    {
      System.out.println( Thread.currentThread().getName()
                          + " " + _msg + " ("
                          + "Active=" + activeThreads
                          + ", Pending=" + threadQueue.size() + ")" );
    }
  }

  /** Dummy inner class - to be replaced by the dims.gen version **/
  public class RejectedException extends Exception
  {
    public RejectedException(String _msg)
    {
      super(_msg);
    }
  }

} // end of ParalellThreadGuard
//--------------------------------------------------------------------
/*
 *  $RCSfile: ParalellThreadGuardTest.java $
 *
 *  $Revision: 1.0 $
 *  $Date: 2003/10/16 $
 *  author: [EMAIL PROTECTED]
 *
 * Copyright: Deutsches Zentrum fuer Luft und Raumfahrt (DLR) 2003
 */
//--------------------------------------------------------------------

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Test case for the ParalellThreadGuard class.
 *
 * <p>Starts {@link #DUMMY_TEST_LOOP_COUNT} actor threads, one every
 * {@link #DUMMY_TEST_START_TIME} ms, each of which tries to enter a guarded
 * method that holds its slot for {@link #DUMMY_TEST_WAIT_TIME} ms. The ids
 * of the actors that were admitted are concatenated in admission order and
 * compared against {@link #TEST_RESULT_STRING}.
 */
public class ParalellThreadGuardTest extends TestCase
{
  private static final int THREADS_ACTIVE  = 3;
  private static final int THREADS_ENQUEUE = 2;
  private static final int THREADS_TIMEOUT = 7000;
  private static final int DUMMY_TEST_START_TIME = 2000;
  private static final int DUMMY_TEST_WAIT_TIME = 11000;
  private static final int DUMMY_TEST_LOOP_COUNT = 9;
  private static final String TEST_RESULT_STRING = "1234579";
  private Target testTarget;
  private SimpleDateFormat dateFormatter;

  /**
   * Inner class representing the test target.
   */
  private class Target
  {
    private final ParalellThreadGuard guard =
      new ParalellThreadGuard(THREADS_ACTIVE, THREADS_ENQUEUE, THREADS_TIMEOUT);
    // StringBuffer (synchronized) because several actors append concurrently
    final StringBuffer guardTestResultAccumulator = new StringBuffer();
    /**
     * The guarded method to test, catenates the ID of all calls into
     * the {@link #guardTestResultAccumulator} buffer, which then can
     * be checked as the test result.
     *
     * @param id the name of the calling actor
     * @throws ParalellThreadGuard.RejectedException if the guard refused
     *         the call
     * @throws Exception if the simulated work was interrupted
     */
    public void   guardedMethod(String id)
           throws ParalellThreadGuard.RejectedException, Exception
    {
      guard.register(); // might throw a RejectedException
      try
      {
        guardTestResultAccumulator.append(id);
        System.out.println( id + " running... " + timeString() );
        try
        {
          Thread.sleep(DUMMY_TEST_WAIT_TIME);
        }
        catch (InterruptedException ex)
        {
          // keep the original interrupt as the cause for diagnosis
          throw(
            new Exception(id + " guardedMethod was unexpectedly interrupted.",
                          ex) );
        }
      }
      finally
      {
        guard.release();
      }
    }
  }

  /**
   * Inner class multiply instantiated to serve as paralell test actors.
   */
  public class Actor extends Thread
  {
    public Actor(String _name)
    {
      super(_name);
    }

    /** Calls the guarded method once and reports the outcome on the console. */
    public void run()
    {
      String id = this.getName();
      try
      {
        System.out.println(id + " started... " + timeString() );
        testTarget.guardedMethod(id);
        System.out.println(id + " completed. " + timeString() );
      }
      catch( ParalellThreadGuard.RejectedException _ex )
      {
        System.err.println(id + " execution rejected: " + _ex.getMessage() );
      }
      catch( Exception _ex )
      {
        System.err.println(id + " Unexpected exception: ");
        _ex.printStackTrace();
      }
    }
  }

  /**
   * Runs the staggered actors and checks the admission sequence.
   */
  public void paralellThreadGuardTest()
  {
    System.out.println( "Starting paralellThreadGuardTest() "
                        + "every " + (DUMMY_TEST_START_TIME/1000) + "sec "
                        + "lasting " + (DUMMY_TEST_WAIT_TIME/1000) + "sec "
                        + "repeating " + DUMMY_TEST_LOOP_COUNT + " times "
                        + "(max active:" + THREADS_ACTIVE + ","
                        + " max queued:" + THREADS_ENQUEUE + ","
                        + " timeout:" + (THREADS_TIMEOUT/1000) + "sec)" );
    Actor[] actors = new Actor[DUMMY_TEST_LOOP_COUNT];
    for( int i = 0; i < DUMMY_TEST_LOOP_COUNT; ++i )
    {
      actors[i] = new Actor( String.valueOf(i + 1) );
      actors[i].start();
      try
      {
        // wait 2 seconds before starting next thread
        Thread.sleep(DUMMY_TEST_START_TIME);
      }
      catch( InterruptedException _ex )
      {
        System.err.println("Unexpected interrupt of sleep() in the test loop.");
      }
    }
    // Join every actor, not just the last one: this guarantees that all
    // appends are finished and visible before the result is read.
    try
    {
      for( int i = 0; i < actors.length; ++i )
      {
        actors[i].join();
      }
    }
    catch( InterruptedException _ex )
    {
      System.err.println("Unexpected interrupt of join() in the test main.");
      Thread.currentThread().interrupt();
      fail( "Interrupted while waiting for the actors to finish." );
    }
    String testResultString = testTarget.guardTestResultAccumulator.toString();
    System.out.println( "Test completed with sequence: \""
                        + testResultString + "\"");
    assertEquals( "Unexpected admission sequence",
                  TEST_RESULT_STRING, testResultString );
  }

  /**
   * Formats the current time; synchronized because SimpleDateFormat is not
   * thread-safe and this is called from several actor threads.
   */
  private synchronized String timeString()
  {
    return dateFormatter.format( new Date() );
  }

  /**
   * Creates a new <code>TestCase</code> instance.
   */
  public ParalellThreadGuardTest( String _testName )
  {
    super( _testName );
  }

  protected void setUp()
  {
    testTarget = new Target();
    dateFormatter = new SimpleDateFormat("HH:mm:ss");
  }

  protected void tearDown()
  {
    testTarget = null;
    dateFormatter = null;
  }

  public static Test suite()
  {
    TestSuite suite = new TestSuite();
    suite.addTest( new ParalellThreadGuardTest("paralellThreadGuardTest") );
    return suite;
  }

  /**
   * This allows the tests to run as a standalone application.
   *
   * @param args command line arguments (ignored)
   */
  public static void main( String[] args )
  {
    String[] testCaseName = { ParalellThreadGuardTest.class.getName() };
    junit.textui.TestRunner.main( testCaseName );
  }

} // end of ParalellThreadGuardTest
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to