Author: dejanb
Date: Wed May 5 07:09:37 2010
New Revision: 941174
URL: http://svn.apache.org/viewvc?rev=941174&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2719 - order of redelivered
messages
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=941174&r1=941173&r2=941174&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed May 5 07:09:37 2010
@@ -95,6 +95,7 @@ public class Queue extends BaseDestinati
private final LinkedHashMap<MessageId, QueueMessageReference>
pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a
subscription
private List<QueueMessageReference> pagedInPendingDispatch = new
ArrayList<QueueMessageReference>(100);
+ private List<QueueMessageReference> redeliveredWaitingDispatch = new
ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
@@ -377,7 +378,7 @@ public class Queue extends BaseDestinati
wakeup();
}
}
-
+
public void removeSubscription(ConnectionContext context, Subscription
sub, long lastDeiveredSequenceId) throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
@@ -406,7 +407,7 @@ public class Queue extends BaseDestinati
getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
- List<QueueMessageReference> list = new
ArrayList<QueueMessageReference>();
+
for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) {
@@ -416,11 +417,10 @@ public class Queue extends BaseDestinati
qmr.incrementRedeliveryCounter();
}
}
- list.add(qmr);
+ redeliveredWaitingDispatch.add(qmr);
}
-
- if (!list.isEmpty()) {
- doDispatch(list);
+ if (!redeliveredWaitingDispatch.isEmpty()) {
+ doDispatch(new ArrayList());
}
}
if (!(this.optimizedDispatch || isSlave())) {
@@ -1220,7 +1220,7 @@ public class Queue extends BaseDestinati
// Perhaps we should page always into the pagedInPendingDispatch
list if
// !messages.isEmpty(), and then if
!pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
- if (pageInMoreMessages || pendingBrowserDispatch != null) {
+ if (pageInMoreMessages || pendingBrowserDispatch != null ||
!redeliveredWaitingDispatch.isEmpty()) {
try {
pageInMessages(pendingBrowserDispatch != null);
@@ -1494,8 +1494,12 @@ public class Queue extends BaseDestinati
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
+ if (!redeliveredWaitingDispatch.isEmpty()) {
+ // Try first to dispatch redelivered messages to keep an
proper order
+ redeliveredWaitingDispatch =
doActualDispatch(redeliveredWaitingDispatch);
+ }
if (!pagedInPendingDispatch.isEmpty()) {
- // Try to first dispatch anything that had not been
+ // Next dispatch anything that had not been
// dispatched before.
pagedInPendingDispatch =
doActualDispatch(pagedInPendingDispatch);
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java?rev=941174&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
Wed May 5 07:09:37 2010
@@ -0,0 +1,118 @@
+package org.apache.activemq.bugs;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.bugs.AMQ1866.ConsumerThread;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class OutOfOrderTestCase extends TestCase {
+
+ private static final Log log =
LogFactory.getLog(OutOfOrderTestCase.class);
+
+ public static final String BROKER_URL = "tcp://localhost:61616";
+ private static final int PREFETCH = 10;
+ private static final String CONNECTION_URL = BROKER_URL +
"?jms.prefetchPolicy.all=" + PREFETCH;
+
+ public static final String QUEUE_NAME = "QUEUE";
+ private static final String DESTINATION =
"QUEUE?consumer.exclusive=true";
+
+ BrokerService brokerService;
+ Session session;
+ Connection connection;
+
+ int seq = 0;
+
+ public void setUp() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(true);
+ brokerService.addConnector(BROKER_URL);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+
+ ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(CONNECTION_URL);
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ }
+
+
+ protected void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ brokerService.stop();
+ }
+
+
+
+ public void testOrder() throws Exception {
+
+ log.info("Producing messages 0-29 . . .");
+ Destination destination = session.createQueue(DESTINATION);
+ final MessageProducer messageProducer = session
+ .createProducer(destination);
+ try {
+ for (int i = 0; i < 30; ++i) {
+ final Message message = session
+ .createTextMessage(createMessageText(i));
+ message.setStringProperty("JMSXGroupID", "FOO");
+
+ messageProducer.send(message);
+ log.info("sent " + toString(message));
+ }
+ } finally {
+ messageProducer.close();
+ }
+
+ log.info("Consuming messages 0-9 . . .");
+ consumeBatch();
+
+ log.info("Consuming messages 10-19 . . .");
+ consumeBatch();
+
+ log.info("Consuming messages 20-29 . . .");
+ consumeBatch();
+ }
+
+ protected void consumeBatch() throws Exception {
+ Destination destination = session.createQueue(DESTINATION);
+ final MessageConsumer messageConsumer =
session.createConsumer(destination);
+ try {
+ for (int i = 0; i < 10; ++i) {
+ final Message message = messageConsumer.receive(1000L);
+ log.info("received " + toString(message));
+ assertEquals("Message out of order", createMessageText(seq++),
((TextMessage) message).getText());
+ message.acknowledge();
+ }
+ } finally {
+ messageConsumer.close();
+ }
+ }
+
+ private String toString(final Message message) throws JMSException {
+ String ret = "received message '" + ((TextMessage)
message).getText() + "' - " + message.getJMSMessageID();
+ if (message.getJMSRedelivered())
+ ret += " (redelivered)";
+ return ret;
+
+ }
+
+ private static String createMessageText(final int index) {
+ return "message #" + index;
+ }
+}
\ No newline at end of file