Author: rajdavies
Date: Mon Sep 1 23:14:22 2008
New Revision: 691114
URL: http://svn.apache.org/viewvc?rev=691114&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1909
and https://issues.apache.org/activemq/browse/AMQ-1914
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Sep 1 23:14:22 2008
@@ -38,7 +38,7 @@
* from persistent storage
*/
public static final int MAX_PAGE_SIZE=200;
-
+ public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@@ -49,6 +49,7 @@
private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=MAX_PAGE_SIZE;
+ private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;
@@ -187,6 +188,14 @@
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
+
+ public int getMaxBrowsePageSize() {
+ return this.maxBrowsePageSize;
+ }
+
+ public void setMaxBrowsePageSize(int maxPageSize) {
+ this.maxBrowsePageSize = maxPageSize;
+ }
public boolean isUseCache() {
return useCache;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Sep 1 23:14:22 2008
@@ -91,6 +91,10 @@
public void setMaxPageSize(int maxPageSize);
+ public int getMaxBrowsePageSize();
+
+ public void setMaxBrowsePageSize(int maxPageSize);
+
public boolean isUseCache();
public void setUseCache(boolean useCache);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Sep 1 23:14:22 2008
@@ -249,4 +249,12 @@
public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
next.messageExpired(context,subs, node);
}
+
+ public int getMaxBrowsePageSize() {
+ return next.getMaxBrowsePageSize();
+ }
+
+ public void setMaxBrowsePageSize(int maxPageSize) {
+ next.setMaxBrowsePageSize(maxPageSize);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Sep 1 23:14:22 2008
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -616,80 +617,91 @@
}
public Message[] browse() {
+ int count = 0;
List<Message> l = new ArrayList<Message>();
try {
- doPageIn(true);
- } catch (Exception e) {
- LOG.error("caught an exception browsing " + this, e);
- }
- synchronized (pagedInMessages) {
- for (QueueMessageReference node:pagedInMessages.values()){
- node.incrementReferenceCount();
- try {
- Message m = node.getMessage();
- if (m != null) {
- l.add(m);
+ synchronized (this.pagedInPendingDispatch) {
+ for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
+ .iterator(); i.hasNext()
+ && count < getMaxBrowsePageSize();) {
+ l.add(i.next().getMessage());
+ count++;
+ }
+ }
+ if (count < getMaxBrowsePageSize()) {
+ synchronized (pagedInMessages) {
+ for (Iterator<QueueMessageReference> i = this.pagedInMessages
+ .values().iterator(); i.hasNext()
+ && count < getMaxBrowsePageSize();) {
+ Message m = i.next().getMessage();
+ if (l.contains(m) == false) {
+ l.add(m);
+ count++;
+ }
}
- } catch (IOException e) {
- LOG.error("caught an exception browsing " + this, e);
- } finally {
- node.decrementReferenceCount();
}
}
- }
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext()) {
+ if (count < getMaxBrowsePageSize()) {
+ synchronized (messages) {
try {
- MessageReference r = messages.next();
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- if (m != null) {
- l.add(m);
+ messages.reset();
+ while (messages.hasNext()
+ && count < getMaxBrowsePageSize()) {
+ MessageReference node = messages.next();
+ messages.rollback(node.getMessageId());
+ if (node != null) {
+ Message m = node.getMessage();
+ if (l.contains(m) == false) {
+ l.add(m);
+ count++;
+ }
}
- } finally {
- r.decrementReferenceCount();
}
- } catch (IOException e) {
- LOG.error("caught an exception brwsing " + this, e);
+ } finally {
+ messages.release();
}
}
- } finally {
- messages.release();
}
+ } catch (IOException e) {
+ LOG.error("Problem retrieving message in browse() ", e);
}
-
return l.toArray(new Message[l.size()]);
}
- public Message getMessage(String messageId) {
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext()) {
- try {
- MessageReference r = messages.next();
- if (messageId.equals(r.getMessageId().toString())) {
- r.incrementReferenceCount();
- try {
+ public Message getMessage(String id) {
+ MessageId msgId = new MessageId(id);
+ try {
+ synchronized (pagedInMessages) {
+ QueueMessageReference r = this.pagedInMessages.get(msgId);
+ if (r != null) {
+ return r.getMessage();
+ }
+ }
+ synchronized (messages) {
+ try {
+ messages.reset();
+ while (messages.hasNext()) {
+ try {
+ MessageReference r = messages.next();
+ messages.rollback(r.getMessageId());
+ if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage();
if (m != null) {
return m;
}
- } finally {
- r.decrementReferenceCount();
+ break;
}
- break;
+ } catch (IOException e) {
+ LOG.error("got an exception retrieving message "
+ + id);
}
- } catch (IOException e) {
- LOG.error("got an exception retrieving message " + messageId);
}
+ } finally {
+ messages.release();
}
- } finally {
- messages.release();
}
+ } catch (IOException e) {
+ LOG.error("got an exception retrieving message " + id);
}
return null;
}
@@ -852,7 +864,7 @@
* @return the number of messages removed
*/
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
- return moveMatchingMessagesTo(context, selector, dest, -1);
+ return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE);
}
/**
@@ -867,7 +879,9 @@
* Moves the messages matching the given filter up to the maximum number of
* matched messages
*/
- public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception {
+ public int moveMatchingMessagesTo(ConnectionContext context,
+ MessageReferenceFilter filter, ActiveMQDestination dest,
+ int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
@@ -875,28 +889,27 @@
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
- List <MessageReference>list = new ArrayList<MessageReference>(set);
- for (MessageReference ref:list) {
+ List<MessageReference> list = new ArrayList<MessageReference>(set);
+ for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- removeMessage(context, r);
- set.remove(r);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
- return movedCounter;
- }
- } finally {
- r.decrementReferenceCount();
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ removeMessage(context, r);
+ set.remove(r);
+ if (++movedCounter >= maximumMessages
+ && maximumMessages > 0) {
+ return movedCounter;
+ }
+ } else {
+ synchronized (messages) {
+ messages.rollback(r.getMessageId());
}
}
-
}
- } while (set.size() < this.destinationStatistics.getMessages().getCount());
+ } while (set.size() < this.destinationStatistics.getMessages().getCount()
+ && set.size() < maximumMessages);
return movedCounter;
}
@@ -936,7 +949,9 @@
// make sure it gets queued for dispatched again
dispatchLock.lock();
try {
- pagedInPendingDispatch.add(node);
+ synchronized(pagedInPendingDispatch) {
+ pagedInPendingDispatch.add(node);
+ }
} finally {
dispatchLock.unlock();
}
@@ -993,6 +1008,9 @@
public boolean evaluate(ConnectionContext context, MessageReference r) {
return messageId.equals(r.getMessageId().toString());
}
+ public String toString() {
+ return "MessageIdFilter: "+messageId;
+ }
};
}
@@ -1031,21 +1049,13 @@
acknowledge(context, sub, ack, reference);
if (!ack.isInTransaction()) {
- reference.drop();
- destinationStatistics.getMessages().decrement();
- synchronized(pagedInMessages) {
- pagedInMessages.remove(reference.getMessageId());
- }
+ dropMessage(reference);
wakeup();
} else {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
- reference.drop();
- destinationStatistics.getMessages().decrement();
- synchronized(pagedInMessages) {
- pagedInMessages.remove(reference.getMessageId());
- }
+ dropMessage(reference);
wakeup();
}
@@ -1057,6 +1067,17 @@
}
+ private void dropMessage(QueueMessageReference reference) {
+ reference.drop();
+ destinationStatistics.getMessages().decrement();
+ synchronized(pagedInMessages) {
+ pagedInMessages.remove(reference.getMessageId());
+ }
+ synchronized(messages) {
+ messages.rollback(reference.getMessageId());
+ }
+ }
+
public void messageExpired(ConnectionContext context,MessageReference reference) {
messageExpired(context,null,reference);
}
@@ -1117,8 +1138,16 @@
List<QueueMessageReference> result = null;
dispatchLock.lock();
try{
- int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
- toPageIn = Math.min(toPageIn,getMaxPageSize());
+
+ int toPageIn = 0;
+ if (force) {
+ toPageIn = getMaxPageSize();
+ } else {
+ toPageIn = (getMaxPageSize() + (int) destinationStatistics
+ .getInflight().getCount())
+ - pagedInMessages.size();
+ toPageIn = Math.min(toPageIn, getMaxPageSize());
+ }
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1129,6 +1158,7 @@
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
try {
+
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
@@ -1161,17 +1191,19 @@
private void doDispatch(List<QueueMessageReference> list) throws Exception {
dispatchLock.lock();
try {
- if(!pagedInPendingDispatch.isEmpty()) {
- // Try to first dispatch anything that had not been dispatched before.
- pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
- }
- // and now see if we can dispatch the new stuff.. and append to the pending
- // list anything that does not actually get dispatched.
- if (list != null && !list.isEmpty()) {
- if (pagedInPendingDispatch.isEmpty()) {
- pagedInPendingDispatch.addAll(doActualDispatch(list));
- } else {
- pagedInPendingDispatch.addAll(list);
+ synchronized(pagedInPendingDispatch) {
+ if(!pagedInPendingDispatch.isEmpty()) {
+ // Try to first dispatch anything that had not been dispatched before.
+ pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
+ }
+ // and now see if we can dispatch the new stuff.. and append to the pending
+ // list anything that does not actually get dispatched.
+ if (list != null && !list.isEmpty()) {
+ if (pagedInPendingDispatch.isEmpty()) {
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+ } else {
+ pagedInPendingDispatch.addAll(list);
+ }
}
}
} finally {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Sep 1 23:14:22 2008
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
@@ -34,7 +35,7 @@
*/
public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 70;
- protected int maxBatchSize = 100;
+ protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
protected SystemUsage systemUsage;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
@@ -285,7 +286,7 @@
return this.audit.isDuplicate(messageId);
}
- protected synchronized void rollback(MessageId id) {
+ public synchronized void rollback(MessageId id) {
if (this.audit != null) {
audit.rollback(id);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Sep 1 23:14:22 2008
@@ -41,6 +41,7 @@
protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
+ protected Iterator<Entry<MessageId, Message>> iterator = null;
protected int size;
protected AbstractStoreCursor(Destination destination) {
@@ -93,8 +94,22 @@
}
public final void reset() {
+ if (batchList.isEmpty()) {
+ try {
+ fillBatch();
+ } catch (Exception e) {
+ LOG.error("Failed to fill batch", e);
+ throw new RuntimeException(e);
+ }
+ }
+ this.iterator = this.batchList.entrySet().iterator();
+ }
+
+ public void release() {
+ this.iterator=null;
}
+
public final void finished() {
}
@@ -102,22 +117,24 @@
if (batchList.isEmpty()) {
try {
fillBatch();
+ this.iterator = this.batchList.entrySet().iterator();
} catch (Exception e) {
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
+ }else {
+ if (this.iterator==null) {
+ this.iterator=this.batchList.entrySet().iterator();
+ }
}
- boolean result= !batchList.isEmpty();
- return result;
+ return this.iterator.hasNext();
}
public final synchronized MessageReference next() {
Message result = null;
- if (!this.batchList.isEmpty()) {
- Iterator<Entry<MessageId, Message>> i = this.batchList.entrySet().iterator();
- result = i.next().getValue();
+ if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
+ result = this.iterator.next().getValue();
result.decrementReferenceCount();
- i.remove();
}
return result;
}
@@ -141,6 +158,9 @@
if (size==0 && isStarted() && cacheEnabled) {
cacheEnabled=true;
}
+ if (iterator!=null) {
+ iterator.remove();
+ }
}
public final synchronized void remove(MessageReference node) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Sep 1 23:14:22 2008
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
/**
@@ -282,5 +283,11 @@
* @return true if a cache is being used
*/
public boolean isUseCache();
+
+ /**
+ * remove from auditing the message id
+ * @param id
+ */
+ public void rollback(MessageId id);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Mon Sep 1 23:14:22 2008
@@ -16,12 +16,10 @@
*/
package org.apache.activemq.broker.region.cursors;
-import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -163,6 +161,12 @@
nonPersistent.reset();
persistent.reset();
}
+
+ public void release() {
+ nonPersistent.release();
+ persistent.release();
+ }
+
public synchronized int size() {
return pendingCount;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Sep 1 23:14:22 2008
@@ -59,6 +59,7 @@
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
+ private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
private boolean useCache=true;
private long minimumMessageSize=1024;
private boolean useConsumerPriority=true;
@@ -119,6 +120,7 @@
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
destination.setMaxPageSize(getMaxPageSize());
+ destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
destination.setUseCache(isUseCache());
destination.setMinimumMessageSize((int) getMinimumMessageSize());
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
@@ -387,7 +389,15 @@
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
- }
+ }
+
+ public int getMaxBrowsePageSize() {
+ return maxBrowsePageSize;
+ }
+
+ public void setMaxBrowsePageSize(int maxPageSize) {
+ this.maxBrowsePageSize = maxPageSize;
+ }
public boolean isUseCache() {
return useCache;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=691114&r1=691113&r2=691114&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Mon Sep 1 23:14:22 2008
@@ -100,9 +100,7 @@
else {
echo("Current queue size: " + initialQueueSize);
}
- // TODO uncommenting this line causes a hang!
- //int messageCount = initialQueueSize;
- int messageCount = 10;
+ int messageCount = initialQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];
@@ -124,8 +122,7 @@
compdatalist = queue.browse();
int actualCount = compdatalist.length;
echo("Current queue size: " + actualCount);
- // TODO we seem to have browsed the queue and now there are messages missing!
- //assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
+ assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
echo("Now browsing the second queue");
@@ -137,7 +134,7 @@
assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
}
- public void TODO_testRetryMessages() throws Exception {
+ public void testRetryMessages() throws Exception {
// lets speed up redelivery
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
@@ -186,10 +183,7 @@
else {
echo("Current DLQ queue size: " + dlqQueueSize);
}
-
- // TODO uncommenting this line causes a hang!
- //int messageCount = dlqQueueSize;
- int messageCount = 10;
+ int messageCount = dlqQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];