This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 2de859f75 AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays
2de859f75 is described below

commit 2de859f758ac0e9968211248200953a3cbed6f5e
Author: Endre Stølsvik <[email protected]>
AuthorDate: Tue May 31 01:56:30 2022 +0200

    AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays
    
    (cherry picked from commit 393a696955cbf97b90576e4a85b3ce1a02268ad7)
    
    Scenario on client:
    
    1. Employing RedeliveryPolicy with exponential backoff (keeping maximum
    redeliveries at default 6)
    2. Enabled non-blocking redelivery
    3. Receiving e.g. 100 consecutive poison messages (which eventually
    should DLQ after max redeliveries)
    
    This will result in massive redelivery delays due to a logic bug.
    
    The reason is that redeliveryDelay is a field variable kept on the
    ActiveMQMessageConsumer, instead of being a property of the message or
    being calculated per message from that message's redelivery count.
    
    When consecutive messages are rolled back multiple times, the
    redeliveryDelay field is continuously multiplied by the backoff
    multiplier, resulting in enormous delays.
    
    Fix: Ditch the field variable, instead calculating the redeliveryDelay
    per delivery from the redelivery count. (This happens to be identical to
    how it is done in afterRollback() in ActiveMQSession:1004.)
    
    Test is added - which fails with the previous code, and passes with
    this. Added a debug log line for the calculated delay.
---
 .../apache/activemq/ActiveMQMessageConsumer.java   | 20 ++---
 .../org/apache/activemq/RedeliveryPolicyTest.java  | 96 +++++++++++++++++++++-
 2 files changed, 104 insertions(+), 12 deletions(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 533c6234e..f8ba61d41 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> 
previouslyDeliveredMessages;
     private int deliveredCounter;
     private int additionalWindowSize;
-    private long redeliveryDelay;
     private int ackCounter;
     private int dispatchedCount;
     private final AtomicReference<MessageListener> messageListener = new 
AtomicReference<MessageListener>();
@@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
             deliveredMessages.clear();
             clearPreviouslyDelivered();
         }
-        redeliveryDelay = 0;
     }
 
     public void rollback() throws JMSException {
@@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                     return;
                 }
 
-                // use initial delay for first redelivery
                 MessageDispatch lastMd = deliveredMessages.getFirst();
-                final int currentRedeliveryCount = 
lastMd.getMessage().getRedeliveryCounter();
-                if (currentRedeliveryCount > 0) {
-                    redeliveryDelay = 
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
-                } else {
-                    redeliveryDelay = 
redeliveryPolicy.getInitialRedeliveryDelay();
-                }
                 MessageId firstMsgId = 
deliveredMessages.getLast().getMessage().getMessageId();
 
                 for (Iterator<MessageDispatch> iter = 
deliveredMessages.iterator(); iter.hasNext();) {
@@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
                     session.sendAck(ack,true);
                     // Adjust the window size.
                     additionalWindowSize = Math.max(0, additionalWindowSize - 
deliveredMessages.size());
-                    redeliveryDelay = 0;
 
                     deliveredCounter -= deliveredMessages.size();
                     deliveredMessages.clear();
 
                 } else {
+                    // Find what redelivery delay to use, based on the 
redelivery count of last message.
+                    // Current redelivery count is already increased at this 
point
+                    final int currentRedeliveryCount = 
lastMd.getMessage().getRedeliveryCounter();
+                    long redeliveryDelay = 
redeliveryPolicy.getInitialRedeliveryDelay();
+                    // Iterating based on redelivery count to find delay to 
use.
+                    // NOTE: One less than current redelivery count, to use 
initial delay for first redelivery.
+                    for (int i = 0; i < (currentRedeliveryCount-1); i++) {
+                        redeliveryDelay = 
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
+                    }
+                    LOG.debug("Redelivery delay calculated for redelivery 
count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, 
lastMd.getMessage().getMessageId());
 
                     // only redelivery_ack after first delivery
                     if (currentRedeliveryCount > 0) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5f325a491..85bdc312a 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
     }
 
 
-    public void testGetNext() throws Exception {
+    public void testGetNextWithExponentialBackoff() throws Exception {
 
         RedeliveryPolicy policy = new RedeliveryPolicy();
         policy.setInitialRedeliveryDelay(0);
@@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertEquals(500, delay);
     }
 
+    public void 
testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero()
 {
+
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(42);
+        policy.setRedeliveryDelay(-100); // This is ignored in actual usage 
since initial > 0
+        policy.setBackOffMultiplier(2d);
+        policy.setUseExponentialBackOff(true);
+
+        // Invoke in the order employed when actually used by redelivery code 
paths
+        long delay = policy.getInitialRedeliveryDelay();
+        assertEquals(42, delay);
+        // Notice how the setRedeliveryDelay(-100) doesn't affect the 
calculation if initial > 0
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(42*2, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(42*4, delay);
+
+        // If the initial delay is 0, when given back to the policy via 
getNextRedeliveryDelay(), we get -100
+        assertEquals(-100, policy.getNextRedeliveryDelay(0));
+        // .. but when invoked with anything else, the backoff multiplier is 
used
+        assertEquals(123 * 2, policy.getNextRedeliveryDelay(123));
+
+        // If exponential backoff is disabled, the setRedeliveryDelay(-100) is 
used.
+        policy.setUseExponentialBackOff(false);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(-100, delay);
+    }
+
     public void testGetNextWithInitialDelay() throws Exception {
 
         RedeliveryPolicy policy = new RedeliveryPolicy();
@@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
         assertEquals(1000, delay);
         delay = policy.getNextRedeliveryDelay(delay);
         assertEquals(1000, delay);
-
     }
 
     /**
@@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
 
     }
 
+    /**
+     * By version 5.17.1 (2022-04-25), the combination of exponential 
redelivery with non-blocking redelivery was
+     * handled erroneously: The redeliveryDelay was a modifiable field on the 
ActiveMQMessageConsumer (not per message,
+     * nor calculated individually based on the message's redelivery count), 
and thus if multiple consecutive messages
+     * was rolled back multiple times in a row (i.e. redeliveries > 1), the 
exponential delay <i>which was kept on the
+     * consumer</i> would quickly result in extreme delays.
+     */
+    public void 
testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws 
Exception {
+        // :: ARRANGE
+        // Condition #1: Create an exponential redelivery policy
+        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+        policy.setInitialRedeliveryDelay(0);
+        policy.setRedeliveryDelay(100);
+        policy.setBackOffMultiplier(2);
+        policy.setUseExponentialBackOff(true);
+        policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 
redeliveries
+
+        // assert set of delays
+        long delay = policy.getInitialRedeliveryDelay();
+        assertEquals(0, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(100, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(200, delay);
+        delay = policy.getNextRedeliveryDelay(delay);
+        assertEquals(400, delay);
+
+        // Condition #2: Set non-blocking redelivery
+        connection.setNonBlockingRedelivery(true);
+
+        // :: ACT
+        connection.start();
+        Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue(getName());
+        MessageProducer producer = session.createProducer(destination);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send 'count' messages
+        int count = 10;
+        for (int i = 0; i<count; i++) {
+            producer.send(session.createTextMessage("#"+i));
+        }
+        session.commit();
+        LOG.info("{} messages sent", count);
+
+        // Receive messages, but rollback: 4+1 times each message = 5 * count
+        int receiveCount = 0;
+        // Add one extra receive, which should NOT result in a message (they 
should all be DLQed by then).
+        for (int i = 0; i < (count * 5 + 1); i++) {
+            // Max delay between redeliveries for these messages should be 
400ms
+            // Waiting for 4x that = 1600 ms, to allow for hiccups during 
testing.
+            // (With the faulty code, the last calculated delay before test 
failing was 26214400.)
+            TextMessage m = (TextMessage) consumer.receive(1600);
+            LOG.info("Message received: {}", m);
+            if (m != null) {
+                receiveCount ++;
+            }
+            session.rollback();
+        }
+
+        // ASSERT
+        // We should have received count * 5 messages
+        assertEquals(count * 5, receiveCount);
+    }
 
     /**
      * @throws Exception

Reply via email to