On Monday 04 March 2002 01:58 pm, Leo Sutic wrote:
> 1) Can you put some code in DefaultQueue and see if it actually enqueues
> the right number of elements? Just after every m_elements.add, put a
> elementsAdded++.
>    Just to figure out just where the elements disappear.

They all enqueue properly

> 2) Can you run the threads one at a time? That is, run thread 1. The queue
> fills up.
>    Run thread 2. Wait until all elements are processed and the second queue
> is full.
>    Run thread 3.

Works fine.

> I just want to see if it really is a threading error or if it is a logic
> error.
> As far as I can see, all methods in DefaultQueue are correctly synchronized
> (with the exceptions I've told you), and I could not find any errors in the
> VariableSizeBuffer.

I've attached my updated QueueTest, you can comment out a line in main() to 
control whether it runs threaded or not. I've also attached my modified 
DefaultQueue with the counters.
-pete

-- 
peter royal -> [EMAIL PROTECTED]

import org.apache.avalon.excalibur.event.Queue;
import org.apache.avalon.excalibur.event.QueueElement;
import org.apache.avalon.excalibur.event.Sink;
import org.apache.avalon.excalibur.event.Source;
import org.apache.avalon.excalibur.event.SourceException;
import org.apache.avalon.framework.CascadingRuntimeException;

import com.pace2020.edi.util.DefaultQueue;

/**
 * Simple test to expose the thread queue bug
 *
 * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Royal</a>
 * @version VSS $Revision: 5 $ $Date: 3/04/02 1:38p $
 */
public class QueueTest
{
    private QueueStart start;
    private QueueEnd end;

    private DefaultQueue queue;

    private Thread[] stages;

    public static void main(String[] args) throws Exception
    {
        QueueTest qt = new QueueTest();

        qt.initialize(Integer.parseInt(args[0]));
        qt.start();
//        qt.testNoThreads();
    }

    public void initialize(int count) throws Exception
    {
        this.stages = new Thread[2];

        this.queue = new DefaultQueue();

        this.start = new QueueStart(count);
        this.start.setSource(this.queue);
        this.stages[0] = new Thread(this.start);

        this.end = new QueueEnd();
        this.end.setSink(this.queue);
        this.stages[1] = new Thread(this.end);
    }

    public void testNoThreads() {
        System.out.println("Running start...");
        this.start.run();

        System.out.println("Running end...");
        this.end.run();

        printStats();
    }

    public void start() throws Exception
    {
        System.out.println("Starting test");

        for (int i = 0; i < this.stages.length; i++) {
            this.stages[i].start();
        }

        stop();
    }

    public void stop() throws Exception
    {
        for (int i = 0; i < this.stages.length; i++) {
            try {
                this.stages[i].join();
            } catch (InterruptedException e) {
                throw new CascadingRuntimeException("Stage unexpectedly interrupted", e);
            }
        }

        printStats();
    }

    private void printStats()
    {
        System.out.println("Test complete");

        System.out.println("Enqueue: " + this.start.getCount());
        System.out.println("DefaultQueue.enqueueCount: " + this.queue.getEnqueueCount());
        System.out.println("Dequeue: " + this.end.getCount());
        System.out.println("DefaultQueue.dequeueCount: " + this.queue.getDequeueCount());
        System.out.println("DefaultQueue.dequeueAttempt: " + this.queue.getDequeueAttempt());
    }

    private class QueueInteger implements QueueElement
    {
        private int integer;

        public QueueInteger(int integer)
        {
            this.integer = integer;
        }

        public int getInteger()
        {
            return integer;
        }
    }

    private class QueueStart implements Runnable
    {
        private Source source;
        private int queueCount;
        private int count;

        public QueueStart(int queueCount)
        {
            this.queueCount = queueCount;
        }

        protected void setSource(Source source)
        {
            this.source = source;
        }

        public int getCount()
        {
            return count;
        }

        public void run()
        {
            for (int i = 0; i < this.queueCount; i++) {
                try {
                    this.source.enqueue(new QueueInteger(i));
                    this.count++;
                } catch (SourceException e) {
                    System.out.println("Unable to queue: " + e.getMessage());
                }
            }

            try {
                this.source.enqueue(new QueueInteger(-1));
            } catch (SourceException e) {
                System.out.println("Unable to queue stop");
            }
        }
    }

    private class QueueEnd implements Runnable
    {
        private Sink sink;
        private int count;

        protected void setSink(Sink sink)
        {
            this.sink = sink;
        }

        public int getCount()
        {
            return count;
        }

        public void run()
        {
            while (true) {
                QueueElement qe = this.sink.dequeue();

                if (qe == null) {

                } else if (qe instanceof QueueInteger) {
                    QueueInteger qi = (QueueInteger) qe;

                    if (qi.getInteger() == -1) {
                        break;
                    } else {
                        this.count++;
                    }
                }
            }
        }
    }
}
/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE.txt file.
 */
package com.pace2020.edi.util;

import org.apache.avalon.excalibur.collections.Buffer;
import org.apache.avalon.excalibur.collections.VariableSizeBuffer;
import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.event.AbstractQueue;
import org.apache.avalon.excalibur.event.PreparedEnqueue;
import org.apache.avalon.excalibur.event.QueueElement;
import org.apache.avalon.excalibur.event.SourceException;
import org.apache.avalon.excalibur.event.SourceFullException;

/**
 * The default queue implementation is a variable size queue.  This queue is
 * ThreadSafe, however the overhead in synchronization costs a few extra millis.
 *
 * @author <a href="mailto:[EMAIL PROTECTED]";>Berin Loritsch</a>
 */
public final class DefaultQueue extends AbstractQueue
{
    private final Buffer    m_elements;
    private final Mutex     m_mutex;
    private       int       m_reserve;
    private final int       m_maxSize;
    private int enqueueCount;
    private int dequeueCount;
    private int dequeueAttempt;

    public DefaultQueue( int size )
    {
        int maxSize;

        if ( size > 0 )
        {
            m_elements = new VariableSizeBuffer( size );
            maxSize = size;
        }
        else
        {
            m_elements = new VariableSizeBuffer();
            maxSize = -1;
        }

        m_mutex = new Mutex();
        m_reserve = 0;
        m_maxSize = maxSize;
    }

    public DefaultQueue()
    {
        this( -1 );
    }

    public int size()
    {
        return m_elements.size();
    }

    public int maxSize()
    {
        return m_maxSize;
    }

    public PreparedEnqueue prepareEnqueue( final QueueElement[] elements )
        throws SourceException
    {
        PreparedEnqueue enqueue = null;

        try
        {
            m_mutex.acquire();

            if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() )
            {
                throw new SourceFullException("Not enough room to enqueue these elements.");
            }

            enqueue = new DefaultPreparedEnqueue( this, elements );
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }

        return enqueue;
    }

    public boolean tryEnqueue( final QueueElement element )
    {
        boolean success = false;

        try
        {
            m_mutex.acquire();

            if (  maxSize() > 0 && 1 + m_reserve + size() > maxSize() )
            {
                return false;
            }

            m_elements.add( element );
            success = true;
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }

        return success;
    }

    public void enqueue( final QueueElement[] elements )
        throws SourceException
    {
        final int len = elements.length;

        try
        {
            m_mutex.acquire();
            if (  maxSize() > 0 && elements.length + m_reserve + size() > maxSize() )
            {
                throw new SourceFullException("Not enough room to enqueue these elements.");
            }

            for ( int i = 0; i < len; i++ )
            {
                m_elements.add( elements[i] );
            }
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }
    }

    public void enqueue( final QueueElement element )
        throws SourceException
    {
        try
        {
            m_mutex.acquire();
            if (  maxSize() > 0 && 1 + m_reserve + size() > maxSize() )
            {
                throw new SourceFullException("Not enough room to enqueue these elements.");
            }

            m_elements.add( element );
            this.enqueueCount++;
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }
    }

    public QueueElement[] dequeue( final int numElements )
    {
        int arraySize = numElements;

        if ( size() < numElements )
        {
            arraySize = size();
        }

        QueueElement[] elements = null;

        try
        {
            m_mutex.attempt( m_timeout );

            if ( size() < numElements )
            {
                arraySize = size();
            }

            elements = new QueueElement[ arraySize ];

            for ( int i = 0; i < arraySize; i++ )
            {
                elements[i] = (QueueElement) m_elements.remove();
            }
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }

        return elements;
    }

    public QueueElement[] dequeueAll()
    {
        QueueElement[] elements = null;

        try
        {
            m_mutex.attempt( m_timeout );

            elements = new QueueElement[ size() ];

            for ( int i = 0; i < elements.length; i++ )
            {
                elements[i] = (QueueElement) m_elements.remove();
            }
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }

        return elements;
    }

    public QueueElement dequeue()
    {
        QueueElement element = null;

        try
        {
            if (m_mutex.attempt( m_timeout ))
            {
                this.dequeueAttempt++;
                if ( size() > 0 )
                {
                    element = (QueueElement) m_elements.remove();
                    this.dequeueCount++;
                }
            }
        }
        catch ( InterruptedException ie )
        {
        }
        finally
        {
            m_mutex.release();
        }

        return element;
    }

    private final static class DefaultPreparedEnqueue implements PreparedEnqueue
    {
        private final DefaultQueue m_parent;
        private       QueueElement[] m_elements;

        private DefaultPreparedEnqueue( DefaultQueue parent, QueueElement[] elements )
        {
            m_parent = parent;
            m_elements = elements;
        }


        public void commit()
        {
            if ( null == m_elements )
            {
                throw new IllegalStateException("This PreparedEnqueue has already been processed!");
            }

            try
            {
                m_parent.enqueue( m_elements );
                m_parent.m_reserve -= m_elements.length;
                m_elements = null;
            }
            catch (Exception e)
            {
                throw new IllegalStateException("Default enqueue did not happen--should be impossible");
                // will never happen
            }
        }

        public void abort()
        {
            if ( null == m_elements )
            {
                throw new IllegalStateException("This PreparedEnqueue has already been processed!");
            }

            m_parent.m_reserve -= m_elements.length;
            m_elements = null;
        }
    }

    public int getEnqueueCount()
    {
        return enqueueCount;
    }

    public int getDequeueCount()
    {
        return dequeueCount;
    }

    public int getDequeueAttempt()
    {
        return dequeueAttempt;
    }
}
--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to