Author: gtully
Date: Tue Aug 24 09:50:25 2010
New Revision: 988455
URL: http://svn.apache.org/viewvc?rev=988455&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2876 - ensure an acked
message is not expired when processing the ack
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=988455&r1=988454&r2=988455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Aug 24 09:50:25 2010
@@ -50,12 +50,11 @@ public class QueueSubscription extends P
final Queue queue = (Queue)q;
if (n.isExpired()) {
- if (broker.isExpired(n)) {
- queue.messageExpired(context, this, node);
- } else {
- LOG.debug("ignoring ack " + ack + ", for already expired message: " + n);
+ // sync with message expiry processing
+ if (!broker.isExpired(n)) {
+ LOG.warn("ignoring ack " + ack + ", for already expired message: " + n);
+ return;
}
- return;
}
queue.removeMessage(context, this, node, ack);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java?rev=988455&r1=988454&r2=988455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java Tue Aug 24 09:50:25 2010
@@ -17,6 +17,8 @@
package org.apache.activemq;
import java.util.Date;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -107,6 +109,58 @@ public class JmsSendReceiveWithMessageEx
assertNull(consumer.receive(1000));
}
+ public void testConsumeExpiredQueueAndDlq() throws Exception {
+
+ MessageProducer producerNormal = createProducer(0);
+ MessageProducer producerExpire = createProducer(500);
+
+ consumerDestination = session.createQueue("ActiveMQ.DLQ");
+ MessageConsumer dlqConsumer = createConsumer();
+
+ consumerDestination = session.createQueue(getConsumerSubject());
+ producerDestination = session.createQueue(getProducerSubject());
+
+
+ Connection consumerConnection = createConnection();
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setAll(10);
+ ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy);
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
+ consumerConnection.start();
+ connection.start();
+
+ String msgBody = new String(new byte[20*1024]);
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(msgBody);
+ producerExpire.send(producerDestination, message);
+ }
+
+ for (int i = 0; i < data.length; i++) {
+ Message message = session.createTextMessage(msgBody);
+ producerNormal.send(producerDestination, message);
+ }
+
+ Vector<Message> messages = new Vector<Message>();
+ Message received;
+ while ((received = consumer.receive(1000)) != null) {
+ messages.add(received);
+ if (messages.size() == 1) {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ received.acknowledge();
+ };
+
+ assertEquals("got messages", messageCount + 1, messages.size());
+
+ Vector<Message> dlqMessages = new Vector<Message>();
+ while ((received = dlqConsumer.receive(1000)) != null) {
+ dlqMessages.add(received);
+ };
+
+ assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
+ }
+
/**
* Sends and consumes the messages to a queue destination.
*