On Monday 04 March 2002 01:58 pm, Leo Sutic wrote: > 1) Can you put some code in DefaultQueue and see if it actually enqueues > the right number of elements? Just after every m_elements.add, put an > elementsAdded++. > Just to figure out where the elements disappear.
They all enqueue properly. > 2) Can you run the threads one at a time? That is, run thread 1. The queue > fills up. > Run thread 2. Wait until all elements are processed and the second queue > is full. > Run thread 3. Works fine. > I just want to see if it really is a threading error or if it is a logic > error. > As far as I can see, all methods in DefaultQueue are correctly synchronized > (with the exceptions I've told you), and I could not find any errors in the > VariableSizeBuffer. I've attached my updated QueueTest; you can comment out a line in main() to control whether it runs threaded or not. I've also attached my modified DefaultQueue with the counters. -pete -- peter royal -> [EMAIL PROTECTED]
import org.apache.avalon.excalibur.event.Queue; import org.apache.avalon.excalibur.event.QueueElement; import org.apache.avalon.excalibur.event.Sink; import org.apache.avalon.excalibur.event.Source; import org.apache.avalon.excalibur.event.SourceException; import org.apache.avalon.framework.CascadingRuntimeException; import com.pace2020.edi.util.DefaultQueue; /** * Simple test to expose the thread queue bug * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Royal</a> * @version VSS $Revision: 5 $ $Date: 3/04/02 1:38p $ */ public class QueueTest { private QueueStart start; private QueueEnd end; private DefaultQueue queue; private Thread[] stages; public static void main(String[] args) throws Exception { QueueTest qt = new QueueTest(); qt.initialize(Integer.parseInt(args[0])); qt.start(); // qt.testNoThreads(); } public void initialize(int count) throws Exception { this.stages = new Thread[2]; this.queue = new DefaultQueue(); this.start = new QueueStart(count); this.start.setSource(this.queue); this.stages[0] = new Thread(this.start); this.end = new QueueEnd(); this.end.setSink(this.queue); this.stages[1] = new Thread(this.end); } public void testNoThreads() { System.out.println("Running start..."); this.start.run(); System.out.println("Running end..."); this.end.run(); printStats(); } public void start() throws Exception { System.out.println("Starting test"); for (int i = 0; i < this.stages.length; i++) { this.stages[i].start(); } stop(); } public void stop() throws Exception { for (int i = 0; i < this.stages.length; i++) { try { this.stages[i].join(); } catch (InterruptedException e) { throw new CascadingRuntimeException("Stage unexpectedly interrupted", e); } } printStats(); } private void printStats() { System.out.println("Test complete"); System.out.println("Enqueue: " + this.start.getCount()); System.out.println("DefaultQueue.enqueueCount: " + this.queue.getEnqueueCount()); System.out.println("Dequeue: " + this.end.getCount()); 
System.out.println("DefaultQueue.dequeueCount: " + this.queue.getDequeueCount()); System.out.println("DefaultQueue.dequeueAttempt: " + this.queue.getDequeueAttempt()); } private class QueueInteger implements QueueElement { private int integer; public QueueInteger(int integer) { this.integer = integer; } public int getInteger() { return integer; } } private class QueueStart implements Runnable { private Source source; private int queueCount; private int count; public QueueStart(int queueCount) { this.queueCount = queueCount; } protected void setSource(Source source) { this.source = source; } public int getCount() { return count; } public void run() { for (int i = 0; i < this.queueCount; i++) { try { this.source.enqueue(new QueueInteger(i)); this.count++; } catch (SourceException e) { System.out.println("Unable to queue: " + e.getMessage()); } } try { this.source.enqueue(new QueueInteger(-1)); } catch (SourceException e) { System.out.println("Unable to queue stop"); } } } private class QueueEnd implements Runnable { private Sink sink; private int count; protected void setSink(Sink sink) { this.sink = sink; } public int getCount() { return count; } public void run() { while (true) { QueueElement qe = this.sink.dequeue(); if (qe == null) { } else if (qe instanceof QueueInteger) { QueueInteger qi = (QueueInteger) qe; if (qi.getInteger() == -1) { break; } else { this.count++; } } } } } }
/* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE.txt file. */ package com.pace2020.edi.util; import org.apache.avalon.excalibur.collections.Buffer; import org.apache.avalon.excalibur.collections.VariableSizeBuffer; import org.apache.avalon.excalibur.concurrent.Mutex; import org.apache.avalon.excalibur.event.AbstractQueue; import org.apache.avalon.excalibur.event.PreparedEnqueue; import org.apache.avalon.excalibur.event.QueueElement; import org.apache.avalon.excalibur.event.SourceException; import org.apache.avalon.excalibur.event.SourceFullException; /** * The default queue implementation is a variable size queue. This queue is * ThreadSafe, however the overhead in synchronization costs a few extra millis. * * @author <a href="mailto:[EMAIL PROTECTED]">Berin Loritsch</a> */ public final class DefaultQueue extends AbstractQueue { private final Buffer m_elements; private final Mutex m_mutex; private int m_reserve; private final int m_maxSize; private int enqueueCount; private int dequeueCount; private int dequeueAttempt; public DefaultQueue( int size ) { int maxSize; if ( size > 0 ) { m_elements = new VariableSizeBuffer( size ); maxSize = size; } else { m_elements = new VariableSizeBuffer(); maxSize = -1; } m_mutex = new Mutex(); m_reserve = 0; m_maxSize = maxSize; } public DefaultQueue() { this( -1 ); } public int size() { return m_elements.size(); } public int maxSize() { return m_maxSize; } public PreparedEnqueue prepareEnqueue( final QueueElement[] elements ) throws SourceException { PreparedEnqueue enqueue = null; try { m_mutex.acquire(); if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) { throw new SourceFullException("Not enough room to enqueue these elements."); } enqueue = new DefaultPreparedEnqueue( this, elements ); } catch ( InterruptedException 
ie ) { } finally { m_mutex.release(); } return enqueue; } public boolean tryEnqueue( final QueueElement element ) { boolean success = false; try { m_mutex.acquire(); if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) { return false; } m_elements.add( element ); success = true; } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } return success; } public void enqueue( final QueueElement[] elements ) throws SourceException { final int len = elements.length; try { m_mutex.acquire(); if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) { throw new SourceFullException("Not enough room to enqueue these elements."); } for ( int i = 0; i < len; i++ ) { m_elements.add( elements[i] ); } } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } } public void enqueue( final QueueElement element ) throws SourceException { try { m_mutex.acquire(); if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) { throw new SourceFullException("Not enough room to enqueue these elements."); } m_elements.add( element ); this.enqueueCount++; } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } } public QueueElement[] dequeue( final int numElements ) { int arraySize = numElements; if ( size() < numElements ) { arraySize = size(); } QueueElement[] elements = null; try { m_mutex.attempt( m_timeout ); if ( size() < numElements ) { arraySize = size(); } elements = new QueueElement[ arraySize ]; for ( int i = 0; i < arraySize; i++ ) { elements[i] = (QueueElement) m_elements.remove(); } } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } return elements; } public QueueElement[] dequeueAll() { QueueElement[] elements = null; try { m_mutex.attempt( m_timeout ); elements = new QueueElement[ size() ]; for ( int i = 0; i < elements.length; i++ ) { elements[i] = (QueueElement) m_elements.remove(); } } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } return elements; } public QueueElement 
dequeue() { QueueElement element = null; try { if (m_mutex.attempt( m_timeout )) { this.dequeueAttempt++; if ( size() > 0 ) { element = (QueueElement) m_elements.remove(); this.dequeueCount++; } } } catch ( InterruptedException ie ) { } finally { m_mutex.release(); } return element; } private final static class DefaultPreparedEnqueue implements PreparedEnqueue { private final DefaultQueue m_parent; private QueueElement[] m_elements; private DefaultPreparedEnqueue( DefaultQueue parent, QueueElement[] elements ) { m_parent = parent; m_elements = elements; } public void commit() { if ( null == m_elements ) { throw new IllegalStateException("This PreparedEnqueue has already been processed!"); } try { m_parent.enqueue( m_elements ); m_parent.m_reserve -= m_elements.length; m_elements = null; } catch (Exception e) { throw new IllegalStateException("Default enqueue did not happen--should be impossible"); // will never happen } } public void abort() { if ( null == m_elements ) { throw new IllegalStateException("This PreparedEnqueue has already been processed!"); } m_parent.m_reserve -= m_elements.length; m_elements = null; } } public int getEnqueueCount() { return enqueueCount; } public int getDequeueCount() { return dequeueCount; } public int getDequeueAttempt() { return dequeueAttempt; } }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>