Hello, as Leo pointed out DefaultQueue has some issues with mutex usage. Namely it doesn't check the result of Semaphore.attempt and proceeds either way so it may release a semaphore it never gotten.
A similar but more obscure situation exists in all enqueue methods too. If mutex.acquire throws an InterruptedException enqueue methods will release a token they never had thus ruining the semaphore. Attached is a patch that takes care of both cases. Concurrent programming is indeed full of ugly little details that are easy to miss. I also played with QueueTest a little bit and found that using nonzero timeouts helps performance. I am also attaching a changed version of QueueTest. Here are the results: [greg@slab /tmp]$ time java QueueTest 999999 0 Starting test Test complete Enqueue: 999999 sum 2056023531512393887 Dequeue: 999999 sum 2056023531512393887 real 0m17.804s user 0m17.230s sys 0m0.280s Even 1 millisecond timeout improves the performance significantly: [greg@slab /tmp]$ time java QueueTest 999999 1 Starting test Test complete Enqueue: 999999 sum 2056023531512393887 Dequeue: 999999 sum 2056023531512393887 real 0m13.059s user 0m12.770s sys 0m0.070s Further increasing timeout doesn't change the picture as dramatically. Bye Greg P.S. The runs were done with VariableSizeBuffer patch posted earlier applied.
Index: src/scratchpad/org/apache/avalon/excalibur/event/DefaultQueue.java =================================================================== RCS file: /home/cvspublic/jakarta-avalon-excalibur/src/scratchpad/org/apache/avalon/excalibur/event/DefaultQueue.java,v retrieving revision 1.8 diff -u -r1.8 DefaultQueue.java --- src/scratchpad/org/apache/avalon/excalibur/event/DefaultQueue.java 4 Feb 2002 18:21:50 -0000 1.8 +++ src/scratchpad/org/apache/avalon/excalibur/event/DefaultQueue.java 5 Mar 2002 +21:18:59 -0000 @@ -67,21 +67,24 @@ try { m_mutex.acquire(); + try + { + + if ( maxSize() > 0 && elements.length + m_reserve + size() > +maxSize() ) + { + throw new SinkFullException("Not enough room to enqueue these +elements."); + } - if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) + enqueue = new DefaultPreparedEnqueue( this, elements ); + } + finally { - throw new SinkFullException("Not enough room to enqueue these elements."); + m_mutex.release(); } - - enqueue = new DefaultPreparedEnqueue( this, elements ); } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } return enqueue; } @@ -93,22 +96,25 @@ try { m_mutex.acquire(); + try + { + + if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + { + return false; + } - if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + m_elements.add( element ); + success = true; + } + finally { - return false; + m_mutex.release(); } - - m_elements.add( element ); - success = true; } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } return success; } @@ -121,23 +127,26 @@ try { m_mutex.acquire(); - if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) + try { - throw new SinkFullException("Not enough room to enqueue these elements."); - } + if ( maxSize() > 0 && elements.length + m_reserve + size() > +maxSize() ) + { + throw new SinkFullException("Not enough room to enqueue these +elements."); + } - for ( int i = 0; i < len; i++ ) + for ( int i = 0; i < len; i++ ) + { + m_elements.add( elements[i] ); + } + } + finally { - m_elements.add( elements[i] ); + m_mutex.release(); } } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } } public void enqueue( final QueueElement element ) @@ -146,20 +155,23 @@ try { m_mutex.acquire(); - if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + try { - throw new SinkFullException("Not enough room to enqueue these elements."); - } + if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) + { + throw new SinkFullException("Not enough room to enqueue these +elements."); + } - m_elements.add( element ); + m_elements.add( element ); + } + finally + { + m_mutex.release(); + } } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } } public QueueElement[] dequeue( final int numElements ) @@ -175,27 +187,30 @@ try { - m_mutex.attempt( m_timeout ); - - if ( size() < numElements ) - { - arraySize = size(); - } - - elements = new QueueElement[ arraySize ]; - - for ( int i = 0; i < arraySize; i++ ) - { - elements[i] = (QueueElement) m_elements.remove(); + if (m_mutex.attempt( m_timeout )) { + try + { + if ( size() < numElements ) + { + arraySize = size(); + } + + elements = new QueueElement[ arraySize ]; + + for ( int i = 0; i < arraySize; i++ ) + { + elements[i] = (QueueElement) m_elements.remove(); + } + } + finally + { + m_mutex.release(); + } } } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } return elements; } @@ -206,22 +221,25 @@ try { - m_mutex.attempt( m_timeout ); - - elements = new QueueElement[ size() ]; - - for ( int i = 0; i < elements.length; i++ ) - { - elements[i] = (QueueElement) m_elements.remove(); + if (m_mutex.attempt( m_timeout )) { + try + { + elements = new QueueElement[ size() ]; + + for ( int i = 0; i < elements.length; i++ ) + { + elements[i] = (QueueElement) m_elements.remove(); + } + } + finally + { + m_mutex.release(); + } } } catch ( InterruptedException ie ) { } - finally - { - m_mutex.release(); - } return elements; } @@ -232,19 +250,22 @@ try { - m_mutex.attempt( m_timeout ); - - if ( size() > 0 ) - { - element = (QueueElement) m_elements.remove(); + if (m_mutex.attempt( m_timeout )) { + try + { + if ( size() > 0 ) + { + element = (QueueElement) m_elements.remove(); + } + } + finally + { + m_mutex.release(); + } } } catch ( InterruptedException ie ) { - } - finally - { - m_mutex.release(); } return element;
import org.apache.avalon.excalibur.event.DefaultQueue; import org.apache.avalon.excalibur.event.Queue; import org.apache.avalon.excalibur.event.QueueElement; import org.apache.avalon.excalibur.event.Sink; import org.apache.avalon.excalibur.event.Source; import org.apache.avalon.excalibur.event.SinkException; import org.apache.avalon.framework.CascadingRuntimeException; /** * Simple test to expose the thread queue bug * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Royal</a> * @version VSS $Revision: 5 $ $Date: 3/04/02 1:38p $ */ public class QueueTest { private QueueStart start; private QueueEnd end; private Queue queue; private Thread[] stages; public static void main(String[] args) throws Exception { QueueTest qt = new QueueTest(); qt.initialize(Integer.parseInt(args[0]), Integer.parseInt(args[1])); qt.start(); } public void initialize(int count, long timeout) throws Exception { this.stages = new Thread[2]; this.queue = new DefaultQueue(); this.queue.setTimeout(timeout); this.start = new QueueStart(count); this.start.setSink(this.queue); this.stages[0] = new Thread(this.start); this.end = new QueueEnd(); this.end.setSource(this.queue); this.end.setTimeout(timeout); this.stages[1] = new Thread(this.end); } public void start() throws Exception { System.out.println("Starting test"); for (int i = 0; i < this.stages.length; i++) { this.stages[i].start(); } stop(); } public void stop() throws Exception { for (int i = 0; i < this.stages.length; i++) { try { this.stages[i].join(); } catch (InterruptedException e) { throw new CascadingRuntimeException("Stage unexpectedly interrupted", e); } } System.out.println("Test complete"); System.out.println("Enqueue: " + this.start.getCount() + " sum " + this.start.getSum()); System.out.println("Dequeue: " + this.end.getCount() + " sum " + this.end.getSum()); } private class QueueInteger implements QueueElement { private int integer; public QueueInteger(int integer) { this.integer = integer; } public int getInteger() { return integer; } } private class QueueStart implements Runnable { private Sink sink; private int queueCount; private int count; private long sum = 0; public QueueStart(int queueCount) { this.queueCount = queueCount; } protected void setSink(Sink sink) { this.sink = sink; } public int getCount() { return count; } public long getSum() { return sum; } public void run() { for (int i = 0; i < this.queueCount; i++) { try { this.sink.enqueue(new QueueInteger(i)); this.count++; sum = sum * 127 + i; } catch (SinkException e) { System.out.println("Unable to queue: " + e.getMessage()); } } try { this.sink.enqueue(new QueueInteger(-1)); } catch (SinkException e) { System.out.println("Unable to queue stop"); } } } private class QueueEnd implements Runnable { private Source source; private int count; private long timeout = 0; private long sum = 0; protected void setTimeout(long timeout) { this.timeout = timeout; } protected void setSource(Source source) { this.source = source; } public int getCount() { return count; } public long getSum() { return sum; } public void run() { while (true) { QueueElement qe = this.source.dequeue(); if (qe == null) { if (timeout > 0) { try { Thread.sleep(timeout); } catch (InterruptedException ie) { break; } } } else if (qe instanceof QueueInteger) { QueueInteger qi = (QueueInteger) qe; if (qi.getInteger() == -1) { break; } else { this.count++; sum = sum * 127 + qi.getInteger(); } } } } } }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>