Author: chirino
Date: Thu Sep 6 15:19:25 2007
New Revision: 573400
URL: http://svn.apache.org/viewvc?rev=573400&view=rev
Log:
Fix for the QueueWorkerPrefetchTest. The VMPendingMessageCursor.isEmpty() was
returning false when the only messages it held had been marked dropped due to
being delivered by another subscription.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
Thu Sep 6 15:19:25 2007
@@ -55,5 +55,10 @@
* Returns true if this message is expired
*/
boolean isExpired();
+
+ /**
+ * Returns true if this message is dropped.
+ */
+ boolean isDropped();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Thu Sep 6 15:19:25 2007
@@ -35,7 +35,19 @@
* @return true if there are no pending messages
*/
public boolean isEmpty() {
- return list.isEmpty();
+ if (list.isEmpty()) {
+ return true;
+ } else {
+ for (Iterator<MessageReference> iterator = list.iterator();
iterator.hasNext();) {
+ MessageReference node = iterator.next();
+ if (!node.isDropped()) {
+ return false;
+ }
+ // We can remove dropped references.
+ iterator.remove();
+ }
+ return true;
+ }
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Thu Sep 6 15:19:25 2007
@@ -689,4 +689,8 @@
public void setBrokerOutTime(long brokerOutTime) {
this.brokerOutTime = brokerOutTime;
}
+
+ public boolean isDropped() {
+ return false;
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java?rev=573400&r1=573399&r2=573400&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
Thu Sep 6 15:19:25 2007
@@ -29,6 +29,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
@@ -42,6 +43,7 @@
*/
public class QueueWorkerPrefetchTest extends TestCase implements
MessageListener
{
+ private static final int BATCH_SIZE = 10;
private static final long WAIT_TIMEOUT = 1000*10;
/** The connection URL. */
@@ -70,6 +72,14 @@
/** Messages sent to the work-item queue. */
private static class WorkMessage implements Serializable
{
+ private final int id;
+ public WorkMessage(int id) {
+ this.id = id;
+ }
+ @Override
+ public String toString() {
+ return "Work: "+id;
+ }
}
/**
@@ -79,6 +89,7 @@
*/
private static class Worker implements MessageListener
{
+
/** Counter shared between workers to decided when new work-item
messages are created. */
private static AtomicInteger counter = new AtomicInteger(0);
@@ -106,24 +117,23 @@
{
try
{
- boolean sendMessage = false;
-
- // Don't create a new work item for every 1000th message. */
- if (counter.incrementAndGet() % 1000 != 0)
- {
- sendMessage = true;
+ WorkMessage work =
(WorkMessage)((ObjectMessage)message).getObject();
+
+ long c = counter.incrementAndGet();
+ if (c % 1 == 0) {
+ System.out.println("Worker now has message count of: " +
c);
}
- if (sendMessage)
+ // Don't create a new work item for every BATCH_SIZE message.
*/
+ if (c % BATCH_SIZE != 0)
{
// Send new work item to work-item queue.
workItemProducer.send(session.createObjectMessage(
- new WorkMessage()));
+ new WorkMessage(work.id+1)));
}
// Send ack to master.
- masterItemProducer.send(session.createObjectMessage(
- new WorkMessage()));
+ masterItemProducer.send(session.createObjectMessage(work));
}
catch (JMSException e)
{
@@ -145,7 +155,7 @@
{
long acks = acksReceived.incrementAndGet();
latch.get().countDown();
- if (acks % 100 == 0) {
+ if (acks % 1 == 0) {
System.out.println("Master now has ack count of: " + acksReceived);
}
}
@@ -193,10 +203,10 @@
workers[i] = new Worker(connection.createSession(false,
Session.AUTO_ACKNOWLEDGE));
}
- // Send a message to the work queue, and wait for the 1000 acks from
the workers.
+ // Send a message to the work queue, and wait for the BATCH_SIZE acks
from the workers.
acksReceived.set(0);
- latch.set(new CountDownLatch(1000));
- workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage()));
+ latch.set(new CountDownLatch(BATCH_SIZE));
+ workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("First batch only received " + acksReceived + " messages");
@@ -209,8 +219,8 @@
// have a large pending queue. Creating a new worker at this point
however will
// receive this new message.
acksReceived.set(0);
- latch.set(new CountDownLatch(1000));
- workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage()));
+ latch.set(new CountDownLatch(BATCH_SIZE));
+ workItemProducer.send(masterSession.createObjectMessage(new
WorkMessage(1)));
if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("Second batch only received " + acksReceived + " messages");