Author: gtully
Date: Fri Nov 9 21:13:57 2012
New Revision: 1407640
URL: http://svn.apache.org/viewvc?rev=1407640&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4166 - fix processing of expired messages -
stack-trace-1.txt was in error; the expiry check should not have used the oneshot
broker check, just checking the message's state is required
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1407640&r1=1407639&r2=1407640&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
Fri Nov 9 21:13:57 2012
@@ -127,7 +127,7 @@ public class RedeliveryPlugin extends Br
@Override
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference, Subscription subscription) {
- if (next.get().isExpired(messageReference)) {
+ if (messageReference.isExpired()) {
// there are two uses of sendToDeadLetterQueue, we are only
interested in valid messages
super.sendToDeadLetterQueue(context, messageReference,
subscription);
} else {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java?rev=1407640&r1=1407639&r2=1407640&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
Fri Nov 9 21:13:57 2012
@@ -23,6 +23,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
@@ -43,7 +44,7 @@ public class BrokerRedeliveryTest extend
public void testScheduledRedelivery() throws Exception {
- sendMessage();
+ sendMessage(0);
ActiveMQConnection consumerConnection = (ActiveMQConnection)
createConnection();
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
@@ -81,11 +82,38 @@ public class BrokerRedeliveryTest extend
consumerSession.commit();
}
- private void sendMessage() throws Exception {
+ public void testNoScheduledRedeliveryOfExpired() throws Exception {
+ ActiveMQConnection consumerConnection = (ActiveMQConnection)
createConnection();
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
+ sendMessage(1000);
+ Message message = consumer.receive(1000);
+ assertNotNull("got message", message);
+
+ // ensure there is another consumer to redispatch to
+ MessageConsumer redeliverConsumer =
consumerSession.createConsumer(destination);
+
+ // allow consumed to expire so it gets redelivered
+ TimeUnit.SECONDS.sleep(2);
+ consumer.close();
+
+ // should go to dlq as it has expired
+ // validate DLQ
+ MessageConsumer dlqConsumer = consumerSession.createConsumer(new
ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+ Message dlqMessage = dlqConsumer.receive(2000);
+ assertNotNull("Got message from dql", dlqMessage);
+ assertEquals("message matches", message.getStringProperty("data"),
dlqMessage.getStringProperty("data"));
+ }
+
+ private void sendMessage(int timeToLive) throws Exception {
ActiveMQConnection producerConnection = (ActiveMQConnection)
createConnection();
producerConnection.start();
Session producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
+ if (timeToLive > 0) {
+ producer.setTimeToLive(timeToLive);
+ }
Message message = producerSession.createMessage();
message.setStringProperty("data", data);
producer.send(message);