Author: tabish
Date: Tue Apr 9 17:05:21 2013
New Revision: 1466131
URL: http://svn.apache.org/r1466131
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4464
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1466131&r1=1466130&r2=1466131&view=diff
==============================================================================
---
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Apr 9 17:05:21 2013
@@ -18,6 +18,7 @@ package org.apache.activemq;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -289,6 +290,7 @@ public class ActiveMQMessageConsumer imp
return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
}
+ @Override
public StatsImpl getStats() {
return stats;
}
@@ -380,6 +382,7 @@ public class ActiveMQMessageConsumer imp
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error.
*/
+ @Override
public String getMessageSelector() throws JMSException {
checkClosed();
return selector;
@@ -394,6 +397,7 @@ public class ActiveMQMessageConsumer imp
* listener due to some internal error.
* @see
javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
*/
+ @Override
public MessageListener getMessageListener() throws JMSException {
checkClosed();
return this.messageListener.get();
@@ -414,6 +418,7 @@ public class ActiveMQMessageConsumer imp
* message due to some internal error.
* @see javax.jms.MessageConsumer#getMessageListener
*/
+ @Override
public void setMessageListener(MessageListener listener) throws
JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
@@ -436,6 +441,7 @@ public class ActiveMQMessageConsumer imp
}
}
+ @Override
public MessageAvailableListener getAvailableListener() {
return availableListener;
}
@@ -445,6 +451,7 @@ public class ActiveMQMessageConsumer imp
* message available so that the {@link MessageConsumer#receiveNoWait()}
can
* be called.
*/
+ @Override
public void setAvailableListener(MessageAvailableListener
availableListener) {
this.availableListener = availableListener;
}
@@ -514,6 +521,7 @@ public class ActiveMQMessageConsumer imp
* @return the next message produced for this message consumer, or null if
* this message consumer is concurrently closed
*/
+ @Override
public Message receive() throws JMSException {
checkClosed();
checkMessageListener();
@@ -547,6 +555,7 @@ public class ActiveMQMessageConsumer imp
}
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
+ @Override
public void execute() throws Exception {
session.checkClosed();
session.acknowledge();
@@ -554,6 +563,7 @@ public class ActiveMQMessageConsumer imp
});
} else if (session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
+ @Override
public void execute() throws Exception {
session.checkClosed();
acknowledge(md);
@@ -577,6 +587,7 @@ public class ActiveMQMessageConsumer imp
* the timeout expires or this message consumer is concurrently
* closed
*/
+ @Override
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
@@ -613,6 +624,7 @@ public class ActiveMQMessageConsumer imp
* @throws JMSException if the JMS provider fails to receive the next
* message due to some internal error.
*/
+ @Override
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
@@ -651,6 +663,7 @@ public class ActiveMQMessageConsumer imp
* @throws JMSException if the JMS provider fails to close the consumer due
* to some internal error.
*/
+ @Override
public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) {
if (session.getTransactionContext().isInTransaction()) {
@@ -743,6 +756,7 @@ public class ActiveMQMessageConsumer imp
executorService = Executors.newSingleThreadExecutor();
}
executorService.submit(new Runnable() {
+ @Override
public void run() {
try {
session.sendAck(ackToSend,true);
@@ -1197,6 +1211,10 @@ public class ActiveMQMessageConsumer imp
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize -
deliveredMessages.size());
redeliveryDelay = 0;
+
+ deliveredCounter -= deliveredMessages.size();
+ deliveredMessages.clear();
+
} else {
// only redelivery_ack after first delivery
@@ -1213,8 +1231,14 @@ public class ActiveMQMessageConsumer imp
final LinkedList<MessageDispatch>
pendingRedeliveries =
new
LinkedList<MessageDispatch>(deliveredMessages);
+ Collections.reverse(pendingRedeliveries);
+
+ deliveredCounter -= deliveredMessages.size();
+ deliveredMessages.clear();
+
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new
Runnable() {
+ @Override
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
@@ -1236,9 +1260,13 @@ public class ActiveMQMessageConsumer imp
unconsumedMessages.enqueueFirst(md);
}
+ deliveredCounter -= deliveredMessages.size();
+ deliveredMessages.clear();
+
if (redeliveryDelay > 0 &&
!unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new
Runnable() {
+ @Override
public void run() {
try {
if (started.get()) {
@@ -1254,8 +1282,6 @@ public class ActiveMQMessageConsumer imp
}
}
}
- deliveredCounter -= deliveredMessages.size();
- deliveredMessages.clear();
}
}
if (messageListener.get() != null) {
@@ -1304,6 +1330,7 @@ public class ActiveMQMessageConsumer imp
}
}
+ @Override
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java?rev=1466131&r1=1466130&r2=1466131&view=diff
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
(original)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
Tue Apr 9 17:05:21 2013
@@ -16,8 +16,11 @@
*/
package org.apache.activemq.usecases;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.concurrent.TimeUnit;
@@ -29,6 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -78,6 +82,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages.");
return received.size() == MSG_COUNT;
@@ -91,6 +96,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages since rollback.");
return received.size() == MSG_COUNT;
@@ -107,6 +113,76 @@ public class NonBlockingConsumerRedelive
}
@Test
+ public void testMessageDeleiveredInCorrectOrder() throws Exception {
+
+ final LinkedHashSet<Message> received = new LinkedHashSet<Message>();
+ final LinkedHashSet<Message> beforeRollback = new
LinkedHashSet<Message>();
+ final LinkedHashSet<Message> afterRollback = new
LinkedHashSet<Message>();
+
+ Connection connection = connectionFactory.createConnection();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(destinationName);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ received.add(message);
+ }
+ });
+
+ sendMessages();
+
+ session.commit();
+ connection.start();
+
+ assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
+ Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Consumer has received " + received.size() + "
messages.");
+ return received.size() == MSG_COUNT;
+ }
+ }
+ ));
+
+ beforeRollback.addAll(received);
+ received.clear();
+ session.rollback();
+
+ assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + "
messages.",
+ Wait.waitFor(new Wait.Condition(){
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Consumer has received " + received.size() + "
messages since rollback.");
+ return received.size() == MSG_COUNT;
+ }
+ }
+ ));
+
+ afterRollback.addAll(received);
+ received.clear();
+
+ assertEquals(beforeRollback.size(), afterRollback.size());
+ assertEquals(beforeRollback, afterRollback);
+
+ Iterator<Message> after = afterRollback.iterator();
+ Iterator<Message> before = beforeRollback.iterator();
+
+ while (before.hasNext() && after.hasNext()) {
+ TextMessage original = (TextMessage) before.next();
+ TextMessage rolledBack = (TextMessage) after.next();
+
+ int originalInt = Integer.parseInt(original.getText());
+ int rolledbackInt = Integer.parseInt(rolledBack.getText());
+
+ assertEquals(originalInt, rolledbackInt);
+ }
+
+ session.commit();
+ }
+
+ @Test
public void testMessageDeleiveryDoesntStop() throws Exception {
final LinkedHashSet<Message> received = new LinkedHashSet<Message>();
@@ -130,6 +206,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages.");
return received.size() == MSG_COUNT;
@@ -145,6 +222,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages since rollback.");
return received.size() == MSG_COUNT * 2;
@@ -182,6 +260,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages.");
return received.size() == MSG_COUNT;
@@ -194,6 +273,7 @@ public class NonBlockingConsumerRedelive
assertFalse("Delayed redelivery test not expecting any messages yet.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
return received.size() > 0;
}
@@ -225,6 +305,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages.");
return received.size() == MSG_COUNT;
@@ -264,6 +345,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages since rollback.");
return received.size() == MSG_COUNT;
@@ -307,6 +389,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + "
messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + received.size() + "
messages.");
return received.size() == MSG_COUNT;
@@ -329,6 +412,7 @@ public class NonBlockingConsumerRedelive
assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.",
Wait.waitFor(new Wait.Condition(){
+ @Override
public boolean isSatisified() throws Exception {
LOG.info("Consumer has received " + dlqed.size() + "
messages in DLQ.");
return dlqed.size() == MSG_COUNT;