This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/main by this push:
new fdc0de2 AMQNET-847 Add support for client-side redelivery delay based
on redelivery attempt count
fdc0de2 is described below
commit fdc0de27b01f40dd7272a6b8db5169daca992113
Author: Steve Martin <[email protected]>
AuthorDate: Wed Jul 30 22:47:55 2025 +0100
AMQNET-847 Add support for client-side redelivery delay based on redelivery
attempt count
* Implement redelivery policy.
Reject poison messages.
* Updates following code review.
* AMQNET-847 Add support for client-side redelivery delay based on
redelivery attempt count
* AMQNET-847 Fix build
* AMQNET-847 Fix build
---------
Co-authored-by: Havret <[email protected]>
---
src/NMS.AMQP/NmsConnection.cs | 1 +
src/NMS.AMQP/NmsMessageConsumer.cs | 65 +++++++++---
.../MessageRedeliveryPolicyIntegrationTest.cs | 116 +++++++++++++++++++++
test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 5 +
4 files changed, 172 insertions(+), 15 deletions(-)
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 5afae35..420871b 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -23,6 +23,7 @@ using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Policies;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 73206cd..dc20c0b 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -384,8 +384,7 @@ namespace Apache.NMS.AMQP
Tracer.Debug($"{Info.Id} filtered message
with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}
- // TODO: Apply redelivery policy
- await DoAckExpiredAsync(envelope).Await();
+ await
ApplyRedeliveryPolicyOutcome(envelope).Await();
}
else
{
@@ -396,7 +395,9 @@ namespace Apache.NMS.AMQP
await
DoAckDeliveredAsync(envelope).Await();
else
await
AckFromReceiveAsync(envelope).Await();
-
+
+ await ApplyRedeliveryPolicy(envelope).Await();
+
try
{
Listener?.Invoke(envelope.Message.Copy());
@@ -456,6 +457,17 @@ namespace Apache.NMS.AMQP
return false;
}
+
+ private int GetRedeliveryDelay(InboundMessageDispatch envelope)
+ {
+ var redeliveryPolicy = Session.Connection.RedeliveryPolicy;
+
+ if (redeliveryPolicy == null || envelope.RedeliveryCount <= 0)
+ return 0;
+
+ var redeliveryDelay =
redeliveryPolicy.RedeliveryDelay(envelope.RedeliveryCount);
+ return redeliveryDelay;
+ }
private Task DoAckReleasedAsync(InboundMessageDispatch envelope)
{
@@ -544,7 +556,7 @@ namespace Apache.NMS.AMQP
Tracer.Debug($"{Info.Id} filtered message with
excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
}
- await DoAckExpiredAsync(envelope).Await();
+ await ApplyRedeliveryPolicyOutcome(envelope).Await();
}
else
{
@@ -553,6 +565,8 @@ namespace Apache.NMS.AMQP
Tracer.Debug($"{Info.Id} received message
{envelope.Message.NMSMessageId}.");
}
+ await ApplyRedeliveryPolicy(envelope).Await();
+
return await func.Invoke(envelope);
}
}
@@ -566,7 +580,17 @@ namespace Apache.NMS.AMQP
throw ExceptionSupport.Wrap(ex, "Receive failed");
}
}
-
+
+ private async Task ApplyRedeliveryPolicy(InboundMessageDispatch
envelope)
+ {
+ int redeliveryDelay = GetRedeliveryDelay(envelope);
+
+ if (redeliveryDelay > 0)
+ {
+ Tracer.DebugFormat("Envelope has been redelivered, apply
redelivery policy wait {0} milliseconds", redeliveryDelay);
+ await
Task.Delay(TimeSpan.FromMilliseconds(redeliveryDelay)).Await();
+ }
+ }
private static long GetDeadline(int timeout)
{
@@ -601,16 +625,7 @@ namespace Apache.NMS.AMQP
private Task DoAckExpiredAsync(InboundMessageDispatch envelope)
{
- if (Session.Connection.RedeliveryPolicy != null)
- {
- var dispositionType =
Session.Connection.RedeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
- var ackType = LookupAckTypeForDisposition(dispositionType);
- return Session.AcknowledgeAsync(ackType, envelope);
- }
- else
- {
- return
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
- }
+ return
Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
}
private static AckType LookupAckTypeForDisposition(int dispositionType)
@@ -630,6 +645,26 @@ namespace Apache.NMS.AMQP
}
}
+ private Task ApplyRedeliveryPolicyOutcome(InboundMessageDispatch
envelope)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("{0} filtered message with excessive
redelivery count: {1}", Info.Id, envelope.RedeliveryCount);
+ }
+
+ AckType ackType;
+ if (Session.Connection.RedeliveryPolicy is { } redeliveryPolicy)
+ {
+ var dispositionType =
redeliveryPolicy.GetOutcome(envelope.Message.NMSDestination);
+ ackType = LookupAckTypeForDisposition(dispositionType);
+ }
+ else
+ {
+ ackType = AckType.MODIFIED_FAILED_UNDELIVERABLE;
+ }
+ return Session.AcknowledgeAsync(ackType, envelope);
+ }
+
private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
new file mode 100644
index 0000000..9569623
--- /dev/null
+++
b/test/Apache-NMS-AMQP-Test/Integration/MessageRedeliveryPolicyIntegrationTest.cs
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Policies;
+using Apache.NMS.Policies;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class MessageRedeliveryPolicyIntegrationTest :
IntegrationTestFixture
+ {
+ [Test, Timeout(20_000)]
+ public void TestIncomingDeliveryCountExceededMessageGetsRejected()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ int initialRedeliveryDelay = 1000;
+ int clockResolution = 15;
+ connection.RedeliveryPolicy = new DefaultRedeliveryPolicy {
MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
+ testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ IMessage m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ DateTime startTimer = DateTime.UtcNow;
+ m = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+
Assert.That(DateTime.UtcNow.Subtract(startTimer).TotalMilliseconds,
Is.GreaterThanOrEqualTo(initialRedeliveryDelay - clockResolution));
+
+ Assert.NotNull(m, "Message should have been received");
+ Assert.IsInstanceOf<ITextMessage>(m);
+ session.Recover();
+
+ // Verify the message is no longer there. Will drain to be
sure there are no messages.
+ Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(10)),
"Message should not have been received");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestIncomingDeliveryCountExceededMessageGetsRejectedAsync()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer);
+ int initialRedeliveryDelay = 1000;
+ connection.RedeliveryPolicy = new DefaultRedeliveryPolicy {
MaximumRedeliveries = 1, InitialRedeliveryDelay = initialRedeliveryDelay};
+ connection.Start();
+
+ testPeer.ExpectBegin();
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message: new
Amqp.Message() { BodySection = new AmqpValue() { Value = "hello" } });
+ testPeer.ExpectDispositionThatIsModifiedFailedAndSettled();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ CountdownEvent success = new CountdownEvent(2);
+
+ consumer.Listener += m =>
+ {
+ session.Recover();
+ success.Signal();
+ };
+
+ Assert.IsTrue(success.Wait(TimeSpan.FromSeconds(3)), "Didn't
get expected messages");
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index d59329e..2fad923 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -727,6 +727,11 @@ namespace NMS.AMQP.Test.TestAmqp
ExpectDisposition(settled: true, stateMatcher: stateMatcher);
}
+ public void ExpectDispositionThatIsRejectedAndSettled()
+ {
+ ExpectDisposition(settled: true, stateMatcher:
Assert.IsInstanceOf<Rejected>);
+ }
+
public void ExpectDispositionThatIsModifiedFailedAndSettled()
{
Action<DeliveryState> stateMatcher = state => {
Assert.AreEqual(state.Descriptor.Code,
MessageSupport.MODIFIED_FAILED_INSTANCE.Descriptor.Code); };
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact