Hi all :
    A few days ago I posted a bug report about a non-durable topic consumer
client that runs out of heap space while receiving messages; see
https://issues.apache.org/activemq/browse/AMQ-1325 
    Today I found that the problem occurs because the producer sends messages
faster than the consumer receives them. If I make the producer thread sleep
100 milliseconds between sends, the consumer client does not throw an
OutOfMemory exception.
    I also found that org.apache.activemq.MessageDispatchChannel uses a
LinkedList with no maximum capacity. If the consumer receives more slowly
than the producer sends, the LinkedList keeps growing, and that is why the
OutOfMemory exception is thrown.
    So I changed MessageDispatchChannel as shown below; with this version it
runs without any OutOfMemory exception.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import javax.jms.JMSException;

import com.thtf.ezone.ezesb.mq.command.MessageDispatch;

public class MessageDispatchChannel {

    //private final Object mutex = new Object();
    private final LinkedList<MessageDispatch> list;
    private AtomicBoolean closed=new AtomicBoolean(false);
    private AtomicBoolean running=new AtomicBoolean(false);
    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);
    
    /** Lock held by take, poll, etc */
    private final ReentrantLock lock;

    /** Wait queue for waiting takes */
    private final Condition notEmpty;
     /** Wait queue for waiting puts */
    private final Condition notFull;

    public MessageDispatchChannel() {
        this.list = new LinkedList<MessageDispatch>();
        this.capacity=Integer.MAX_VALUE;
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public MessageDispatchChannel(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.list = new LinkedList<MessageDispatch>();
        this.capacity=capacity;
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void enqueue(MessageDispatch message) throws
InterruptedException{
        if (closed.get() ||!running.get()){
                return ;
        }
//      synchronized(mutex) {
//              if (list.size()>=this.capacity){
//                      try {
//                              System.out.println(" overflow.");
//                                      mutex.wait(1000);
//                              } catch (InterruptedException e) {
//                              }
//              }
//              list.addLast(message);
//            mutex.notify();
//        }
        if (message == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;        
        final ReentrantLock lock = this.lock;
        final AtomicInteger count = this.count;
        lock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            list.addLast(message);
            c = count.getAndIncrement();
            notEmpty.signal();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            lock.unlock();
        }    
        
    }

    public void enqueueFirst(MessageDispatch message) throws
InterruptedException{
        if (closed.get() ||!running.get()){
                return ;
        }
//        synchronized(mutex) {
//              if (list.size()>=this.capacity){
//                      try {
//                                      mutex.wait(1000);
//                              } catch (InterruptedException e) {
//                              }
//              }
//            list.addFirst(message);
//            mutex.notify();
//        }
        if (message == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock lock = this.lock;
        final AtomicInteger count = this.count;
        lock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            list.addFirst(message);
            c = count.getAndIncrement();
            notEmpty.signal();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
                lock.unlock();
        }
    }

    public boolean isEmpty() {
//        synchronized(mutex) {
//            return list.isEmpty();
//        }
        return count.get()==0;
    }

    /**
     * Used to get an enqueued message. 
     * The amount of time this method blocks is based on the timeout value. 
     * - if timeout==-1 then it blocks until a message is received. 
     * - if timeout==0 then it it tries to not block at all, it returns a
message if it is available 
     * - if timeout>0 then it blocks up to timeout amount of time.
     * 
     * Expired messages will consumed by this method.  
     * 
     * @throws JMSException 
     * 
     * @return null if we timeout or if the consumer is closed.
     * @throws InterruptedException 
     */
    public MessageDispatch dequeue(long timeout) throws InterruptedException
{
//        synchronized (mutex) {
//            // Wait until the consumer is ready to deliver messages.
//            while(timeout != 0 && !closed && (list.isEmpty() || !running))
{
//                if (timeout == -1) {
//                    mutex.wait();
//                } else {
//                    mutex.wait(timeout);
//                    break;
//                }
//            }
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.removeFirst();
//        }
        if (closed.get() ||!running.get()){
                return null;
        }
        MessageDispatch x = null;
        int c = -1;
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = list.poll();
                    c = count.getAndDecrement();
                    notFull.signal();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted
thread
                    throw ie;
                }
            }
        } finally {
                lock.unlock();
        }
        return x;
    }
    
    public MessageDispatch dequeueNoWait() {
        if (closed.get() ||!running.get()){
                return null;
        }
//        synchronized (mutex) {
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.removeFirst();
//        }
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        MessageDispatch x = null;
        int c = -1;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count.get() > 0) {
                x =list.poll();
                c = count.getAndDecrement();
                notFull.signal();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
                lock.unlock();
        }
        return x;
    }
    
    public MessageDispatch peek() {
        if (closed.get() ||!running.get()){
                return null;
        }
//        synchronized (mutex) {
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.getFirst();
//        }     
        final ReentrantLock lock = this.lock;
        lock.lock();
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        try {
                return list.peek();
        } finally {
                lock.unlock();
        }
    }

    

    public void clear() {
//        synchronized(mutex) {
//            list.clear();
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
                list.clear();
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
                lock.unlock();
        }
    }

    public boolean isClosed() {
        return closed.get();
    }

    public int size() {
//        synchronized(mutex) {
//            return list.size();
//        }
        return count.get();
    }
    
    public int remainingCapacity() {
        return capacity - count.get();
    }
    
    public List<MessageDispatch> removeAll() {
//        synchronized(mutex) {
//            ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
//            list.clear();
//            return rc;
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
                ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
                list.clear();
                if (count.getAndSet(0) == capacity)
                notFull.signalAll();
                return rc;
        } finally {
                lock.unlock();
        }
    }

    public String toString() {
//        synchronized(mutex) {
//            return list.toString();
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return list.toString();
        } finally {
                lock.unlock();
        }
    }
    
    public void start() {
//        synchronized (mutex) {
//            running = true;
//            mutex.notifyAll();
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
                count.set(0);
                running.compareAndSet(false, true);
        } finally {
                lock.unlock();
        }
    }

    public void stop() {
//        synchronized (mutex) {
//            running = false;
//            mutex.notifyAll();
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
                count.set(0);
                running.compareAndSet(true, false);
        } finally {
                lock.unlock();
        }
    }

    public void close() {
//        synchronized (mutex) {
//            if (!closed) {
//                running = false;
//                closed = true;
//            }
//            mutex.notifyAll();
//        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
                count.set(0);
                running.compareAndSet(true, false);
                closed.compareAndSet(false, true);
        } finally {
                lock.unlock();
        }
    }

    //public Object getMutex() {
    //    return mutex;
    //}

    public boolean isRunning() {
        return running.get();
    }  
    
    public ReentrantLock getLock(){
        return this.lock;
    }
    
}

    I also wrote another MessageDispatchChannel that uses two locks (a take
lock and a put lock):
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MesssageDispatchBlockingChannel<E> extends AbstractQueue<E>
                implements BlockingQueue<E>, java.io.Serializable {
        /*
         * A variant of the "two lock queue" algorithm. The putLock gates entry
         * to put (and offer), and has an associated condition for waiting puts.
         * Similarly for the takeLock. The "count" field that they both rely on
         * is maintained as an atomic to avoid needing to get both locks in most
         * cases. Also, to minimize need for puts to get takeLock and vice-versa,
         * cascading notifies are used. When a put notices that it has enabled at
         * least one take, it signals taker. That taker in turn signals others if
         * more items have been entered since the signal. And symmetrically for
         * takes signalling puts. Operations such as remove(Object) and iterators
         * acquire both locks.
         */

        /**
         * 
         */
        private static final long serialVersionUID = 1330823692422658197L;

        /**
         * Singly linked queue node: one element plus a reference to the node
         * that follows it.
         */
        static class Node<E> {
                /** The element; volatile to ensure a barrier separating write and read. */
                volatile E item;

                /** The node that follows this one in the list. */
                Node<E> next;

                Node(E element) {
                        this.item = element;
                }
        }

        /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;

        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger(0);

        /** Head of linked list */
        private transient Node<E> head;

        /** Tail of linked list */
        private transient Node<E> last;

        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();

        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();

        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();

        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

        /** Set to true by close(); enqueue/dequeue wrappers return early once it is set. */
        private AtomicBoolean closed = new AtomicBoolean(false);
        /** Set to true by start() and back to false by stop()/close(); enqueue/dequeue wrappers return early while it is false. */
        private AtomicBoolean running = new AtomicBoolean(false);

        /**
         * Offers an element to the tail of the queue without blocking, but only
         * while the channel is running and not closed.
         *
         * @param o
         *            the element to add
         * @return <tt>true</tt> if the element was added; <tt>false</tt> if the
         *         channel is closed, not running, or full
         * @throws NullPointerException
         *             if the channel is accepting elements and <tt>o</tt> is
         *             <tt>null</tt>
         */
        public boolean enqueue(E o) throws InterruptedException {
                final boolean accepting = !closed.get() && running.get();
                return accepting && this.offer(o);
        }

        /**
         * Inserts an element at the head of the queue without blocking, but only
         * while the channel is running and not closed.
         * <p>
         * Unlike tail insertion, this touches the head of the list, which takers
         * modify while holding takeLock. Both locks are therefore held for the
         * insertion so it cannot race with a concurrent take or poll.
         *
         * @param o
         *            the element to add
         * @return <tt>true</tt> if the element was added; <tt>false</tt> if the
         *         channel is closed, not running, or full
         * @throws NullPointerException
         *             if the channel is accepting elements and <tt>o</tt> is
         *             <tt>null</tt>
         */
        public boolean enqueueFirst(E o) throws InterruptedException {
                if (closed.get() || !running.get()) {
                        return false;
                }

                if (o == null)
                        throw new NullPointerException();
                final AtomicInteger count = this.count;
                // Cheap unlocked pre-check; re-validated below under the locks.
                if (count.get() == capacity)
                        return false;
                // Preset negative to indicate failure unless the insert happens.
                int c = -1;
                fullyLock();
                try {
                        if (count.get() < capacity) {
                                addFirst(o);
                                c = count.getAndIncrement();
                                if (c + 1 < capacity)
                                        notFull.signal();
                                // takeLock is already held, so a waiting taker can be
                                // signalled directly when the queue was empty.
                                if (c == 0)
                                        notEmpty.signal();
                        }
                } finally {
                        fullyUnlock();
                }
                return c >= 0;
        }

        /**
         * Reports whether the element count is currently zero. The count is read
         * without taking either lock.
         *
         * @return <tt>true</tt> if the count is zero
         */
        public boolean isEmpty() {
                return this.count.get() == 0;
        }

        /**
         * Retrieves and removes the head of the queue, waiting up to
         * <tt>timeout</tt> milliseconds for an element, but only while the
         * channel is running and not closed. The running/closed state is checked
         * once, before waiting.
         *
         * @param timeout
         *            maximum wait in milliseconds, passed on to
         *            {@code poll(timeout, TimeUnit.MILLISECONDS)}
         * @return the head element, or <tt>null</tt> if the channel is closed or
         *         not running, or if the timed poll returns <tt>null</tt>
         * @throws InterruptedException
         *             if interrupted while waiting
         */
        public E dequeue(long timeout) throws InterruptedException {
                final boolean active = !closed.get() && running.get();
                return active ? this.poll(timeout, TimeUnit.MILLISECONDS) : null;
        }

        /**
         * Retrieves and removes the head of the queue without waiting, but only
         * while the channel is running and not closed.
         *
         * @return the head element, or <tt>null</tt> if the channel is closed,
         *         not running, or empty
         */
        public E dequeueNoWait() {
                final boolean active = !closed.get() && running.get();
                return active ? this.poll() : null;
        }

        /**
         * Atomically removes every element from the queue and returns them in
         * queue order. Both locks are held for the duration, and waiting puts
         * are woken if the queue had been full.
         *
         * @return a new list holding the removed elements
         */
        public List<E> removeAll() {
                fullyLock();
                try {
                        ArrayList<E> drained = new ArrayList<E>(count.get());
                        Node<E> node = head.next;
                        while (node != null) {
                                drained.add(node.item);
                                node = node.next;
                        }
                        // Unlink everything behind the dummy head node.
                        head.next = null;
                        assert head.item == null;
                        last = head;
                        int previous = count.getAndSet(0);
                        if (previous == capacity) {
                                notFull.signalAll();
                        }
                        return drained;
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Marks the channel as running so that the enqueue/dequeue wrappers
         * accept work.
         * <p>
         * The element count is intentionally left untouched: resetting it to
         * zero while nodes are still linked would make size(), the capacity
         * bound and the empty/full wait guards disagree with the real contents.
         */
        public void start() {
                fullyLock();
                try {
                        running.set(true);
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Marks the channel as not running so that the enqueue/dequeue wrappers
         * refuse work.
         * <p>
         * The element count is intentionally left untouched: resetting it to
         * zero while nodes are still linked would make size(), the capacity
         * bound and the empty/full wait guards disagree with the real contents.
         * Elements already queued stay queued.
         */
        public void stop() {
                fullyLock();
                try {
                        running.set(false);
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Marks the channel as closed and not running so that the
         * enqueue/dequeue wrappers refuse work from now on.
         * <p>
         * The element count is intentionally left untouched: resetting it to
         * zero while nodes are still linked would make size(), the capacity
         * bound and the empty/full wait guards disagree with the real contents.
         * Elements already queued stay queued and can still be collected with
         * removeAll().
         */
        public void close() {
                fullyLock();
                try {
                        running.set(false);
                        closed.set(true);
                } finally {
                        fullyUnlock();
                }
        }
        
        /**
         * Reports whether the closed flag is set.
         *
         * @return the current value of the closed flag
         */
        public boolean isClosed() {
                return closed.get();
        }

        /**
         * Reports whether the running flag is set.
         *
         * @return the current value of the running flag
         */
        public boolean isRunning() {
                return running.get();
        }

        /**
         * Wakes one thread waiting for an element. The notEmpty condition belongs
         * to takeLock, so that lock is acquired just long enough to signal.
         * Called only from put/offer (which do not otherwise ordinarily lock
         * takeLock).
         */
        private void signalNotEmpty() {
                takeLock.lock();
                try {
                        notEmpty.signal();
                } finally {
                        takeLock.unlock();
                }
        }

        /**
         * Wakes one thread waiting for free space. The notFull condition belongs
         * to putLock, so that lock is acquired just long enough to signal.
         * Called only from take/poll.
         */
        private void signalNotFull() {
                putLock.lock();
                try {
                        notFull.signal();
                } finally {
                        putLock.unlock();
                }
        }

        /**
         * Create a node and link it at the front of the queue, directly behind
         * the dummy head node.
         * <p>
         * The list always starts with a dummy node whose item is null; the first
         * real element is <tt>head.next</tt>, which is what extract() returns.
         * The new node must therefore be spliced in after <tt>head</tt> rather
         * than replace it, otherwise the next extract() would hand back the old
         * dummy's null item and the inserted element would be lost.
         * <p>
         * This touches both ends of the list (<tt>head.next</tt>, and
         * <tt>last</tt> when the queue is empty), so it must be called with both
         * putLock and takeLock held.
         * 
         * @param x
         *            the item
         */
        private void addFirst(E x) {
                Node<E> node = new Node<E>(x);
                node.next = head.next;
                head.next = node;
                // On an empty queue the new node is also the tail.
                if (last == head)
                        last = node;
        }

        /**
         * Wraps the item in a new node and appends that node after the current
         * tail, making it the new tail.
         * 
         * @param x
         *            the item
         */
        private void insert(E x) {
                Node<E> appended = new Node<E>(x);
                last.next = appended;
                last = appended;
        }

        /**
         * Removes the first element from the queue. The node that held it
         * becomes the new dummy head and its item reference is cleared.
         * 
         * @return the element that was stored in the first node
         */
        private E extract() {
                Node<E> successor = head.next;
                head = successor;
                E element = successor.item;
                successor.item = null;
                return element;
        }

        /**
         * Lock to prevent both puts and takes. Acquires putLock first, then
         * takeLock; fullyUnlock() releases them in the reverse order.
         */
        protected void fullyLock() {
                putLock.lock();
                takeLock.lock();
        }

        /**
         * Unlock to allow both puts and takes. Releases takeLock first, then
         * putLock, i.e. the reverse of the order used by fullyLock().
         */
        protected void fullyUnlock() {
                takeLock.unlock();
                putLock.unlock();
        }

        /**
         * Creates a <tt>MesssageDispatchBlockingChannel</tt> with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public MesssageDispatchBlockingChannel() {
                this(Integer.MAX_VALUE);
        }

        /**
         * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) 
capacity.
         * 
         * @param capacity
         *            the capacity of this queue.
         * @throws IllegalArgumentException
         *             if <tt>capacity</tt> is not greater than zero.
         */
        public MesssageDispatchBlockingChannel(int capacity) {
                if (capacity <= 0)
                        throw new IllegalArgumentException();
                this.capacity = capacity;
                last = head = new Node<E>(null);
        }

        /**
         * Creates a <tt>MesssageDispatchBlockingChannel</tt> with a capacity of
         * {@link Integer#MAX_VALUE}, initially containing the elements of the
         * given collection, added in traversal order of the collection's
         * iterator.
         * 
         * @param c
         *            the collection of elements to initially contain
         * @throws NullPointerException
         *             if <tt>c</tt> or any element within it is <tt>null</tt>
         */
        public MesssageDispatchBlockingChannel(Collection<? extends E> c) {
                this(Integer.MAX_VALUE);
                for (Iterator<? extends E> source = c.iterator(); source.hasNext();) {
                        add(source.next());
                }
        }

        // this doc comment is overridden to remove the reference to collections
        // greater in size than Integer.MAX_VALUE
        /**
         * Returns the number of elements in this queue, as read from the atomic
         * count without taking either lock.
         * 
         * @return the number of elements in this queue.
         */
        public int size() {
                return count.get();
        }

        // this doc comment is a modified copy of the inherited doc comment,
        // without the reference to unlimited queues.
        /**
         * Returns the number of elements that this queue can ideally (in the
         * absence of memory or resource constraints) accept without blocking.
         * This is always equal to the initial capacity of this queue less the
         * current <tt>size</tt> of this queue.
         * <p>
         * Note that you <em>cannot</em> always tell if an attempt to
         * <tt>add</tt> an element will succeed by inspecting
         * <tt>remainingCapacity</tt> because it may be the case that a waiting
         * consumer is ready to <tt>take</tt> an element out of an otherwise full
         * queue.
         *
         * @return the capacity minus the current element count
         */
        public int remainingCapacity() {
                return capacity - count.get();
        }

        /**
         * Adds the specified element to the tail of this queue, waiting if
         * necessary for space to become available.
         * 
         * @param o
         *            the element to add
         * @throws InterruptedException
         *             if interrupted while waiting.
         * @throws NullPointerException
         *             if the specified element is <tt>null</tt>.
         */
        public void put(E o) throws InterruptedException {
                if (o == null)
                        throw new NullPointerException();
                // Note: convention in all put/take/etc is to preset
                // local var holding count negative to indicate failure unless 
set.
                int c = -1;
                final ReentrantLock putLock = this.putLock;
                final AtomicInteger count = this.count;
                putLock.lockInterruptibly();
                try {
                        /*
                         * Note that count is used in wait guard even though it 
is not
                         * protected by lock. This works because count can only 
decrease at
                         * this point (all other puts are shut out by lock), 
and we (or some
                         * other waiting put) are signalled if it ever changes 
from
                         * capacity. Similarly for all other uses of count in 
other wait
                         * guards.
                         */
                        try {
                                while (count.get() == capacity)
                                        notFull.await();
                        } catch (InterruptedException ie) {
                                notFull.signal(); // propagate to a 
non-interrupted thread
                                throw ie;
                        }
                        insert(o);
                        c = count.getAndIncrement();
                        if (c + 1 < capacity)
                                notFull.signal();
                } finally {
                        putLock.unlock();
                }
                if (c == 0)
                        signalNotEmpty();
        }

        /**
         * Inserts the specified element at the tail of this queue, waiting if
         * necessary up to the specified wait time for space to become 
available.
         * 
         * @param o
         *            the element to add
         * @param timeout
         *            how long to wait before giving up, in units of 
<tt>unit</tt>
         * @param unit
         *            a <tt>TimeUnit</tt> determining how to interpret the
         *            <tt>timeout</tt> parameter
         * @return <tt>true</tt> if successful, or <tt>false</tt> if the
         *         specified waiting time elapses before space is available.
         * @throws InterruptedException
         *             if interrupted while waiting.
         * @throws NullPointerException
         *             if the specified element is <tt>null</tt>.
         */
        public boolean offer(E o, long timeout, TimeUnit unit)
                        throws InterruptedException {

                if (o == null)
                        throw new NullPointerException();
                long nanos = unit.toNanos(timeout);
                int c = -1;
                final ReentrantLock putLock = this.putLock;
                final AtomicInteger count = this.count;
                putLock.lockInterruptibly();
                try {
                        for (;;) {
                                if (count.get() < capacity) {
                                        insert(o);
                                        c = count.getAndIncrement();
                                        if (c + 1 < capacity)
                                                notFull.signal();
                                        break;
                                }
                                if (nanos <= 0)
                                        return false;
                                try {
                                        nanos = notFull.awaitNanos(nanos);
                                } catch (InterruptedException ie) {
                                        notFull.signal(); // propagate to a 
non-interrupted thread
                                        throw ie;
                                }
                        }
                } finally {
                        putLock.unlock();
                }
                if (c == 0)
                        signalNotEmpty();
                return true;
        }

        /**
         * Appends the specified element at the tail of this queue when there is
         * room, and gives up immediately when the queue is full.
         * 
         * @param o
         *            the element to add.
         * @return <tt>true</tt> if the element was added to this queue, else
         *         <tt>false</tt>
         * @throws NullPointerException
         *             if the specified element is <tt>null</tt>
         */
        public boolean offer(E o) {
                if (o == null) {
                        throw new NullPointerException();
                }
                // Cheap unlocked pre-check; re-validated below under putLock.
                if (count.get() == capacity) {
                        return false;
                }
                // Stays negative unless the insert actually happens.
                int previous = -1;
                putLock.lock();
                try {
                        if (count.get() < capacity) {
                                insert(o);
                                previous = count.getAndIncrement();
                                if (previous + 1 < capacity) {
                                        notFull.signal();
                                }
                        }
                } finally {
                        putLock.unlock();
                }
                if (previous == 0) {
                        signalNotEmpty();
                }
                return previous >= 0;
        }

        public E take() throws InterruptedException {
                E x;
                int c = -1;
                final AtomicInteger count = this.count;
                final ReentrantLock takeLock = this.takeLock;
                takeLock.lockInterruptibly();
                try {
                        try {
                                while (count.get() == 0)
                                        notEmpty.await();
                        } catch (InterruptedException ie) {
                                notEmpty.signal(); // propagate to a 
non-interrupted thread
                                throw ie;
                        }

                        x = extract();
                        c = count.getAndDecrement();
                        if (c > 1)
                                notEmpty.signal();
                } finally {
                        takeLock.unlock();
                }
                if (c == capacity)
                        signalNotFull();
                return x;
        }

        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
                E x = null;
                int c = -1;
                long nanos = unit.toNanos(timeout);
                final AtomicInteger count = this.count;
                final ReentrantLock takeLock = this.takeLock;
                takeLock.lockInterruptibly();
                try {
                        for (;;) {
                                if (count.get() > 0) {
                                        x = extract();
                                        c = count.getAndDecrement();
                                        if (c > 1)
                                                notEmpty.signal();
                                        break;
                                }
                                if (nanos <= 0)
                                        return null;
                                try {
                                        nanos = notEmpty.awaitNanos(nanos);
                                } catch (InterruptedException ie) {
                                        notEmpty.signal(); // propagate to a 
non-interrupted thread
                                        throw ie;
                                }
                        }
                } finally {
                        takeLock.unlock();
                }
                if (c == capacity)
                        signalNotFull();
                return x;
        }

        /**
         * Non-blocking variant of take: obtains the next element via
         * <tt>extract()</tt> if the element count is positive.
         *
         * @return the element returned by <tt>extract()</tt>, or <tt>null</tt>
         *         if the count was zero
         */
        public E poll() {
                final AtomicInteger size = this.count;
                // Unlocked fast path: skip the lock entirely when nothing is queued.
                if (size.get() == 0) {
                        return null;
                }

                final ReentrantLock lock = this.takeLock;
                E item = null;
                int before = -1;
                lock.lock();
                try {
                        // Re-check under the lock; the count may have dropped meanwhile.
                        if (size.get() > 0) {
                                item = extract();
                                before = size.getAndDecrement();
                                if (before > 1) {
                                        notEmpty.signal();
                                }
                        }
                } finally {
                        lock.unlock();
                }

                if (before == capacity) {
                        signalNotFull();
                }
                return item;
        }

        /**
         * Returns, without removing it, the item of the node that follows
         * <tt>head</tt>.
         *
         * @return that item, or <tt>null</tt> if the count is zero or no node
         *         follows <tt>head</tt>
         */
        public E peek() {
                if (count.get() == 0) {
                        return null;
                }
                final ReentrantLock lock = this.takeLock;
                lock.lock();
                try {
                        final Node<E> front = head.next;
                        return (front != null) ? front.item : null;
                } finally {
                        lock.unlock();
                }
        }

        /**
         * Removes a single instance of the specified element from this queue,
         * if it is present. The scan and unlink run between
         * <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>.
         *
         * @param o
         *            the element to remove; <tt>null</tt> never matches
         * @return <tt>true</tt> if a node whose item equals <tt>o</tt> was
         *         unlinked, <tt>false</tt> otherwise
         */
        public boolean remove(Object o) {
                if (o == null) {
                        return false;
                }
                fullyLock();
                try {
                        // Walk the list keeping the predecessor so the match can be unlinked.
                        Node<E> prev = head;
                        Node<E> node = head.next;
                        while (node != null && !o.equals(node.item)) {
                                prev = node;
                                node = node.next;
                        }
                        if (node == null) {
                                return false;
                        }

                        node.item = null;
                        prev.next = node.next;
                        if (last == node) {
                                last = prev;
                        }
                        // Count equalled capacity before this removal: wake notFull waiters.
                        if (count.getAndDecrement() == capacity) {
                                notFull.signalAll();
                        }
                        return true;
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Copies the items of every node after <tt>head</tt>, in list order,
         * into a new array sized from the current count. The copy is made
         * between <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>.
         *
         * @return a newly allocated array holding the items
         */
        public Object[] toArray() {
                fullyLock();
                try {
                        final Object[] snapshot = new Object[count.get()];
                        int idx = 0;
                        Node<E> node = head.next;
                        while (node != null) {
                                snapshot[idx++] = node.item;
                                node = node.next;
                        }
                        return snapshot;
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Copies the items of every node after <tt>head</tt>, in list order,
         * into <tt>a</tt> if it is long enough, otherwise into a new array of
         * the same component type sized from the current count. When the
         * array used is longer than the number of items copied, the slot
         * immediately after the last item is set to <tt>null</tt>.
         *
         * @param a
         *            the array to fill if it is large enough
         * @return the array holding the items
         */
        @SuppressWarnings("unchecked") // casts are checked by the array store at runtime
        public <T> T[] toArray(T[] a) {
                fullyLock();
                try {
                        int size = count.get();
                        if (a.length < size)
                                a = (T[]) java.lang.reflect.Array.newInstance(
                                                a.getClass().getComponentType(), size);

                        int k = 0;
                        // Parameterized node type instead of the raw Node.
                        for (Node<E> p = head.next; p != null; p = p.next)
                                a[k++] = (T) p.item;
                        if (a.length > k)
                                a[k] = null;
                        return a;
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Returns the superclass string form, computed between
         * <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>.
         *
         * @return the result of <tt>super.toString()</tt>
         */
        public String toString() {
                fullyLock();
                try {
                        final String text = super.toString();
                        return text;
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Atomically removes all of the elements from this queue. The queue
         * will be empty after this call returns.
         */
        public void clear() {
                fullyLock();
                try {
                        // Detach the whole chain from the head node.
                        head.next = null;
                        assert head.item == null;
                        last = head;

                        final int previous = count.getAndSet(0);
                        // Count equalled capacity before the reset: wake notFull waiters.
                        if (previous == capacity) {
                                notFull.signalAll();
                        }
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Detaches every node after <tt>head</tt> and adds their items to
         * <tt>c</tt>. The queue is reset (count set to zero) between
         * <tt>fullyLock()</tt> and <tt>fullyUnlock()</tt>; the items are added
         * to <tt>c</tt> afterwards, outside the locks.
         * <p>
         * Because the chain is detached before the transfer, an exception
         * thrown by <tt>c.add</tt> leaves the not-yet-transferred items in
         * neither this queue nor <tt>c</tt>.
         *
         * @param c
         *            the collection to transfer elements into
         * @return the number of elements added to <tt>c</tt>
         * @throws NullPointerException
         *             if <tt>c</tt> is <tt>null</tt>
         * @throws IllegalArgumentException
         *             if <tt>c</tt> is this queue
         */
        public int drainTo(Collection<? super E> c) {
                if (c == null)
                        throw new NullPointerException();
                if (c == this)
                        throw new IllegalArgumentException();
                // Parameterized node type instead of the raw Node.
                Node<E> first;
                fullyLock();
                try {
                        first = head.next;
                        head.next = null;
                        assert head.item == null;
                        last = head;
                        // Count equalled capacity before the reset: wake notFull waiters.
                        if (count.getAndSet(0) == capacity)
                                notFull.signalAll();
                } finally {
                        fullyUnlock();
                }
                // Transfer the elements outside of locks
                int n = 0;
                for (Node<E> p = first; p != null; p = p.next) {
                        c.add(p.item);
                        p.item = null;
                        ++n;
                }
                return n;
        }

        /**
         * Moves at most <tt>maxElements</tt> items from the front of this
         * queue into <tt>c</tt>, between <tt>fullyLock()</tt> and
         * <tt>fullyUnlock()</tt>.
         * <p>
         * If <tt>c.add</tt> throws part-way through, the items already added
         * to <tt>c</tt> are still unlinked from this queue and the count is
         * adjusted for them; the item whose add failed and all later items
         * stay queued.
         *
         * @param c
         *            the collection to transfer elements into
         * @param maxElements
         *            the maximum number of elements to transfer; values less
         *            than one transfer nothing
         * @return the number of elements added to <tt>c</tt>
         * @throws NullPointerException
         *             if <tt>c</tt> is <tt>null</tt>
         * @throws IllegalArgumentException
         *             if <tt>c</tt> is this queue
         */
        public int drainTo(Collection<? super E> c, int maxElements) {
                if (c == null)
                        throw new NullPointerException();
                if (c == this)
                        throw new IllegalArgumentException();
                fullyLock();
                try {
                        int n = 0;
                        Node<E> p = head.next;
                        try {
                                while (p != null && n < maxElements) {
                                        c.add(p.item);
                                        p.item = null;
                                        p = p.next;
                                        ++n;
                                }
                                return n;
                        } finally {
                                // Runs on normal and exceptional exit alike, so nodes
                                // whose items were already handed to c (and nulled)
                                // never remain linked if c.add() throws.
                                if (n != 0) {
                                        head.next = p;
                                        assert head.item == null;
                                        if (p == null)
                                                last = head;
                                        // Count equalled capacity before the removal:
                                        // wake notFull waiters.
                                        if (count.getAndAdd(-n) == capacity)
                                                notFull.signalAll();
                                }
                        }
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Returns an iterator over the elements in this queue in proper
         * sequence. The returned <tt>Iterator</tt> is a "weakly consistent"
         * iterator that will never throw
         * {@link java.util.ConcurrentModificationException}, and guarantees to
         * traverse elements as they existed upon construction of the
         * iterator, and may (but is not guaranteed to) reflect any
         * modifications subsequent to construction.
         *
         * @return an iterator over the elements in this queue in proper
         *         sequence.
         */
        public Iterator<E> iterator() {
                return new Itr();
        }

        private class Itr implements Iterator<E> {
                /*
                 * Basic weak-consistent iterator. At all times hold the next item
                 * to hand out so that if hasNext() reports true, we will still have
                 * it to return even if lost race with a take etc.
                 */
                private Node<E> current;
                private Node<E> lastRet;
                private E currentElement;

                /** Positions the iterator on the first node after head, under both locks. */
                Itr() {
                        lockBoth();
                        try {
                                current = head.next;
                                if (current != null) {
                                        currentElement = current.item;
                                }
                        } finally {
                                unlockBoth();
                        }
                }

                /** Acquires the enclosing queue's put lock, then its take lock. */
                private void lockBoth() {
                        MesssageDispatchBlockingChannel.this.putLock.lock();
                        MesssageDispatchBlockingChannel.this.takeLock.lock();
                }

                /** Releases the locks in the reverse order of acquisition. */
                private void unlockBoth() {
                        MesssageDispatchBlockingChannel.this.takeLock.unlock();
                        MesssageDispatchBlockingChannel.this.putLock.unlock();
                }

                /** Reports whether a node is still pending; takes no lock. */
                public boolean hasNext() {
                        return current != null;
                }

                /**
                 * Hands out the item cached for the current node and advances to
                 * the following node, caching its item in turn.
                 */
                public E next() {
                        lockBoth();
                        try {
                                if (current == null) {
                                        throw new NoSuchElementException();
                                }
                                final E result = currentElement;
                                lastRet = current;
                                current = current.next;
                                if (current != null) {
                                        currentElement = current.item;
                                }
                                return result;
                        } finally {
                                unlockBoth();
                        }
                }

                /**
                 * Unlinks the node most recently returned by next(), if it is
                 * still reachable from head.
                 */
                public void remove() {
                        if (lastRet == null) {
                                throw new IllegalStateException();
                        }
                        lockBoth();
                        try {
                                final Node<E> target = lastRet;
                                lastRet = null;

                                // Search from head for the target, tracking its predecessor.
                                Node<E> prev = head;
                                Node<E> scan = head.next;
                                while (scan != null && scan != target) {
                                        prev = scan;
                                        scan = scan.next;
                                }

                                if (scan == target) {
                                        scan.item = null;
                                        prev.next = scan.next;
                                        if (last == scan) {
                                                last = prev;
                                        }
                                        final int before = count.getAndDecrement();
                                        // Count equalled capacity: wake notFull waiters.
                                        if (before == capacity) {
                                                notFull.signalAll();
                                        }
                                }
                        } finally {
                                unlockBoth();
                        }
                }
        }

        /**
         * Save the state to a stream (that is, serialize it).
         *
         * @serialData The capacity is emitted (int), followed by all of its
         *             elements (each an <tt>Object</tt>) in the proper order,
         *             followed by a null
         * @param s
         *            the stream
         * @throws java.io.IOException
         *             if writing to the stream fails
         */
        private void writeObject(java.io.ObjectOutputStream s)
                        throws java.io.IOException {
                fullyLock();
                try {
                        // Default fields first (any hidden stuff, plus capacity).
                        s.defaultWriteObject();

                        // Then each item, walking the list from the node after head.
                        Node<E> node = head.next;
                        while (node != null) {
                                s.writeObject(node.item);
                                node = node.next;
                        }

                        // A trailing null marks the end of the elements.
                        s.writeObject(null);
                } finally {
                        fullyUnlock();
                }
        }

        /**
         * Reconstitute this queue instance from a stream (that is, deserialize
         * it).
         *
         * @param s
         *            the stream
         * @throws java.io.IOException
         *             if reading from the stream fails
         * @throws ClassNotFoundException
         *             if the class of a serialized object cannot be found
         */
        private void readObject(java.io.ObjectInputStream s)
                        throws java.io.IOException, ClassNotFoundException {
                // Default fields first (capacity, and any hidden stuff).
                s.defaultReadObject();

                // Start from an empty list: zero count and a fresh head node.
                count.set(0);
                head = new Node<E>(null);
                last = head;

                // Re-add every element until the null sentinel is reached.
                E item;
                while ((item = (E) s.readObject()) != null) {
                        add(item);
                }
        }
}


-- 
View this message in context: 
http://www.nabble.com/No-Durable-topic-consumer-OutofMemory-solution-tf4155074s2354.html#a11822019
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.

Reply via email to