This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
new 2de859f75 AMQ-8617: RedeliveryPolicy:Exponential Backoff +
NonBlockingRedelivery = too long delays
2de859f75 is described below
commit 2de859f758ac0e9968211248200953a3cbed6f5e
Author: Endre Stølsvik <[email protected]>
AuthorDate: Tue May 31 01:56:30 2022 +0200
AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery =
too long delays
(cherry picked from commit 393a696955cbf97b90576e4a85b3ce1a02268ad7)
Scenario on client:
1. Employing RedeliveryPolicy with exponential backoff (keeping maximum
redeliveries at default 6)
2. Enabled non-blocking redelivery
3. Receiving e.g. 100 consecutive poison messages (which eventually
should DLQ after max redeliveries)
This will result in massive redelivery delays due to a logic bug.
The reason is that redeliveryDelay is a field variable kept on the
ActiveMQMessageConsumer, instead of being a property on the message - or
that the redelivery delay was calculated per message based on the
redelivery count.
When consecutive messages rollbacks multiple times, the redeliveryDelay
field is continuously multiplied by the backoff multiplier, resulting in
enormous delays.
Fix: Ditch the field variable, instead calculating the redeliveryDelay
per delivery from the redelivery count. (This happens to be identical to
how it is done in afterRollback() in ActiveMQSession:1004.)
Test is added - which fails with the previous code, and passes with
this. Added a debug log line for the calculated delay.
---
.../apache/activemq/ActiveMQMessageConsumer.java | 20 ++---
.../org/apache/activemq/RedeliveryPolicyTest.java | 96 +++++++++++++++++++++-
2 files changed, 104 insertions(+), 12 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 533c6234e..f8ba61d41 100644
---
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>
previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
- private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
private final AtomicReference<MessageListener> messageListener = new
AtomicReference<MessageListener>();
@@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
deliveredMessages.clear();
clearPreviouslyDelivered();
}
- redeliveryDelay = 0;
}
public void rollback() throws JMSException {
@@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
return;
}
- // use initial delay for first redelivery
MessageDispatch lastMd = deliveredMessages.getFirst();
- final int currentRedeliveryCount =
lastMd.getMessage().getRedeliveryCounter();
- if (currentRedeliveryCount > 0) {
- redeliveryDelay =
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
- } else {
- redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
- }
MessageId firstMsgId =
deliveredMessages.getLast().getMessage().getMessageId();
for (Iterator<MessageDispatch> iter =
deliveredMessages.iterator(); iter.hasNext();) {
@@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements
MessageAvailableConsumer, StatsC
session.sendAck(ack,true);
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize -
deliveredMessages.size());
- redeliveryDelay = 0;
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
} else {
+ // Find what redelivery delay to use, based on the
redelivery count of last message.
+ // Current redelivery count is already increased at this
point
+ final int currentRedeliveryCount =
lastMd.getMessage().getRedeliveryCounter();
+ long redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
+ // Iterating based on redelivery count to find delay to
use.
+ // NOTE: One less than current redelivery count, to use
initial delay for first redelivery.
+ for (int i = 0; i < (currentRedeliveryCount-1); i++) {
+ redeliveryDelay =
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
+ }
+ LOG.debug("Redelivery delay calculated for redelivery
count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay,
lastMd.getMessage().getMessageId());
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5f325a491..85bdc312a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
- public void testGetNext() throws Exception {
+ public void testGetNextWithExponentialBackoff() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
@@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(500, delay);
}
+ public void
testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero()
{
+
+ RedeliveryPolicy policy = new RedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(42);
+ policy.setRedeliveryDelay(-100); // This is ignored in actual usage
since initial > 0
+ policy.setBackOffMultiplier(2d);
+ policy.setUseExponentialBackOff(true);
+
+ // Invoke in the order employed when actually used by redelivery code
paths
+ long delay = policy.getInitialRedeliveryDelay();
+ assertEquals(42, delay);
+ // Notice how the setRedeliveryDelay(-100) doesn't affect the
calculation if initial > 0
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(42*2, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(42*4, delay);
+
+ // If the initial delay is 0, when given back to the policy via
getNextRedeliveryDelay(), we get -100
+ assertEquals(-100, policy.getNextRedeliveryDelay(0));
+ // .. but when invoked with anything else, the backoff multiplier is
used
+ assertEquals(123 * 2, policy.getNextRedeliveryDelay(123));
+
+ // If exponential backoff is disabled, the setRedeliveryDelay(-100) is
used.
+ policy.setUseExponentialBackOff(false);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(-100, delay);
+ }
+
public void testGetNextWithInitialDelay() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
@@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(1000, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(1000, delay);
-
}
/**
@@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
+ /**
+ * By version 5.17.1 (2022-04-25), the combination of exponential
redelivery with non-blocking redelivery was
+ * handled erroneously: The redeliveryDelay was a modifiable field on the
ActiveMQMessageConsumer (not per message,
+ * nor calculated individually based on the message's redelivery count),
and thus if multiple consecutive messages
+ * was rolled back multiple times in a row (i.e. redeliveries > 1), the
exponential delay <i>which was kept on the
+ * consumer</i> would quickly result in extreme delays.
+ */
+ public void
testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws
Exception {
+ // :: ARRANGE
+ // Condition #1: Create an exponential redelivery policy
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setRedeliveryDelay(100);
+ policy.setBackOffMultiplier(2);
+ policy.setUseExponentialBackOff(true);
+ policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4
redeliveries
+
+ // assert set of delays
+ long delay = policy.getInitialRedeliveryDelay();
+ assertEquals(0, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(100, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(200, delay);
+ delay = policy.getNextRedeliveryDelay(delay);
+ assertEquals(400, delay);
+
+ // Condition #2: Set non-blocking redelivery
+ connection.setNonBlockingRedelivery(true);
+
+ // :: ACT
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue(getName());
+ MessageProducer producer = session.createProducer(destination);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send 'count' messages
+ int count = 10;
+ for (int i = 0; i<count; i++) {
+ producer.send(session.createTextMessage("#"+i));
+ }
+ session.commit();
+ LOG.info("{} messages sent", count);
+
+ // Receive messages, but rollback: 4+1 times each message = 5 * count
+ int receiveCount = 0;
+ // Add one extra receive, which should NOT result in a message (they
should all be DLQed by then).
+ for (int i = 0; i < (count * 5 + 1); i++) {
+ // Max delay between redeliveries for these messages should be
400ms
+ // Waiting for 4x that = 1600 ms, to allow for hiccups during
testing.
+ // (With the faulty code, the last calculated delay before test
failing was 26214400.)
+ TextMessage m = (TextMessage) consumer.receive(1600);
+ LOG.info("Message received: {}", m);
+ if (m != null) {
+ receiveCount ++;
+ }
+ session.rollback();
+ }
+
+ // ASSERT
+ // We should have received count * 5 messages
+ assertEquals(count * 5, receiveCount);
+ }
/**
* @throws Exception