Repository: activemq
Updated Branches:
  refs/heads/master 441973b48 -> 0142c4dc8


https://issues.apache.org/jira/browse/AMQ-5614 - additional expiration 
attribute on dead letter strategy and tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0142c4dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0142c4dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0142c4dc

Branch: refs/heads/master
Commit: 0142c4dc89855d78450e64572abbac90e9432870
Parents: be919fb
Author: gtully <[email protected]>
Authored: Wed Feb 25 14:32:10 2015 +0000
Committer: gtully <[email protected]>
Committed: Wed Feb 25 14:32:43 2015 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/RegionBroker.java    |   9 +-
 .../policy/AbstractDeadLetterStrategy.java      |   9 +
 .../region/policy/DeadLetterStrategy.java       |   8 +
 .../java/org/apache/activemq/TestSupport.java   |   3 +-
 .../broker/policy/DeadLetterExpiryTest.java     | 187 +++++++++++++++++++
 .../broker/policy/DeadLetterTestSupport.java    |   2 +-
 6 files changed, 214 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 893ded3..bdeeb76 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -777,8 +777,13 @@ public class RegionBroker extends EmptyBroker {
                         if 
(deadLetterStrategy.isSendToDeadLetterQueue(message)) {
                             // message may be inflight to other subscriptions 
so do not modify
                             message = message.copy();
-                            stampAsExpired(message);
-                            message.setExpiration(0);
+                            long dlqExpiration = 
deadLetterStrategy.getExpiration();
+                            if (dlqExpiration > 0) {
+                                dlqExpiration += System.currentTimeMillis();
+                            } else {
+                                stampAsExpired(message);
+                            }
+                            message.setExpiration(dlqExpiration);
                             if (!message.isPersistent()) {
                                 message.setPersistent(true);
                                 message.setProperty("originalDeliveryMode", 
"NON_PERSISTENT");

http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
index f029920..ed376b7 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
@@ -32,6 +32,7 @@ public abstract class AbstractDeadLetterStrategy implements 
DeadLetterStrategy {
     private boolean processExpired = true;
     private boolean enableAudit = true;
     private final ActiveMQMessageAudit messageAudit = new 
ActiveMQMessageAudit();
+    private long expiration;
 
     @Override
     public void rollback(Message message) {
@@ -98,4 +99,12 @@ public abstract class AbstractDeadLetterStrategy implements 
DeadLetterStrategy {
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
     }
+
+    public long getExpiration() {
+        return expiration;
+    }
+
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
index 7b83dc9..fdd1174 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
@@ -69,4 +69,12 @@ public interface DeadLetterStrategy {
      */
     public void rollback(Message message);
 
+    /**
+     * The expiration value to use on messages sent to the DLQ, default 0
+     * @return expiration in milli seconds
+     */
+    public void setExpiration(long expiration);
+
+    public long getExpiration();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 07c38f5..7b11271 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -42,6 +42,7 @@ import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.util.JMXSupport;
 
 /**
  * Useful base class for unit test cases
@@ -179,7 +180,7 @@ public abstract class TestSupport extends 
CombinationTestSupport {
 
     protected QueueViewMBean getProxyToQueue(String name) throws 
MalformedObjectNameException, JMSException {
         BrokerService brokerService = 
BrokerRegistry.getInstance().lookup("localhost");
-        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+
 JMXSupport.encodeObjectNamePart(name));
         QueueViewMBean proxy = (QueueViewMBean) 
brokerService.getManagementContext()
                 .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, 
true);
         return proxy;

http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java
new file mode 100644
index 0000000..25a98b9
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Queue;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeadLetterExpiryTest extends DeadLetterTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DeadLetterExpiryTest.class);
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PolicyMap pMap = broker.getDestinationPolicy();
+
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setExpiration(4000);
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        pMap.put(new ActiveMQQueue(getDestinationString()), policy);
+        pMap.put(new ActiveMQTopic(getDestinationString()), policy);
+
+        SharedDeadLetterStrategy sharedLoopStrategy = new 
SharedDeadLetterStrategy();
+        strategy.setProcessNonPersistent(true);
+        sharedLoopStrategy.setExpiration(1000);
+        sharedLoopStrategy.setDeadLetterQueue(new ActiveMQQueue("DLQ.loop"));
+
+        PolicyEntry buggyLoopingDLQPolicy = new PolicyEntry();
+        buggyLoopingDLQPolicy.setDeadLetterStrategy(sharedLoopStrategy);
+
+        pMap.put(new ActiveMQQueue("loop"), buggyLoopingDLQPolicy);
+        pMap.put(new ActiveMQQueue("DLQ.loop"), buggyLoopingDLQPolicy);
+
+        PolicyEntry policyWithExpiryProcessing = pMap.getDefaultEntry();
+        policyWithExpiryProcessing.setExpireMessagesPeriod(1000);
+        pMap.setDefaultEntry(policyWithExpiryProcessing);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    @Override
+    protected Destination createDlqDestination() {
+        String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
+        return new ActiveMQQueue(prefix + getClass().getName() + "." + 
getName());
+    }
+
+    protected void doTest() throws Exception {
+        connection.start();
+        messageCount = 4;
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = 
amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < messageCount; i++) {
+            consumeAndRollback(i);
+        }
+
+        Queue dlqQueue = (Queue) createDlqDestination();
+        verifyIsDlq(dlqQueue);
+
+        // they should expire
+        final QueueViewMBean queueViewMBean = 
getProxyToQueue(dlqQueue.getQueueName());
+
+        assertTrue("all dlq messages expired", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Queue size:" + queueViewMBean.getQueueSize());
+                return queueViewMBean.getExpiredCount() == messageCount;
+            }
+        }));
+
+        makeDlqConsumer();
+        assertNull("no message available", dlqConsumer.receive(1000));
+
+        final QueueViewMBean sharedDlqViewMBean = 
getProxyToQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME);
+        assertTrue("messages stay on shared dlq which has default 
expiration=0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Q " + sharedDlqViewMBean.getName() + " size:" + 
sharedDlqViewMBean.getQueueSize());
+                return sharedDlqViewMBean.getQueueSize() == messageCount;
+            }
+        }));
+
+    }
+
+    public void testNoDLQLoop() throws Exception {
+        destination = new ActiveMQQueue("loop");
+        messageCount = 2;
+
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = 
amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < messageCount; i++) {
+            consumeAndRollback(i);
+        }
+
+        // they should expire
+        final QueueViewMBean queueViewMBean = getProxyToQueue("DLQ.loop");
+
+        assertTrue("all dlq messages expired", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Queue size:" + queueViewMBean.getQueueSize());
+                return queueViewMBean.getExpiredCount() == messageCount;
+            }
+        }));
+
+
+        // strategy audit suppresses resend
+        assertEquals("it should be empty", 0, queueViewMBean.getQueueSize());
+
+    }
+
+    protected void consumeAndRollback(int messageCounter) throws Exception {
+        for (int i = 0; i < rollbackCount; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull("No message received for message: " + messageCounter 
+ " and rollback loop: " + i, message);
+            assertMessage(message, messageCounter);
+
+            session.rollback();
+        }
+        LOG.info("Rolled back: " + rollbackCount + " times");
+    }
+
+    protected void setUp() throws Exception {
+        transactedMode = true;
+        super.setUp();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws 
Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setInitialRedeliveryDelay(10);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/0142c4dc/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
index 6d05b6d..b9034b3 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
@@ -60,7 +60,7 @@ public abstract class DeadLetterTestSupport extends 
TestSupport {
     protected BrokerService broker;
     protected boolean transactedMode;
     protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
-    private Destination destination;
+    protected Destination destination;
 
     protected void setUp() throws Exception {
         super.setUp();

Reply via email to