Hi all :
A few days ago I posted a bug report about a non-durable topic consumer client
that runs out of heap memory while receiving messages; see
https://issues.apache.org/activemq/browse/AMQ-1325
Today I found that the problem occurs when the producer sends messages faster
than the consumer receives them. If I add a thread sleep of 100 milliseconds,
the consumer client no longer throws the OutOfMemory exception.
I also found that org.apache.activemq.MessageDispatchChannel uses a LinkedList
with no maximum capacity: if the consumer receives more slowly than the producer
sends, the LinkedList keeps growing, which is why the OutOfMemory exception is thrown.
So I changed MessageDispatchChannel as shown below, and it now runs without any
OutOfMemory exception.
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.JMSException;
import com.thtf.ezone.ezesb.mq.command.MessageDispatch;
public class MessageDispatchChannel {
//private final Object mutex = new Object();
private final LinkedList<MessageDispatch> list;
private AtomicBoolean closed=new AtomicBoolean(false);
private AtomicBoolean running=new AtomicBoolean(false);
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);
/** Lock held by take, poll, etc */
private final ReentrantLock lock;
/** Wait queue for waiting takes */
private final Condition notEmpty;
/** Wait queue for waiting puts */
private final Condition notFull;
public MessageDispatchChannel() {
this.list = new LinkedList<MessageDispatch>();
this.capacity=Integer.MAX_VALUE;
lock = new ReentrantLock(false);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public MessageDispatchChannel(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.list = new LinkedList<MessageDispatch>();
this.capacity=capacity;
lock = new ReentrantLock(false);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void enqueue(MessageDispatch message) throws
InterruptedException{
if (closed.get() ||!running.get()){
return ;
}
// synchronized(mutex) {
// if (list.size()>=this.capacity){
// try {
// System.out.println(" overflow.");
// mutex.wait(1000);
// } catch (InterruptedException e) {
// }
// }
// list.addLast(message);
// mutex.notify();
// }
if (message == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock lock = this.lock;
final AtomicInteger count = this.count;
lock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
list.addLast(message);
c = count.getAndIncrement();
notEmpty.signal();
if (c + 1 < capacity)
notFull.signal();
} finally {
lock.unlock();
}
}
public void enqueueFirst(MessageDispatch message) throws
InterruptedException{
if (closed.get() ||!running.get()){
return ;
}
// synchronized(mutex) {
// if (list.size()>=this.capacity){
// try {
// mutex.wait(1000);
// } catch (InterruptedException e) {
// }
// }
// list.addFirst(message);
// mutex.notify();
// }
if (message == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock lock = this.lock;
final AtomicInteger count = this.count;
lock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
list.addFirst(message);
c = count.getAndIncrement();
notEmpty.signal();
if (c + 1 < capacity)
notFull.signal();
} finally {
lock.unlock();
}
}
public boolean isEmpty() {
// synchronized(mutex) {
// return list.isEmpty();
// }
return count.get()==0;
}
/**
* Used to get an enqueued message.
* The amount of time this method blocks is based on the timeout value.
* - if timeout==-1 then it blocks until a message is received.
* - if timeout==0 then it it tries to not block at all, it returns a
message if it is available
* - if timeout>0 then it blocks up to timeout amount of time.
*
* Expired messages will consumed by this method.
*
* @throws JMSException
*
* @return null if we timeout or if the consumer is closed.
* @throws InterruptedException
*/
public MessageDispatch dequeue(long timeout) throws InterruptedException
{
// synchronized (mutex) {
// // Wait until the consumer is ready to deliver messages.
// while(timeout != 0 && !closed && (list.isEmpty() || !running))
{
// if (timeout == -1) {
// mutex.wait();
// } else {
// mutex.wait(timeout);
// break;
// }
// }
// if (closed || !running || list.isEmpty()) {
// return null;
// }
// return list.removeFirst();
// }
if (closed.get() ||!running.get()){
return null;
}
MessageDispatch x = null;
int c = -1;
long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count.get() > 0) {
x = list.poll();
c = count.getAndDecrement();
notFull.signal();
if (c > 1)
notEmpty.signal();
break;
}
if (nanos <= 0)
return null;
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted
thread
throw ie;
}
}
} finally {
lock.unlock();
}
return x;
}
public MessageDispatch dequeueNoWait() {
if (closed.get() ||!running.get()){
return null;
}
// synchronized (mutex) {
// if (closed || !running || list.isEmpty()) {
// return null;
// }
// return list.removeFirst();
// }
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
MessageDispatch x = null;
int c = -1;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count.get() > 0) {
x =list.poll();
c = count.getAndDecrement();
notFull.signal();
if (c > 1)
notEmpty.signal();
}
} finally {
lock.unlock();
}
return x;
}
public MessageDispatch peek() {
if (closed.get() ||!running.get()){
return null;
}
// synchronized (mutex) {
// if (closed || !running || list.isEmpty()) {
// return null;
// }
// return list.getFirst();
// }
final ReentrantLock lock = this.lock;
lock.lock();
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
try {
return list.peek();
} finally {
lock.unlock();
}
}
public void clear() {
// synchronized(mutex) {
// list.clear();
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
list.clear();
if (count.getAndSet(0) == capacity)
notFull.signalAll();
} finally {
lock.unlock();
}
}
public boolean isClosed() {
return closed.get();
}
public int size() {
// synchronized(mutex) {
// return list.size();
// }
return count.get();
}
public int remainingCapacity() {
return capacity - count.get();
}
public List<MessageDispatch> removeAll() {
// synchronized(mutex) {
// ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
// list.clear();
// return rc;
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
list.clear();
if (count.getAndSet(0) == capacity)
notFull.signalAll();
return rc;
} finally {
lock.unlock();
}
}
public String toString() {
// synchronized(mutex) {
// return list.toString();
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
return list.toString();
} finally {
lock.unlock();
}
}
public void start() {
// synchronized (mutex) {
// running = true;
// mutex.notifyAll();
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
count.set(0);
running.compareAndSet(false, true);
} finally {
lock.unlock();
}
}
public void stop() {
// synchronized (mutex) {
// running = false;
// mutex.notifyAll();
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
count.set(0);
running.compareAndSet(true, false);
} finally {
lock.unlock();
}
}
public void close() {
// synchronized (mutex) {
// if (!closed) {
// running = false;
// closed = true;
// }
// mutex.notifyAll();
// }
final ReentrantLock lock = this.lock;
lock.lock();
try {
count.set(0);
running.compareAndSet(true, false);
closed.compareAndSet(false, true);
} finally {
lock.unlock();
}
}
//public Object getMutex() {
// return mutex;
//}
public boolean isRunning() {
return running.get();
}
public ReentrantLock getLock(){
return this.lock;
}
}
I also wrote another MessageDispatchChannel that uses two locks (a take lock
and a put lock):
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Capacity-bounded linked queue with a start/stop/close life cycle, built on
 * the "two lock queue" algorithm.
 *
 * <p>The putLock gates entry to put (and offer) and has an associated
 * condition for waiting puts; similarly for the takeLock. The "count" field
 * that both sides rely on is an atomic so that, in most cases, only one of
 * the two locks is needed. To minimize the need for puts to get the takeLock
 * and vice versa, cascading notifies are used: when a put notices that it has
 * enabled at least one take, it signals a taker, which in turn signals others
 * if more items have been entered since the signal; symmetrically for takes
 * signalling puts. Operations that touch both ends of the list, such as
 * {@link #enqueueFirst(Object)}, {@link #remove(Object)} and iterators,
 * acquire both locks.
 *
 * <p>The list always starts with a sentinel node whose item is {@code null};
 * the first real element is {@code head.next}.
 *
 * <p>{@code enqueue}, {@code enqueueFirst}, {@code dequeue} and
 * {@code dequeueNoWait} refuse to work while the channel is stopped or
 * closed. The plain {@link BlockingQueue} methods ignore that state.
 * Elements already queued are kept across {@link #stop()}/{@link #start()}.
 *
 * @param <E> the type of elements held in this channel
 */
public class MesssageDispatchBlockingChannel<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = 1330823692422658197L;

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read. */
        volatile E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none. */
    private final int capacity;
    /** Current number of elements. */
    private final AtomicInteger count = new AtomicInteger(0);
    /** Sentinel head of linked list; {@code head.item} is always null. */
    private transient Node<E> head;
    /** Tail of linked list; equals {@code head} when the queue is empty. */
    private transient Node<E> last;
    /** Lock held by take, poll, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes. */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts. */
    private final Condition notFull = putLock.newCondition();
    /** Set by {@link #close()}. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Toggled by {@link #start()} and {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Creates a channel with a capacity of {@link Integer#MAX_VALUE}.
     */
    public MesssageDispatchBlockingChannel() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a channel with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public MesssageDispatchBlockingChannel(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a channel with a capacity of {@link Integer#MAX_VALUE},
     * initially containing the elements of the given collection, added in
     * traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any element within it is
     *         {@code null}
     */
    public MesssageDispatchBlockingChannel(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c) {
            add(e);
        }
    }

    // ------------------------------------------------------------------
    // Channel API (honours the running/closed state)
    // ------------------------------------------------------------------

    /**
     * Appends an element at the tail without blocking.
     *
     * @param o the element to add
     * @return {@code true} if the element was queued; {@code false} if the
     *         channel is stopped, closed or full
     * @throws InterruptedException declared for callers; the current
     *         implementation never waits
     * @throws NullPointerException if {@code o} is {@code null} and the
     *         channel is running
     */
    public boolean enqueue(E o) throws InterruptedException {
        if (closed.get() || !running.get()) {
            return false;
        }
        return this.offer(o);
    }

    /**
     * Inserts an element directly behind the sentinel so that it becomes the
     * next element taken. Does not block.
     *
     * <p>Both locks are taken because the insertion changes {@code head.next}
     * (take-side state) and, when the queue was empty, {@code last}
     * (put-side state).
     *
     * @param o the element to add
     * @return {@code true} if the element was queued; {@code false} if the
     *         channel is stopped, closed or full
     * @throws InterruptedException declared for callers; the current
     *         implementation never waits
     * @throws NullPointerException if {@code o} is {@code null} and the
     *         channel is running
     */
    public boolean enqueueFirst(E o) throws InterruptedException {
        if (closed.get() || !running.get()) {
            return false;
        }
        if (o == null) {
            throw new NullPointerException();
        }
        if (count.get() == capacity) {
            return false; // lock-free fast path
        }
        fullyLock();
        try {
            if (count.get() >= capacity) {
                return false;
            }
            addFirst(o);
            int c = count.getAndIncrement();
            if (c == 0) {
                notEmpty.signal(); // takeLock is held, so signal directly
            }
            if (c + 1 < capacity) {
                notFull.signal();
            }
            return true;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes the head element, waiting up to {@code timeout} milliseconds
     * for one to arrive.
     *
     * @param timeout maximum wait in milliseconds; zero or negative means
     *        do not wait
     * @return the head element, or {@code null} on timeout or if the channel
     *         is stopped or closed on entry
     * @throws InterruptedException if interrupted while waiting
     */
    public E dequeue(long timeout) throws InterruptedException {
        if (closed.get() || !running.get()) {
            return null;
        }
        return this.poll(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Removes the head element without waiting.
     *
     * @return the head element, or {@code null} if the channel is empty,
     *         stopped or closed
     */
    public E dequeueNoWait() {
        if (closed.get() || !running.get()) {
            return null;
        }
        return this.poll();
    }

    /**
     * Atomically removes every element.
     *
     * @return a new list holding the removed elements in queue order
     */
    public List<E> removeAll() {
        fullyLock();
        try {
            ArrayList<E> rc = new ArrayList<E>(count.get());
            for (Node<E> p = head.next; p != null; p = p.next) {
                rc.add(p.item);
            }
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity) {
                notFull.signalAll();
            }
            return rc;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Marks the channel as running. Queued elements are left untouched.
     */
    public void start() {
        fullyLock();
        try {
            running.set(true);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Marks the channel as not running. Queued elements are left untouched
     * so that the element count always matches the linked list.
     */
    public void stop() {
        fullyLock();
        try {
            running.set(false);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Marks the channel as closed and not running. Queued elements are left
     * untouched so that the element count always matches the linked list.
     */
    public void close() {
        fullyLock();
        try {
            running.set(false);
            closed.set(true);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    public boolean isClosed() {
        return closed.get();
    }

    /**
     * @return {@code true} while the channel is started
     */
    public boolean isRunning() {
        return running.get();
    }

    // ------------------------------------------------------------------
    // Internal helpers
    // ------------------------------------------------------------------

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock).
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it as the first real element, directly behind
     * the sentinel. Caller must hold both locks.
     *
     * @param x the item
     */
    private void addFirst(E x) {
        Node<E> node = new Node<E>(x);
        node.next = head.next;
        head.next = node;
        if (last == head) {
            // queue was empty: the new node is also the tail
            last = node;
        }
    }

    /**
     * Create a node and link it at end of queue. Caller must hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element. The node that held it becomes the new
     * sentinel. Caller must hold takeLock and have checked count &gt; 0.
     *
     * @return the removed item
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    protected void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    protected void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    // ------------------------------------------------------------------
    // BlockingQueue / Collection API (ignores the running/closed state)
    // ------------------------------------------------------------------

    /**
     * @return {@code true} if this queue holds no elements
     */
    @Override
    public boolean isEmpty() {
        return count.get() == 0;
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of elements that this queue can ideally (in the
     * absence of memory or resource constraints) accept without blocking.
     * This is always equal to the initial capacity of this queue less the
     * current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to
     * {@code add} an element will succeed by inspecting
     * {@code remainingCapacity} because it may be the case that a waiting
     * consumer is ready to {@code take} an element out of an otherwise full
     * queue.
     */
    @Override
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public void put(E o) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset the local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in the wait guard even though it is
             * not protected by lock. This works because count can only
             * decrease at this point (all other puts are shut out by lock),
             * and we (or some other waiting put) are signalled if it ever
             * changes from capacity. Similarly for all other uses of count
             * in other wait guards.
             */
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if the specified
     *         waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add
     * @return {@code true} if it was possible to add the element to this
     *         queue, else {@code false}
     * @throws NullPointerException if the specified element is {@code null}
     */
    @Override
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        if (count.get() == capacity) {
            return false;
        }
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Removes and returns the head of this queue, waiting if necessary until
     * an element becomes available.
     *
     * @return the head element
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head of this queue, waiting up to the given
     * time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit how to interpret {@code timeout}
     * @return the head element, or {@code null} if the wait time elapses
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head of this queue without waiting.
     *
     * @return the head element, or {@code null} if this queue is empty
     */
    @Override
    public E poll() {
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Returns the head of this queue without removing it.
     *
     * @return the head element, or {@code null} if this queue is empty
     */
    @Override
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return first == null ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.
     *
     * @param o the element to remove
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p) {
                    last = trail;
                }
                if (count.getAndDecrement() == capacity) {
                    notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * @return a new array holding the elements in queue order
     */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            Object[] a = new Object[count.get()];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = p.item;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Copies the elements, in queue order, into {@code a} if it is large
     * enough, otherwise into a new array of the same component type. If
     * the array has room to spare, the slot after the last element is set
     * to {@code null}.
     *
     * @param a the array to fill if it is big enough
     * @return the array holding the elements
     */
    @Override
    @SuppressWarnings("unchecked") // same element-type contract as Collection.toArray(T[])
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @return the inherited string form, computed while holding both locks
     */
    @Override
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue. The queue
     * will be empty after this call returns.
     */
    @Override
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all elements and adds them to {@code c}.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity) {
                notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most {@code maxElements} elements from the head and adds
     * them to {@code c}.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            while (p != null && n < maxElements) {
                c.add(p.item);
                p.item = null;
                p = p.next;
                ++n;
            }
            if (n != 0) {
                head.next = p;
                assert head.item == null;
                if (p == null) {
                    last = head;
                }
                if (count.getAndAdd(-n) == capacity) {
                    notFull.signalAll();
                }
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper
     * sequence. The returned {@code Iterator} is a "weakly consistent"
     * iterator that will never throw
     * {@link java.util.ConcurrentModificationException}, and guarantees to
     * traverse elements as they existed upon construction of the iterator,
     * and may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next item to
         * hand out so that if hasNext() reports true, we will still have it
         * to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        @Override
        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null) {
                    currentElement = current.item;
                }
                return x;
            } finally {
                fullyUnlock();
            }
        }

        @Override
        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    if (last == p) {
                        last = trail;
                    }
                    if (count.getAndDecrement() == capacity) {
                        notFull.signalAll();
                    }
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of its
     *             elements (each an {@code Object}) in the proper order,
     *             followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();
            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }
            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is, deserialize
     * it).
     *
     * @param s the stream
     */
    @SuppressWarnings("unchecked") // elements were written by writeObject as E
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();
        count.set(0);
        last = head = new Node<E>(null);
        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
--
View this message in context:
http://www.nabble.com/No-Durable-topic-consumer-OutofMemory-solution-tf4155074s2354.html#a11822019
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.