[AMQ-6847] pause dispatch for message move to avoid redelivery with pending 
ack/remove/audit rollback


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2ea5d142
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2ea5d142
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2ea5d142

Branch: refs/heads/master
Commit: 2ea5d1420bbbf90bd151e19a75e6ca33c773f1f4
Parents: 005403e
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Oct 27 11:40:06 2017 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Oct 27 11:40:06 2017 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 31 +++++++++----
 .../apache/activemq/broker/jmx/MBeanTest.java   | 47 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2ea5d142/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f440f76..4a2d272 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1471,18 +1471,33 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
      * @throws Exception
      */
     public boolean moveMessageTo(ConnectionContext context, 
QueueMessageReference m, ActiveMQDestination dest) throws Exception {
-        BrokerSupport.resend(context, m.getMessage(), dest);
-        removeMessage(context, m);
-        messagesLock.writeLock().lock();
+        Set<Destination> destsToPause = regionBroker.getDestinations(dest);
         try {
-            messages.rollback(m.getMessageId());
-            if (isDLQ()) {
-                DeadLetterStrategy stratagy = getDeadLetterStrategy();
-                stratagy.rollback(m.getMessage());
+            for (Destination d: destsToPause) {
+                if (d instanceof Queue) {
+                    ((Queue)d).pauseDispatch();
+                }
+            }
+            BrokerSupport.resend(context, m.getMessage(), dest);
+            removeMessage(context, m);
+            messagesLock.writeLock().lock();
+            try {
+                messages.rollback(m.getMessageId());
+                if (isDLQ()) {
+                    DeadLetterStrategy stratagy = getDeadLetterStrategy();
+                    stratagy.rollback(m.getMessage());
+                }
+            } finally {
+                messagesLock.writeLock().unlock();
             }
         } finally {
-            messagesLock.writeLock().unlock();
+            for (Destination d: destsToPause) {
+                if (d instanceof Queue) {
+                    ((Queue)d).resumeDispatch();
+                }
+            }
         }
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ea5d142/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index d72d709..ecc6894 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -54,6 +54,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -180,6 +181,52 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals("no forwards", 0, queueNew.getForwardCount());
     }
 
+    public void testMoveFromDLQImmediateDLQ() throws Exception {
+
+        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        redeliveryPolicy.setMaximumRedeliveries(0);
+        
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(redeliveryPolicy);
+        Connection connection = connectionFactory.createConnection();
+
+        // populate
+        useConnection(connection);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Destination dest = session.createQueue(getDestinationString());
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    System.out.println("Received: " + message + " on " + 
message.getJMSDestination());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+                throw new RuntimeException("Horrible exception");
+            }});
+
+
+        ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + 
SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
+        QueueViewMBean dlq = 
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
dlqQueueViewMBeanName, QueueViewMBean.class, true);
+
+        assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return MESSAGE_COUNT == dlq.getQueueSize();
+            }
+        }));
+
+        dlq.retryMessages();
+
+        assertTrue("messagees on dlq after retry", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Dlq size: " + dlq.getQueueSize());
+                return MESSAGE_COUNT == dlq.getQueueSize();
+            }
+        }));
+    }
+
     //Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752";
     // points to the need to except on a duplicate or have store.addMessage 
return boolean
     // need some thought on how best to resolve this

Reply via email to