leosimons 01/04/15 10:06:28

Added: src/java/org/apache/avalon/util/thread/semaphore
       ConditionalEvent.java DjikstraSemaphore.java
       ThreadBarrier.java
Log:
adding some useful threading utilities. Submitted by Karthik Rangaraju <[EMAIL PROTECTED]>.

Revision Changes Path
1.1 jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/ConditionalEvent.java

Index: ConditionalEvent.java
===================================================================
/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 */
package org.apache.avalon.util.thread.semaphore;

/**
 * This class implements a POSIX style "Event" object. The difference
 * between the ConditionalEvent and the java wait()/notify() technique is in
 * handling of event state. If a ConditionalEvent is signalled, a thread
 * that subsequently waits on it is immediately released. In case of auto
 * reset EventObjects, the object resets (unsignalled) itself as soon as it
 * is signalled and waiting thread(s) are released (based on whether signal()
 * or signalAll() was called).
 *
 * <p>All access to the event state goes through this object's monitor, so
 * instances may be shared freely between threads.</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Karthik Rangaraju</a>
 */
public class ConditionalEvent
{
    /** Current event state; true means signalled. Guarded by this object's monitor. */
    private boolean m_state;

    /** True if a successful waitForSignal() resets the event. Never changes after construction. */
    private final boolean m_autoReset;

    // TODO: Need to add methods that block until a specified time and
    // return (though in real-life, I've never known what to do if a thread
    // times out other than call the method again)!

    /**
     * Creates a manual reset ConditionalEvent with a specified initial state.
     *
     * @param initialState Sets the initial state of the ConditionalEvent.
     * Signalled if initialState is true, unsignalled otherwise.
     */
    public ConditionalEvent( boolean initialState )
    {
        this( initialState, false );
    }

    /**
     * Creates a ConditionalEvent with the defined initial state.
     *
     * @param initialState if true, the ConditionalEvent is signalled when
     * created.
     * @param autoReset if true creates an auto-reset ConditionalEvent
     */
    public ConditionalEvent( boolean initialState, boolean autoReset )
    {
        m_state = initialState;
        m_autoReset = autoReset;
    }

    /**
     * Checks if the event is signalled. Does not block on the operation
     * (beyond briefly taking the monitor so that the value read is the one
     * most recently written by signal(), signalAll(), reset() or
     * waitForSignal()).
     *
     * @return true if the event is signalled, false otherwise. Does not reset
     * an autoreset event
     */
    public boolean isSignalled()
    {
        synchronized ( this )
        {
            return m_state;
        }
    }

    /**
     * Signals the event. A single thread blocked on waitForSignal() is
     * released. For a manual reset event the state stays signalled until
     * reset() is called.
     *
     * @see #signalAll()
     * @see #waitForSignal()
     */
    public void signal()
    {
        synchronized ( this )
        {
            m_state = true;
            notify();
        }
    }

    /**
     * Current implementation only works with manual reset events. Releases
     * all threads blocked on waitForSignal(). (With an auto-reset event the
     * first released thread resets the state and the remaining threads go
     * back to waiting.)
     *
     * @see #waitForSignal()
     */
    public void signalAll()
    {
        synchronized ( this )
        {
            m_state = true;
            notifyAll();
        }
    }

    /**
     * Resets the event to an unsignalled state.
     */
    public void reset()
    {
        synchronized ( this )
        {
            m_state = false;
        }
    }

    /**
     * If the event is signalled, this method returns immediately (resetting
     * the signal if this is an auto-reset event), otherwise it blocks until
     * the event is signalled.
     *
     * @throws InterruptedException if the thread is interrupted when blocked
     */
    public void waitForSignal()
        throws InterruptedException
    {
        synchronized ( this )
        {
            // Loop rather than test once: wait() may return spuriously, and
            // another thread may consume an auto-reset signal first.
            while ( !m_state )
            {
                wait();
            }

            if ( m_autoReset )
            {
                m_state = false;
            }
        }
    }
}

1.1 jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/DjikstraSemaphore.java

Index: DjikstraSemaphore.java
===================================================================
/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 */
package org.apache.avalon.util.thread.semaphore;

/**
 * Also called counting semaphores, Djikstra semaphores are used to control
 * access to a set of resources. A Djikstra semaphore has a count associated
 * with it and each acquire() call reduces the count. A thread that tries to
 * acquire() a Djikstra semaphore with a zero count blocks until someone else
 * calls release() thus increasing the count.
 *
 * <p>Locking overview: the count is only ever modified while holding this
 * object's monitor. A separate starvation lock guards the bookkeeping used
 * by starvationCheck(), and a third lock serialises acquireAll() callers.
 * Locks are always taken in the order "this" then "starvation lock".</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Karthik Rangaraju</a>
 */
public class DjikstraSemaphore
{
    /**
     * Number of semaphores currently available. Every write happens while
     * holding this object's monitor; the field is volatile only so that
     * starvationCheck(), which holds the starvation lock instead, reads an
     * up to date value.
     */
    private volatile int m_count;

    /** Upper bound for m_count. */
    private final int m_maxCount;

    /** Monitor on which starvationCheck() callers wait. */
    private final Object m_starvationLock = new Object();

    /**
     * Number of times the count has dropped to zero. Guarded by
     * m_starvationLock. Lets starvationCheck() tell a real notification from
     * a spurious wakeup.
     */
    private int m_starvationEvents = 0;

    /** Monitor used to let only one thread at a time run acquireAll(). */
    private final Object m_acquireAllLock = new Object();

    /** True while some thread is inside acquireAll(). Guarded by m_acquireAllLock. */
    private boolean m_acquireAllInProgress = false;

    /**
     * Creates a Djikstra semaphore with the specified max count and initial
     * count set to the max count (all resources released).
     *
     * @param maxCount is the max semaphores that can be acquired
     * @throws IllegalArgumentException if maxCount is negative
     */
    public DjikstraSemaphore( int maxCount )
    {
        this( maxCount, maxCount );
    }

    /**
     * Creates a Djikstra semaphore with the specified max count and an initial
     * count of acquire() operations that are assumed to have already been
     * performed.
     *
     * @param maxCount is the max semaphores that can be acquired
     * @param initialCount is the current count (setting it to zero means all
     * semaphores have already been acquired).
     * 0 &lt;= initialCount &lt;= maxCount
     * @throws IllegalArgumentException if maxCount is negative or
     * initialCount is outside the range 0..maxCount
     */
    public DjikstraSemaphore( int maxCount, int initialCount )
    {
        if ( maxCount < 0 )
        {
            throw new IllegalArgumentException(
                "maxCount must not be negative: " + maxCount );
        }
        if ( initialCount < 0 || initialCount > maxCount )
        {
            throw new IllegalArgumentException(
                "initialCount must be between 0 and " + maxCount
                + ": " + initialCount );
        }

        m_count = initialCount;
        m_maxCount = maxCount;
    }

    /**
     * If the count is non-zero, acquires a semaphore and decrements the count
     * by 1, otherwise blocks until a release() is executed by some other thread.
     *
     * @throws InterruptedException if the thread is interrupted when blocked
     * @see #tryAcquire()
     * @see #acquireAll()
     */
    public void acquire()
        throws InterruptedException
    {
        synchronized ( this )
        {
            // Re-test the condition after every wakeup to take care of rogue
            // threads that can enter before a thread that has exited the wait
            // state acquires the monitor (and of spurious wakeups).
            while ( m_count == 0 )
            {
                wait();
            }
            takeOne();
        }
    }

    /**
     * Non-blocking version of acquire().
     *
     * @return true if semaphore was acquired (count is decremented by 1), false
     * otherwise
     */
    public boolean tryAcquire()
    {
        synchronized ( this )
        {
            if ( m_count == 0 )
            {
                return false;
            }
            takeOne();
            return true;
        }
    }

    /**
     * Decrements the count by one and, if that exhausts the semaphore, wakes
     * the threads blocked in starvationCheck(). The caller must hold this
     * object's monitor and must have verified that the count is non-zero.
     */
    private void takeOne()
    {
        m_count--;
        if ( m_count == 0 )
        {
            synchronized ( m_starvationLock )
            {
                m_starvationEvents++;
                // Every blocked starvationCheck() caller was promised a
                // release when the count drops to zero, so wake them all.
                m_starvationLock.notifyAll();
            }
        }
    }

    /**
     * Releases a previously acquired semaphore and increments the count by one.
     * Does not check if the thread releasing the semaphore was a thread that
     * acquired the semaphore previously. If more releases are performed than
     * acquires, the count is not increased beyond the max count specified during
     * construction.
     *
     * @see #release( int )
     * @see #releaseAll()
     */
    public void release()
    {
        synchronized ( this )
        {
            if ( m_count < m_maxCount )
            {
                m_count++;
            }
            notify();
        }
    }

    /**
     * Same as release() except that the count is increased by count instead
     * of 1. The resulting count is capped at max count specified in the
     * constructor.
     *
     * @param count is the amount by which the counter should be incremented
     * @throws IllegalArgumentException if count is negative
     * @see #release()
     */
    public void release( int count )
    {
        if ( count < 0 )
        {
            throw new IllegalArgumentException(
                "count must not be negative: " + count );
        }

        synchronized ( this )
        {
            // Compare against the remaining headroom instead of computing
            // m_count + count, which could overflow for large arguments.
            if ( count >= m_maxCount - m_count )
            {
                m_count = m_maxCount;
            }
            else
            {
                m_count += count;
            }
            notifyAll();
        }
    }

    /**
     * Tries to acquire all the semaphores thus bringing the count to zero.
     * Semaphores are collected one at a time as they become available. Only
     * one thread at a time collects; concurrent callers queue up behind it,
     * since two threads each holding part of the semaphores would block each
     * other forever. If the calling thread is interrupted, the semaphores it
     * has collected so far are released again.
     *
     * @throws InterruptedException if the thread is interrupted when blocked on
     * this call
     * @see #acquire()
     * @see #releaseAll()
     */
    public void acquireAll()
        throws InterruptedException
    {
        // Wait for our turn to be the single collecting thread.
        synchronized ( m_acquireAllLock )
        {
            while ( m_acquireAllInProgress )
            {
                m_acquireAllLock.wait();
            }
            m_acquireAllInProgress = true;
        }

        try
        {
            // Holding the monitor across the loop lets us grab everything
            // that is currently available in one go; acquire() gives the
            // monitor up while it waits for further releases.
            synchronized ( this )
            {
                int acquired = 0;
                try
                {
                    while ( acquired < m_maxCount )
                    {
                        acquire();
                        acquired++;
                    }
                }
                catch ( InterruptedException ie )
                {
                    // Hand back what was collected so it is not lost.
                    release( acquired );
                    throw ie;
                }
            }
        }
        finally
        {
            synchronized ( m_acquireAllLock )
            {
                m_acquireAllInProgress = false;
                m_acquireAllLock.notifyAll();
            }
        }
    }

    /**
     * Releases all semaphores setting the count to max count.
     * Warning: If this method is called by a thread that did not make a
     * corresponding acquireAll() call, then you better know what you are doing!
     *
     * @see #acquireAll()
     */
    public void releaseAll()
    {
        // release( int ) already caps the count and wakes all waiters.
        release( m_maxCount );
    }

    /**
     * This method blocks the calling thread until the count drops to zero.
     * If the count is already zero it returns immediately.
     * The method is not stateful and hence a drop to zero will not be recognized
     * if a release happens before this call. You can use this method to implement
     * threads that dynamically increase the resource pool or that log occurrences
     * of resource starvation. Also called a reverse-sensing semaphore.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void starvationCheck()
        throws InterruptedException
    {
        synchronized ( m_starvationLock )
        {
            if ( m_count == 0 )
            {
                return;
            }

            // Wait for the next drop-to-zero event. Comparing event counters
            // (rather than returning after a single wait()) makes a spurious
            // wakeup harmless, while still reporting a starvation that has
            // already been relieved by the time this thread runs again.
            final int seenEvents = m_starvationEvents;
            while ( seenEvents == m_starvationEvents )
            {
                m_starvationLock.wait();
            }
        }
    }
}

1.1 jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/ThreadBarrier.java

Index: ThreadBarrier.java
===================================================================
/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 */
package org.apache.avalon.util.thread.semaphore;

/**
 * A thread barrier blocks all threads hitting it until a pre-defined number
 * of threads arrive at the barrier. This is useful for implementing release
 * consistent concurrency where you don't want to take the performance penalty
 * of providing mutual exclusion to shared resources.
 *
 * <p>The barrier is reusable: once the threshold number of threads has been
 * released, the next arrivals start a new round.</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">Karthik Rangaraju</a>
 */
public class ThreadBarrier
{
    /** Number of threads that must arrive before the barrier opens. */
    private final int m_threshold;

    /** Number of threads currently blocked at the barrier. Guarded by this object's monitor. */
    private int m_count;

    /**
     * Incremented every time the barrier opens. Guarded by this object's
     * monitor. A waiting thread compares it against the value it saw on
     * arrival to tell a genuine release from a spurious wakeup.
     */
    private int m_generation;

    /**
     * Initializes a thread barrier object with a given thread count.
     *
     * @param count is the number of threads that need to block on
     * barrierSynchronize() before they will be allowed to pass through
     * @throws IllegalArgumentException if count is less than 1 (such a
     * barrier could never open)
     * @see #barrierSynchronize()
     */
    public ThreadBarrier( int count )
    {
        if ( count < 1 )
        {
            throw new IllegalArgumentException(
                "count must be at least 1: " + count );
        }

        m_threshold = count;
        m_count = 0;
        m_generation = 0;
    }

    /**
     * This method blocks all threads calling it until the threshold number of
     * threads have called it. It then releases all threads blocked by it.
     *
     * <p>A thread that is interrupted while waiting is removed from the
     * barrier's count before the exception is thrown, so it does not cause
     * the barrier to open with fewer than the threshold number of threads.
     * If the interrupt arrives after the barrier has already opened, the
     * method returns normally with the thread's interrupt status set.</p>
     *
     * @throws InterruptedException if any thread blocked during the call is
     * interrupted
     */
    public void barrierSynchronize()
        throws InterruptedException
    {
        synchronized ( this )
        {
            if ( m_count == m_threshold - 1 )
            {
                // Last arrival: open the barrier and start a new round.
                m_count = 0;
                m_generation++;
                notifyAll();
                return;
            }

            final int arrivalGeneration = m_generation;
            m_count++;
            try
            {
                // Loop so that a spurious wakeup does not let this thread
                // through before the barrier has actually opened.
                while ( arrivalGeneration == m_generation )
                {
                    wait();
                }
            }
            catch ( InterruptedException ie )
            {
                if ( arrivalGeneration == m_generation )
                {
                    // Still waiting for this round: withdraw our arrival.
                    m_count--;
                    throw ie;
                }
                // The barrier opened before the interrupt was noticed; treat
                // this as a normal release but keep the interrupt visible.
                Thread.currentThread().interrupt();
            }
        }
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]